[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r19484 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r19484 - gnunet/src/transport |
Date: |
Fri, 27 Jan 2012 14:48:31 +0100 |
Author: wachs
Date: 2012-01-27 14:48:30 +0100 (Fri, 27 Jan 2012)
New Revision: 19484
Modified:
gnunet/src/transport/plugin_transport_unix.c
Log:
complete select write implementation
Modified: gnunet/src/transport/plugin_transport_unix.c
===================================================================
--- gnunet/src/transport/plugin_transport_unix.c 2012-01-27 13:21:36 UTC (rev 19483)
+++ gnunet/src/transport/plugin_transport_unix.c 2012-01-27 13:48:30 UTC (rev 19484)
@@ -64,6 +64,8 @@
*/
#define UNIX_NAT_DEFAULT_PORT 22086
+#define MAX_RETRIES 5
+
GNUNET_NETWORK_STRUCT_BEGIN
/**
@@ -83,22 +85,26 @@
};
-struct RetryList
+struct UNIXMessageWrapper
{
- /**
- * Pointer to next element.
- */
- struct RetryList *next;
+ struct UNIXMessageWrapper *next;
+ struct UNIXMessageWrapper *prev;
- /**
- * Pointer to previous element.
- */
- struct RetryList *prev;
+ struct UNIXMessage * msg;
+ size_t msgsize;
- /**
- * The actual retry context.
- */
- struct RetrySendContext *retry_ctx;
+ int retry_counter;
+
+ struct GNUNET_PeerIdentity target;
+
+ struct GNUNET_TIME_Relative timeout;
+ unsigned int priority;
+
+ void *addr;
+ size_t addrlen;
+ struct Session *session;
+ GNUNET_TRANSPORT_TransmitContinuation cont;
+ void *cont_cls;
};
/**
@@ -339,6 +345,9 @@
*/
char *unix_socket_path;
+ struct UNIXMessageWrapper *msg_head;
+ struct UNIXMessageWrapper *msg_tail;
+
/**
* ATS network
*/
@@ -346,17 +355,6 @@
};
/**
- * Head of retry DLL.
- */
-static struct RetryList *retry_list_head;
-
-/**
- * Tail of retry DLL.
- */
-static struct RetryList *retry_list_tail;
-
-
-/**
* Disconnect from a remote node. Clean up session if we have one for this peer
*
* @param cls closure for this call (should be handle to Plugin)
@@ -383,21 +381,16 @@
unix_transport_server_stop (void *cls)
{
struct Plugin *plugin = cls;
- struct RetryList *pos;
- pos = retry_list_head;
+ struct UNIXMessageWrapper * msgw = plugin->msg_head;
- while (NULL != (pos = retry_list_head))
+ while (NULL != (msgw = plugin->msg_head))
{
- GNUNET_CONTAINER_DLL_remove (retry_list_head, retry_list_tail, pos);
- if (GNUNET_SCHEDULER_NO_TASK != pos->retry_ctx->retry_task)
- {
- GNUNET_SCHEDULER_cancel (pos->retry_ctx->retry_task);
- }
- GNUNET_free (pos->retry_ctx->msg);
- GNUNET_free (pos->retry_ctx->addr);
- GNUNET_free (pos->retry_ctx);
- GNUNET_free (pos);
+ GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
+ if (msgw->cont != NULL)
+ msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
+ GNUNET_free (msgw->msg);
+ GNUNET_free (msgw);
}
if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
@@ -441,34 +434,6 @@
void *cont_cls);
/**
- * Retry sending a message.
- *
- * @param cls closure a struct RetrySendContext
- * @param tc context information
- */
-void
-retry_send_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct RetrySendContext *retry_ctx = cls;
-
- if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
- {
- GNUNET_free (retry_ctx->msg);
- GNUNET_free (retry_ctx->addr);
- GNUNET_free (retry_ctx);
- return;
- }
-
- unix_real_send (retry_ctx->plugin, retry_ctx, retry_ctx->send_handle,
- &retry_ctx->target, retry_ctx->msg, retry_ctx->msg_size,
- retry_ctx->priority,
- GNUNET_TIME_absolute_get_remaining (retry_ctx->timeout),
- retry_ctx->addr, retry_ctx->addrlen, retry_ctx->cont,
- retry_ctx->cont_cls);
- return;
-}
-
-/**
* Actually send out the message, assume we've got the address and
* send_handle squared away!
*
@@ -499,16 +464,12 @@
size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont,
void *cont_cls)
{
- struct Plugin *plugin = cls;
- struct UNIXMessage *message;
- struct RetrySendContext *retry_ctx;
- int ssize;
+
ssize_t sent;
const void *sb;
size_t sbs;
struct sockaddr_un un;
size_t slen;
- struct RetryList *retry_list_entry;
int retry;
if (send_handle == NULL)
@@ -533,16 +494,6 @@
return 0; /* Can never send if we don't have an address!! */
}
- /* Build the message to be sent */
- message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
- ssize = sizeof (struct UNIXMessage) + msgbuf_size;
-
- message->header.size = htons (ssize);
- message->header.type = htons (0);
- memcpy (&message->sender, plugin->env->my_identity,
- sizeof (struct GNUNET_PeerIdentity));
- memcpy (&message[1], msgbuf, msgbuf_size);
-
memset (&un, 0, sizeof (un));
un.sun_family = AF_UNIX;
slen = strlen (addr) + 1;
@@ -562,9 +513,8 @@
sb = (struct sockaddr *) &un;
sbs = slen;
retry = GNUNET_NO;
+ sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, sb, sbs);
- sent = GNUNET_NETWORK_socket_sendto (send_handle, message, ssize, sb, sbs);
-
if ((GNUNET_SYSERR == sent) && ((errno == EAGAIN) || (errno == ENOBUFS)))
retry = GNUNET_YES;
@@ -577,14 +527,14 @@
send_handle, SOL_SOCKET, SO_SNDBUF,
&size,
&len);
- if (size < ssize)
+ if (size < msgbuf_size)
{
#if DEBUG_UNIX
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Trying to increase socket buffer size from %i to %i for message size %i\n",
size, ((ssize / 1000) + 2) * 1000, ssize);
#endif
- size = ((ssize / 1000) + 2) * 1000;
+ size = ((msgbuf_size / 1000) + 2) * 1000;
if (GNUNET_NETWORK_socket_setsockopt
((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF,
&size, sizeof (size)) == GNUNET_OK)
@@ -594,71 +544,31 @@
}
}
- if (retry == GNUNET_YES)
- {
- if (incoming_retry_context == NULL)
- {
- retry_list_entry = GNUNET_malloc (sizeof (struct RetryList));
- retry_ctx = GNUNET_malloc (sizeof (struct RetrySendContext));
- retry_ctx->addr = GNUNET_malloc (addrlen);
- retry_ctx->msg = GNUNET_malloc (msgbuf_size);
- retry_ctx->plugin = plugin;
- memcpy (retry_ctx->addr, addr, addrlen);
- memcpy (retry_ctx->msg, msgbuf, msgbuf_size);
- retry_ctx->msg_size = msgbuf_size;
- retry_ctx->addrlen = addrlen;
- retry_ctx->send_handle = send_handle;
- retry_ctx->cont = cont;
- retry_ctx->cont_cls = cont_cls;
- retry_ctx->priority = priority;
- retry_ctx->timeout = GNUNET_TIME_relative_to_absolute (timeout);
- memcpy (&retry_ctx->target, target, sizeof (struct GNUNET_PeerIdentity));
- retry_ctx->delay = GNUNET_TIME_UNIT_MILLISECONDS;
- retry_ctx->retry_list_entry = retry_list_entry;
- retry_list_entry->retry_ctx = retry_ctx;
- GNUNET_CONTAINER_DLL_insert (retry_list_head, retry_list_tail,
- retry_list_entry);
- }
- else
- {
- retry_ctx = incoming_retry_context;
- retry_ctx->delay = GNUNET_TIME_relative_multiply (retry_ctx->delay, 2);
- }
- retry_ctx->retry_task =
- GNUNET_SCHEDULER_add_delayed (retry_ctx->delay, &retry_send_message,
- retry_ctx);
-
- //GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
- GNUNET_free (message);
- return ssize;
- }
#if DEBUG_UNIX
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"UNIX transmit %u-byte message to %s (%d: %s)\n",
- (unsigned int) ssize, GNUNET_a2s (sb, sbs), (int) sent,
+ (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
#endif
+ /* Calling continuation */
if (cont != NULL)
{
- if (sent == GNUNET_SYSERR)
+ if ((sent == GNUNET_SYSERR) && (retry == GNUNET_NO))
cont (cont_cls, target, GNUNET_SYSERR);
- else
- {
+ if (sent > 0)
cont (cont_cls, target, GNUNET_OK);
- }
}
- if (incoming_retry_context != NULL)
- {
- GNUNET_CONTAINER_DLL_remove (retry_list_head, retry_list_tail,
- incoming_retry_context->retry_list_entry);
- GNUNET_free (incoming_retry_context->retry_list_entry);
- GNUNET_free (incoming_retry_context->msg);
- GNUNET_free (incoming_retry_context->addr);
- GNUNET_free (incoming_retry_context);
- }
+ /* return number of bytes successfully sent */
+ if (sent > 0)
+ return sent;
+ /* failed and retry: return 0 */
+ if ((GNUNET_SYSERR == sent) && (retry == GNUNET_YES))
+ return 0;
+ /* failed and no retry: return -1 */
+ if ((GNUNET_SYSERR == sent) && (retry == GNUNET_NO))
+ return -1;
- GNUNET_free (message);
return sent;
}
@@ -756,25 +666,47 @@
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
{
struct Plugin *plugin = cls;
- ssize_t sent;
+ struct UNIXMessage *message;
+ struct UNIXMessageWrapper *wrapper;
+ int ssize;
GNUNET_assert (NULL == session);
+ /* Build the message to be sent */
+ wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper) + addrlen);
+ message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
+ ssize = sizeof (struct UNIXMessage) + msgbuf_size;
+
#if DEBUG_UNIX
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to send message to `%s'\n",
(char *) addr);
#endif
- sent =
- unix_real_send (cls, NULL, plugin->unix_sock.desc, target, msgbuf,
- msgbuf_size, priority, timeout, addr, addrlen, cont,
- cont_cls);
+
+ message->header.size = htons (ssize);
+ message->header.type = htons (0);
+ memcpy (&message->sender, plugin->env->my_identity,
+ sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&message[1], msgbuf, msgbuf_size);
+
+ wrapper->msg = message;
+ wrapper->msgsize = ssize;
+ wrapper->priority = priority;
+ wrapper->timeout = timeout;
+ wrapper->cont = cont;
+ wrapper->cont_cls = cont_cls;
+ wrapper->addr = &wrapper[1];
+ wrapper->addrlen = addrlen;
+ wrapper->retry_counter = 0;
+ memcpy (&wrapper->target, target, sizeof (struct GNUNET_PeerIdentity));
+ memcpy (&wrapper[1], addr, addrlen);
+
+ GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
+
#if DEBUG_UNIX
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", sent,
(char *) addr);
#endif
- if (sent == GNUNET_SYSERR)
- return 0;
- return sent;
+ return ssize;
}
@@ -880,8 +812,53 @@
static void
unix_plugin_select_write (struct Plugin * plugin)
{
+ int sent = 0;
+ struct UNIXMessageWrapper * msgw = plugin->msg_head;
+ sent = unix_real_send (plugin, NULL,
+ plugin->unix_sock.desc,
+ &msgw->target,
+ (const char *) msgw->msg,
+ msgw->msgsize,
+ msgw->priority,
+ msgw->timeout,
+ msgw->addr,
+ msgw->addrlen,
+ msgw->cont, msgw->cont_cls);
+ /* successfully sent bytes */
+ if (sent > 0)
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+ GNUNET_free (msgw);
+ return;
+ }
+
+ /* max retries */
+ if (msgw->retry_counter > MAX_RETRIES)
+ {
+ msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+ GNUNET_break (0);
+ GNUNET_free (msgw);
+ return;
+ }
+
+ /* failed and no retry */
+ if (sent == -1)
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+ GNUNET_free (msgw);
+ return;
+ }
+
+ /* failed and retry */
+ if (sent == 0)
+ {
+ msgw->retry_counter++;
+ return;
+ }
+
}
/*
@@ -907,7 +884,8 @@
{
GNUNET_assert (GNUNET_NETWORK_fdset_isset
(tc->write_ready, plugin->unix_sock.desc));
- unix_plugin_select_write (plugin);
+ if (plugin->msg_head != NULL)
+ unix_plugin_select_write (plugin);
}
if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r19484 - gnunet/src/transport,
gnunet <=