[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r16473 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r16473 - gnunet/src/transport |
Date: |
Fri, 12 Aug 2011 17:49:57 +0200 |
Author: grothoff
Date: 2011-08-12 17:49:57 +0200 (Fri, 12 Aug 2011)
New Revision: 16473
Modified:
gnunet/src/transport/gnunet-service-transport_neighbours.c
gnunet/src/transport/plugin_transport_tcp.c
Log:
finishing neighbours
Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c 2011-08-12
15:00:47 UTC (rev 16472)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c 2011-08-12
15:49:57 UTC (rev 16473)
@@ -26,6 +26,7 @@
#include "platform.h"
#include "gnunet_ats_service.h"
#include "gnunet-service-transport_neighbours.h"
+#include "gnunet-service-transport_plugins.h"
#include "gnunet-service-transport_validation.h"
#include "gnunet-service-transport.h"
#include "gnunet_peerinfo_service.h"
@@ -45,15 +46,12 @@
#define QUOTA_VIOLATION_DROP_THRESHOLD 10
-// TODO:
-// - have a way to access the currently 'connected' session
-// (for sending and to notice disconnect of it!)
-// - have a way to access/update bandwidth/quota information per peer
-// (for CostReport/TrafficReport callbacks)
+/**
+ * Entry in neighbours.
+ */
+struct NeighbourMapEntry;
-struct NeighbourMapEntry;
-
/**
* For each neighbour we keep a list of messages
* that we still want to transmit to the neighbour.
@@ -72,6 +70,12 @@
struct MessageQueue *prev;
/**
+ * Once this message is actively being transmitted, which
+ * neighbour is it associated with?
+ */
+ struct NeighbourMapEntry *n;
+
+ /**
* Function to call once we're done.
*/
GST_NeighbourSendContinuation cont;
@@ -130,6 +134,11 @@
struct GNUNET_TRANSPORT_ATS_Information *ats;
/**
+ * Are we currently trying to send a message? If so, which one?
+ */
+ struct MessageQueue *is_active;
+
+ /**
* Active session for communicating with the peer.
*/
struct Session *session;
@@ -161,20 +170,12 @@
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
- * ID of task scheduled to run when we should retry transmitting
- * the head of the message queue. Actually triggered when the
- * transmission is timing out (we trigger instantly when we have
- * a chance of success).
+ * ID of task scheduled to run when we should try transmitting
+ * the head of the message queue.
*/
- GNUNET_SCHEDULER_TaskIdentifier retry_task;
+ GNUNET_SCHEDULER_TaskIdentifier transmission_task;
/**
- * How long until we should consider this peer dead (if we don't
- * receive another message in the meantime)?
- */
- struct GNUNET_TIME_Absolute peer_timeout;
-
- /**
* Tracker for inbound bandwidth.
*/
struct GNUNET_BANDWIDTH_Tracker in_tracker;
@@ -192,15 +193,9 @@
unsigned int ats_count;
/**
- * Have we seen an PONG from this neighbour in the past (and
- * not had a disconnect since)?
- */
- // int received_pong;
-
- /**
* Are we already in the process of disconnecting this neighbour?
*/
- // int in_disconnect;
+ int in_disconnect;
/**
* Do we currently consider this neighbour connected? (as far as
@@ -246,8 +241,50 @@
}
-#if 0
/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * We're done with our transmission attempt, continue processing.
+ *
+ * @param cls the 'struct MessageQueue' of the message
+ * @param receiver intended receiver
+ * @param success whether it worked or not
+ */
+static void
+transmit_send_continuation (void *cls,
+ const struct GNUNET_PeerIdentity *receiver,
+ int success)
+{
+ struct MessageQueue *mq;
+ struct NeighbourMapEntry *n;
+
+ mq = cls;
+ n = mq->n;
+ if (NULL != n)
+ {
+ GNUNET_assert (n->is_active == mq);
+ n->is_active = NULL;
+ GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
+ }
+ if (NULL != mq->cont)
+ mq->cont (mq->cont_cls,
+ success);
+ GNUNET_free (mq);
+}
+
+
+/**
* Check the ready list for the given neighbour and if a plugin is
* ready for transmission (and if we have a message), do so!
*
@@ -259,44 +296,76 @@
struct MessageQueue *mq;
struct GNUNET_TIME_Relative timeout;
ssize_t ret;
+ struct GNUNET_TRANSPORT_PluginFunctions *papi;
- if (n->messages_head == NULL)
+ if (n->is_active != NULL)
+ return; /* transmission already pending */
+ if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
+ return; /* currently waiting for bandwidth */
+ mq = n->messages_head;
+ while (NULL != (mq = n->messages_head))
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission queue for `%4s' is empty\n",
- GNUNET_i2s (&n->id));
-#endif
- return; /* nothing to do */
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.rel_value > 0)
+ break;
+ transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
}
- mq = n->messages_head;
+ if (NULL == mq)
+ return; /* no more messages */
+
+ papi = GST_plugins_find (n->plugin_name);
+ if (papi == NULL)
+ {
+ GNUNET_break (0);
+ return;
+ }
GNUNET_CONTAINER_DLL_remove (n->messages_head,
n->messages_tail,
mq);
+ n->is_active = mq;
+ mq->n = n;
ret = papi->send (papi->cls,
- &n->pid,
+ &n->id,
mq->message_buf,
mq->message_buf_size,
- mq->priority,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ 0 /* priority -- remove from plugin API? */,
+ timeout,
n->session,
n->addr,
n->addrlen,
- GNUNET_YES /*?*/,
+ GNUNET_YES,
&transmit_send_continuation, mq);
if (ret == -1)
{
/* failure, but 'send' would not call continuation in this case,
so we need to do it here! */
transmit_send_continuation (mq,
- &mq->neighbour_id,
+ &n->id,
GNUNET_SYSERR);
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
}
}
-#endif
/**
+ * Task invoked to start a transmission to another peer.
+ *
+ * @param cls the 'struct NeighbourMapEntry'
+ * @param tc scheduler context
+ */
+static void
+transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourMapEntry *n = cls;
+
+ n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+}
+
+
+/**
* Initialize the neighbours subsystem.
*
* @param cls closure for callbacks
@@ -325,23 +394,42 @@
{
struct MessageQueue *mq;
- if (n->is_connected)
+ if (GNUNET_YES == n->in_disconnect)
+ return;
+ n->in_disconnect = GNUNET_YES;
+ while (NULL != (mq = n->messages_head))
{
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ mq->cont (mq->cont_cls, GNUNET_SYSERR);
+ GNUNET_free (mq);
+ }
+ if (NULL != n->is_active)
+ {
+ n->is_active->n = NULL;
+ n->is_active = NULL;
+ }
+ if (GNUNET_YES == n->is_connected)
+ {
+ n->is_connected = GNUNET_NO;
disconnect_notify_cb (callback_cls,
&n->id);
- n->is_connected = GNUNET_NO;
}
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (neighbours,
&n->id.hashPubKey,
n));
- while (NULL != (mq = n->messages_head))
+ if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
{
- GNUNET_CONTAINER_DLL_remove (n->messages_head,
- n->messages_tail,
- mq);
- GNUNET_free (mq);
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
+ {
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
+ }
if (NULL != n->asc)
{
GNUNET_ATS_suggest_address_cancel (n->asc);
@@ -446,6 +534,7 @@
uint32_t ats_count)
{
struct NeighbourMapEntry *n;
+ struct GNUNET_MessageHeader connect_msg;
n = lookup_neighbour (peer);
if (NULL == n)
@@ -466,6 +555,17 @@
ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
GNUNET_free_non_null (n->plugin_name);
n->plugin_name = GNUNET_strdup (plugin_name);
+ GNUNET_SCHEDULER_cancel (n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+ connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ GST_neighbours_send (peer,
+ &connect_msg,
+ sizeof (connect_msg),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
}
@@ -564,7 +664,7 @@
n = lookup_neighbour (target);
if ( (NULL == n) ||
- (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+ (n->is_connected == GNUNET_YES) )
return GNUNET_NO; /* not connected */
return GNUNET_YES;
}
@@ -593,7 +693,7 @@
n = lookup_neighbour (target);
if ( (n == NULL) ||
- (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) )
+ (GNUNET_YES != n->is_connected) )
{
GNUNET_STATISTICS_update (GST_stats,
gettext_noop ("# SET QUOTA messages ignored (no
such peer)"),
@@ -620,7 +720,10 @@
GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
n->messages_tail,
mq);
- // try_transmission_to_peer (n);
+ if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
+ (NULL == n->is_active) )
+ n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
+ n);
}
@@ -667,9 +770,6 @@
n->quota_violation_count--;
}
}
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
GNUNET_SCHEDULER_cancel (n->timeout_task);
n->timeout_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -773,8 +873,8 @@
struct IteratorContext *ic = cls;
struct NeighbourMapEntry *n = value;
- if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0)
- return GNUNET_OK; /* not connected */
+ if (GNUNET_YES != n->is_connected)
+ return GNUNET_OK;
GNUNET_assert (n->ats_count > 0);
ic->cb (ic->cb_cls,
&n->id,
@@ -813,9 +913,25 @@
GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
{
struct NeighbourMapEntry *n;
+ struct GNUNET_TRANSPORT_PluginFunctions *papi;
+ struct GNUNET_MessageHeader disconnect_msg;
n = lookup_neighbour (target);
- /* FIXME: send disconnect message to target... */
+ disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+ disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ papi = GST_plugins_find (n->plugin_name);
+ if (papi != NULL)
+ papi->send (papi->cls,
+ target,
+ (const void*) &disconnect_msg,
+ sizeof (struct GNUNET_MessageHeader),
+ UINT32_MAX /* priority */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ n->session,
+ n->addr,
+ n->addrlen,
+ GNUNET_YES,
+ NULL, NULL);
disconnect_neighbour (n);
}
Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2011-08-12 15:00:47 UTC (rev
16472)
+++ gnunet/src/transport/plugin_transport_tcp.c 2011-08-12 15:49:57 UTC (rev
16473)
@@ -834,6 +834,9 @@
(session->transmit_handle);
session->transmit_handle = NULL;
}
+ session->plugin->env->session_end (session->plugin->env->cls,
+ &session->target,
+ session);
while (NULL != (pm = session->pending_messages_head))
{
#if DEBUG_TCP
@@ -878,9 +881,6 @@
-1,
GNUNET_NO);
GNUNET_free_non_null (session->connect_addr);
- session->plugin->env->session_end (session->plugin->env->cls,
- &session->target,
- session);
GNUNET_assert (NULL == session->transmit_handle);
GNUNET_free (session);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r16473 - gnunet/src/transport,
gnunet <=