[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r16419 - gnunet/src/transport
From: |
gnunet |
Subject: |
[GNUnet-SVN] r16419 - gnunet/src/transport |
Date: |
Sun, 7 Aug 2011 08:04:52 +0200 |
Author: grothoff
Date: 2011-08-07 08:04:52 +0200 (Sun, 07 Aug 2011)
New Revision: 16419
Modified:
gnunet/src/transport/gnunet-service-transport_neighbours.c
Log:
towards neighbour management
Modified: gnunet/src/transport/gnunet-service-transport_neighbours.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport_neighbours.c 2011-08-06
20:43:50 UTC (rev 16418)
+++ gnunet/src/transport/gnunet-service-transport_neighbours.c 2011-08-07
06:04:52 UTC (rev 16419)
@@ -26,7 +26,15 @@
#include "platform.h"
#include "gnunet-service-transport_neighbours.h"
#include "gnunet-service-transport.h"
+#include "gnunet_constants.h"
+
+/**
+ * Size of the neighbour hash map.
+ */
+#define NEIGHBOUR_TABLE_SIZE 256
+
+
// TODO:
// - have a way to access the currently 'connected' session
// (for sending and to notice disconnect of it!)
@@ -34,8 +42,500 @@
// (for CostReport/TrafficReport callbacks)
+struct NeighbourMapEntry;
/**
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
+ */
+struct MessageQueue
+{
+
+ /**
+ * Next entry in the doubly linked list.
+ */
+ struct MessageQueue *next;
+
+ /**
+ * Previous entry in the doubly linked list.
+ */
+ struct MessageQueue *prev;
+
+ /**
+ * The message(s) we want to transmit, GNUNET_MessageHeader(s)
+ * stuck together in memory. Allocated at the end of this struct.
+ */
+ const char *message_buf;
+
+ /**
+ * Size of 'message_buf' in bytes.
+ */
+ size_t message_buf_size;
+
+ /**
+ * Client responsible for queueing the message; used to check that a
+ * client has no two messages pending for the same target and to
+ * notify the client of a successful transmission; NULL if this is
+ * an internal message.
+ */
+ struct TransportClient *client;
+
+ /**
+ * At what time should we fail?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Internal message of the transport system that should not be
+ * included in the usual SEND-SEND_OK transmission confirmation
+ * traffic management scheme. Typically, "internal_msg" will
+ * be set whenever "client" is NULL (but it is not strictly
+ * required).
+ */
+ int internal_msg;
+
+ /**
+ * How important is the message?
+ */
+ unsigned int priority;
+
+};
+
+
+
+/**
+ * Entry in the 'neighbours' map (state we keep per neighbour).
+ */
+struct NeighbourMapEntry
+{
+
+ /**
+ * Head of list of messages we would like to send to this peer;
+ * must contain at most one message per client.
+ */
+ struct MessageQueue *messages_head;
+
+ /**
+ * Tail of list of messages we would like to send to this peer; must
+ * contain at most one message per client.
+ */
+ struct MessageQueue *messages_tail;
+
+ /**
+ * Context for peerinfo iteration.
+ * NULL after we are done processing peerinfo's information.
+ */
+ struct GNUNET_PEERINFO_IteratorContext *piter;
+
+ /**
+ * Performance data for the peer.
+ */
+ struct GNUNET_TRANSPORT_ATS_Information *ats;
+
+ /**
+ * Public key for this peer. Valid only if 'public_key_valid' (below) is set.
+ */
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+ /**
+ * Identity of this neighbour.
+ */
+ struct GNUNET_PeerIdentity id;
+
+ /**
+ * ID of task scheduled to run when this peer is about to
+ * time out (will free resources associated with the peer).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * ID of task scheduled to run when we should retry transmitting
+ * the head of the message queue. Actually triggered when the
+ * transmission is timing out (we trigger instantly when we have
+ * a chance of success).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
+ /**
+ * How long until we should consider this peer dead (if we don't
+ * receive another message in the meantime)?
+ */
+ struct GNUNET_TIME_Absolute peer_timeout;
+
+ /**
+ * Tracker for inbound bandwidth.
+ */
+ struct GNUNET_BANDWIDTH_Tracker in_tracker;
+
+ /**
+ * The latency we have seen for this particular address for
+ * this particular peer. This latency may have been calculated
+ * over multiple transports. This value reflects how long it took
+ * us to receive a response when SENDING via this particular
+ * transport/neighbour/address combination!
+ *
+ * FIXME: we need to periodically send PINGs to update this
+ * latency (at least more often than the current "huge" (11h?)
+ * update interval).
+ */
+ struct GNUNET_TIME_Relative latency;
+
+ /**
+ * How often has the other peer (recently) violated the inbound
+ * traffic limit? Incremented by 10 per violation, decremented by 1
+ * per non-violation (for each time interval).
+ */
+ unsigned int quota_violation_count;
+
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ */
+ uint32_t distance;
+
+ /**
+ * Have we seen a PONG from this neighbour in the past (and
+ * not had a disconnect since)?
+ */
+ int received_pong;
+
+ /**
+ * Do we have a valid public key for this neighbour?
+ */
+ int public_key_valid;
+
+ /**
+ * Are we already in the process of disconnecting this neighbour?
+ */
+ int in_disconnect;
+
+};
+
+
+/**
+ * Map of all known neighbours; values are 'struct NeighbourMapEntry'.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
+
+/**
+ * Closure for connect_notify_cb and disconnect_notify_cb.
+ */
+static void *callback_cls;
+
+/**
+ * Function to call when we connected to a neighbour.
+ */
+static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
+
+/**
+ * Function to call when we disconnected from a neighbour.
+ */
+static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
+
+
+#if 0
+/**
+ * Try to transmit the message at the head of the neighbour's queue via a
+ * plugin; without an address, drop it (on timeout) or schedule a retry.
+ *
+ * @param n neighbour (target peer) for which to transmit
+ */
+static void
+try_transmission_to_peer (struct NeighbourMapEntry *n)
+{
+ struct ReadyList *rl;
+ struct MessageQueue *mq;
+ struct GNUNET_TIME_Relative timeout;
+ ssize_t ret;
+ int force_address;
+
+ if (n->messages_head == NULL)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission queue for `%4s' is empty\n",
+ GNUNET_i2s (&n->id));
+#endif
+ return; /* nothing to do */
+ }
+ rl = NULL;
+ mq = n->messages_head;
+ force_address = GNUNET_YES;
+ if (mq->specific_address == NULL)
+ {
+ /* TODO: ADD ATS */
+ mq->specific_address = get_preferred_ats_address(n);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# transport selected peer
address freely"),
+ 1,
+ GNUNET_NO);
+ force_address = GNUNET_NO;
+ }
+ if (mq->specific_address == NULL)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# transport failed to selected
peer address"),
+ 1,
+ GNUNET_NO);
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.rel_value == 0)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No destination address available to transmit message of
size %u to peer `%4s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for
other peers"),
+ - (int64_t) mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (no
destination address available)"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ if (mq->client != NULL)
+ transmit_send_ok (mq->client, n, &n->id, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ GNUNET_free (mq);
+ return; /* nobody ready */
+ }
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# message delivery deferred (no
address)"),
+ 1,
+ GNUNET_NO);
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &retry_transmission_task,
+ n);
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No validated destination address available to transmit
message of size %u to peer `%4s', will wait %llums to find an address.\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id),
+ timeout.rel_value);
+#endif
+ /* FIXME: might want to trigger peerinfo lookup here
+ (unless that's already pending...) */
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ if (mq->specific_address->connected == GNUNET_NO)
+ mq->specific_address->connect_attempts++;
+ rl = mq->specific_address->ready_list;
+ mq->plugin = rl->plugin;
+ if (!mq->internal_msg)
+ mq->specific_address->in_transmit = GNUNET_YES;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&n->id),
+ (mq->specific_address->addr != NULL)
+ ? a2s (mq->plugin->short_name,
+ mq->specific_address->addr,
+ mq->specific_address->addrlen)
+ : "<inbound>",
+ rl->plugin->short_name);
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other
peers"),
+ - (int64_t) mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes pending with plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
+
+ GNUNET_CONTAINER_DLL_insert (n->cont_head,
+ n->cont_tail,
+ mq);
+
+ ret = rl->plugin->api->send (rl->plugin->api->cls,
+ &mq->neighbour_id,
+ mq->message_buf,
+ mq->message_buf_size,
+ mq->priority,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ mq->specific_address->session,
+ mq->specific_address->addr,
+ mq->specific_address->addrlen,
+ force_address,
+ &transmit_send_continuation, mq);
+ if (ret == -1)
+ {
+ /* failure, but 'send' would not call continuation in this case,
+ so we need to do it here! */
+ transmit_send_continuation (mq,
+ &mq->neighbour_id,
+ GNUNET_SYSERR);
+ }
+}
+
+
+/**
+ * Queue the specified message for the given neighbour and try to send it.
+ *
+ * @param client source of the transmission request (can be NULL)
+ * @param peer_address ForeignAddressList to send to; NULL to have one selected
+ * @param priority how important is the message
+ * @param timeout how long do we have to transmit?
+ * @param message_buf message(s) to send GNUNET_MessageHeader(s)
+ * @param message_buf_size total size of all messages in message_buf
+ * @param is_internal is this an internal message; these are pre-pended and
+ * also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
+ */
+static void
+transmit_to_peer (struct TransportClient *client,
+ struct ForeignAddressList *peer_address,
+ unsigned int priority,
+ struct GNUNET_TIME_Relative timeout,
+ const char *message_buf,
+ size_t message_buf_size,
+ int is_internal, struct NeighbourMapEntry *neighbour)
+{
+ struct MessageQueue *mq;
+
+#if EXTRA_CHECKS
+ if (client != NULL)
+ {
+ /* check for duplicate submission */
+ mq = neighbour->messages_head;
+ while (NULL != mq)
+ {
+ if (mq->client == client)
+ {
+ /* client transmitted to same peer twice
+ before getting SEND_OK! */
+ GNUNET_break (0);
+ return;
+ }
+ mq = mq->next;
+ }
+ }
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other
peers"),
+ message_buf_size,
+ GNUNET_NO);
+ mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
+ mq->specific_address = peer_address;
+ mq->client = client;
+ /* FIXME: this memcpy can be up to 7% of our total runtime! */
+ memcpy (&mq[1], message_buf, message_buf_size);
+ mq->message_buf = (const char*) &mq[1];
+ mq->message_buf_size = message_buf_size;
+ memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct
GNUNET_PeerIdentity));
+ mq->internal_msg = is_internal;
+ mq->priority = priority;
+ mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ if (is_internal)
+ GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ else
+ GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+ neighbour->messages_tail,
+ neighbour->messages_tail,
+ mq);
+ try_transmission_to_peer (neighbour);
+}
+
+
+/**
+ * Create a fresh entry in the 'neighbours' map for the given peer.
+ * If 'do_hello' is set, also transmits our current HELLO (if we have one).
+ * Do not call this function directly, use 'setup_peer_check_blacklist'.
+ *
+ * @param peer the peer for which we create the entry
+ * @param do_hello should we schedule transmitting a HELLO
+ * @return the new neighbour list entry
+ */
+static struct NeighbourMapEntry *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer,
+ int do_hello)
+{
+ struct NeighbourMapEntry *n;
+ struct TransportPlugin *tp;
+ struct ReadyList *rl;
+
+ GNUNET_assert (0 != memcmp (peer,
+ &my_identity,
+ sizeof (struct GNUNET_PeerIdentity)));
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Setting up state for neighbour `%4s'\n",
+ GNUNET_i2s (peer));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# active neighbours"),
+ 1,
+ GNUNET_NO);
+ n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
+ n->id = *peer;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+ GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+ MAX_BANDWIDTH_CARRY_S);
+ tp = plugins;
+ while (tp != NULL)
+ {
+ if ((tp->api->send != NULL) && (!is_blacklisted(peer, tp)))
+ {
+ rl = GNUNET_malloc (sizeof (struct ReadyList));
+ rl->neighbour = n;
+ rl->next = n->plugins;
+ n->plugins = rl;
+ rl->plugin = tp;
+ rl->addresses = NULL;
+ }
+ tp = tp->next;
+ }
+ n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ n->distance = -1;
+ n->timeout_task = GNUNET_SCHEDULER_add_delayed
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ GNUNET_CONTAINER_multihashmap_put (neighbours,
+ &n->id.hashPubKey,
+ n,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ if (do_hello)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peerinfo new neighbor iterate
requests"),
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# outstanding peerinfo iterate
requests"),
+ 1,
+ GNUNET_NO);
+ n->piter = GNUNET_PEERINFO_iterate (peerinfo, peer,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &add_hello_for_peer, n);
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLO's sent to new
neighbors"),
+ 1,
+ GNUNET_NO);
+ if (NULL != our_hello)
+ transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
+ (const char *) our_hello,
GNUNET_HELLO_size(our_hello),
+ GNUNET_NO, n);
+ }
+ return n;
+}
+#endif
+
+
+/**
* Initialize the neighbours subsystem.
*
* @param cls closure for callbacks
@@ -47,15 +547,54 @@
GNUNET_TRANSPORT_NotifyConnect connect_cb,
GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
{
+ callback_cls = cls;
+ connect_notify_cb = connect_cb;
+ disconnect_notify_cb = disconnect_cb;
+ neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
}
/**
+ * Iterator over 'neighbours' meant to disconnect the given neighbour
+ * (the disconnect call is still a FIXME); always returns GNUNET_OK.
+ * @param cls unused
+ * @param key hash of neighbour's public key (not used)
+ * @param value the 'struct NeighbourMapEntry' of the neighbour
+ */
+static int
+disconnect_all_neighbours (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct NeighbourMapEntry *n = value;
+
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Disconnecting peer `%4s', %s\n",
+ GNUNET_i2s(&n->id),
+ "SHUTDOWN_TASK");
+#endif
+ // FIXME:
+ // disconnect_neighbour (n);
+ n++; /* NOTE(review): no lasting effect; presumably silences an unused-variable warning -- confirm */
+ return GNUNET_OK;
+}
+
+
+/**
* Stop the neighbours subsystem: destroy the map and clear the callbacks.
*/
void
GST_neighbours_stop ()
{
+ GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+ &disconnect_all_neighbours,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+ neighbours = NULL;
+ callback_cls = NULL;
+ connect_notify_cb = NULL;
+ disconnect_notify_cb = NULL;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r16419 - gnunet/src/transport,
gnunet <=