gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: generate and handle TRANSPORT_FLOW_CONTROL messages (TNG)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: generate and handle TRANSPORT_FLOW_CONTROL messages (TNG)
Date: Sun, 02 Jun 2019 21:59:16 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 3d9f1acfb generate and handle TRANSPORT_FLOW_CONTROL messages (TNG)
3d9f1acfb is described below

commit 3d9f1acfb62cd53fc7d1441eb33e1341c9ff3790
Author: Christian Grothoff <address@hidden>
AuthorDate: Sun Jun 2 21:58:37 2019 +0200

    generate and handle TRANSPORT_FLOW_CONTROL messages (TNG)
---
 src/include/gnunet_protocols.h     |   5 +
 src/transport/gnunet-service-tng.c | 410 ++++++++++++++++++++++++-------------
 2 files changed, 272 insertions(+), 143 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index a00ddacca..d93e12bfb 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3149,6 +3149,11 @@ extern "C" {
  */
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING 1220
 
+/**
+ * Transport flow control message announcing a peer's current window sizes.
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL 1221
+
 
 /**
  * Message sent to indicate to the transport that a monitor
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 471ded644..7cc9f193c 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -28,10 +28,7 @@
  *   communicators do not offer flow control).
  *   We do transmit FC window sizes now.  Left:
  *   for SENDING)
- *   - Throttle sending if "outbound_fc_window_size_used" reaches limit
- *   - Send *new* challenge when we get close to the limit (including
- *     at the beginning when the limit is zero!)
- *   - Retransmit challenge if it goes unanswered!
+ *   - need to call consider_sending_fc() periodically if our FC goes unanswered!
  *
  *   for DV)
  *   - send challenges via DV (when DVH is confirmed *and* we care about
@@ -78,10 +75,6 @@
 *   and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
  * - Set last_window_consum_limit promise properly based on
 *   latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
- * - re-sending challenge response without a challenge when we have
- *   significantly increased the FC window (upon CORE being done with messages)
- *   so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
- *   Also can re-use signature in this case [CPU]. Marked with "TODO-M1"
  *
  * Design realizations / discussion:
  * - communicators do flow control by calling MQ "notify sent"
@@ -1325,6 +1318,13 @@ struct VirtualLink
    */
   struct GNUNET_TIME_Absolute n_challenge_time;
 
+  /**
+   * When did we last send a
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message?
+   * Used to determine whether it is time to re-transmit the message.
+   */
+  struct GNUNET_TIME_Absolute last_fc_transmission;
+
   /**
    * Sender timestamp of the last
    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
@@ -1388,8 +1388,7 @@ struct VirtualLink
 
   /**
    * Our current flow control window size in bytes.  We
-   * are allowed to transmit this many bytes to @a n as per
-   * our @e my_challenge "account".
+   * are allowed to transmit this many bytes to @a n.
    */
   uint64_t outbound_fc_window_size;
 
@@ -3966,138 +3965,6 @@ pick_random_dv_hops (const struct DistanceVector *dv,
 }
 
 
-/**
- * There is a message at the head of the pending messages for @a vl
- * which may be ready for transmission. Check if a queue is ready to
- * take it.
- *
- * This function must (1) check for flow control to ensure that we can
- * right now send to @a vl, (2) check that the pending message in the
- * queue is actually eligible, (3) determine if any applicable queue
- * (direct neighbour or DVH path) is ready to accept messages, and
- * (4) prioritize based on the preferences associated with the
- * pending message.
- *
- * So yeah, easy.
- *
- * @param vl virtual link where we should check for transmission
- */
-static void
-check_vl_transmission (struct VirtualLink *vl)
-{
-  struct Neighbour *n = vl->n;
-  struct DistanceVector *dv = vl->dv;
-  struct GNUNET_TIME_Absolute now;
-  int elig;
-
-  /* FIXME-FC: need to implement virtual link flow control! */
-
-  /* Check that we have an eligible pending message!
-     (cheaper than having #transmit_on_queue() find out!) */
-  elig = GNUNET_NO;
-  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
-       pm = pm->next_vl)
-  {
-    if (NULL != pm->qe)
-      continue; /* not eligible, is in a queue! */
-    elig = GNUNET_YES;
-    break;
-  }
-  if (GNUNET_NO == elig)
-    return;
-
-  /* Notify queues at direct neighbours that we are interested */
-  now = GNUNET_TIME_absolute_get ();
-  if (NULL != n)
-  {
-    for (struct Queue *queue = n->queue_head; NULL != queue;
-         queue = queue->next_neighbour)
-      if ((GNUNET_YES == queue->idle) &&
-          (queue->validated_until.abs_value_us > now.abs_value_us))
-        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
-  }
-  /* Notify queues via DV that we are interested */
-  if (NULL != dv)
-  {
-    /* Do DV with lower scheduler priority, which effectively means that
-       IF a neighbour exists and is available, we prefer it. */
-    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-         pos = pos->next_dv)
-    {
-      struct Neighbour *nh = pos->next_hop;
-
-      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
-        continue; /* skip this one: path not validated */
-      for (struct Queue *queue = nh->queue_head; NULL != queue;
-           queue = queue->next_neighbour)
-        if ((GNUNET_YES == queue->idle) &&
-            (queue->validated_until.abs_value_us > now.abs_value_us))
-          schedule_transmit_on_queue (queue,
-                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
-    }
-  }
-}
-
-
-/**
- * Client asked for transmission to a peer.  Process the request.
- *
- * @param cls the client
- * @param obm the send message that was sent
- */
-static void
-handle_client_send (void *cls, const struct OutboundMessage *obm)
-{
-  struct TransportClient *tc = cls;
-  struct PendingMessage *pm;
-  const struct GNUNET_MessageHeader *obmm;
-  uint32_t bytes_msg;
-  struct VirtualLink *vl;
-  enum GNUNET_MQ_PriorityPreferences pp;
-
-  GNUNET_assert (CT_CORE == tc->type);
-  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
-  bytes_msg = ntohs (obmm->size);
-  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
-  vl = lookup_virtual_link (&obm->peer);
-  if (NULL == vl)
-  {
-    /* Failure: don't have this peer as a neighbour (anymore).
-       Might have gone down asynchronously, so this is NOT
-       a protocol violation by CORE. Still count the event,
-       as this should be rare. */
-    GNUNET_SERVICE_client_continue (tc->client);
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# messages dropped (neighbour unknown)",
-                              1,
-                              GNUNET_NO);
-    return;
-  }
-
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
-  pm->logging_uuid = logging_uuid_gen++;
-  pm->prefs = pp;
-  pm->client = tc;
-  pm->vl = vl;
-  pm->bytes_msg = bytes_msg;
-  memcpy (&pm[1], obmm, bytes_msg);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending %u bytes as <%llu> to %s\n",
-              bytes_msg,
-              pm->logging_uuid,
-              GNUNET_i2s (&obm->peer));
-  GNUNET_CONTAINER_MDLL_insert (client,
-                                tc->details.core.pending_msg_head,
-                                tc->details.core.pending_msg_tail,
-                                pm);
-  GNUNET_CONTAINER_MDLL_insert (vl,
-                                vl->pending_msg_head,
-                                vl->pending_msg_tail,
-                                pm);
-  check_vl_transmission (vl);
-}
-
-
 /**
  * Communicator started.  Test message is well-formed.
  *
@@ -4853,6 +4720,187 @@ route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
 }
 
 
+/**
+ * Something changed on the virtual link with respect to flow
+ * control. Consider retransmitting the FC window size.
+ *
+ * @param vl virtual link to work with
+ */
+static void
+consider_sending_fc (struct VirtualLink *vl)
+{
+  struct GNUNET_TIME_Absolute monotime;
+  struct TransportFlowControlMessage fc;
+  struct GNUNET_TIME_Relative duration;
+
+  duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission);
+  /* FIXME: decide sane criteria on when to do this, instead of doing
+     it always! */
+  /* For example, we should probably ONLY do this if a bit more than
+     an RTT has passed, or if the window changed "significantly" since
+     then. */
+  (void) duration;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending FC seq %u to %s with new window %llu\n",
+              (unsigned int) vl->fc_seq_gen,
+              GNUNET_i2s (&vl->target),
+              (unsigned long long) vl->incoming_fc_window_size);
+  monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+  vl->last_fc_transmission = monotime;
+  fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL);
+  fc.header.size = htons (sizeof (fc));
+  fc.seq = htonl (vl->fc_seq_gen++);
+  fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size);
+  fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
+  fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
+  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
+  route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+}
+
+
+/**
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+  struct Neighbour *n = vl->n;
+  struct DistanceVector *dv = vl->dv;
+  struct GNUNET_TIME_Absolute now;
+  int elig;
+
+  /* Check that we have an eligible pending message!
+     (cheaper than having #transmit_on_queue() find out!) */
+  elig = GNUNET_NO;
+  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+       pm = pm->next_vl)
+  {
+    if (NULL != pm->qe)
+      continue; /* not eligible, is in a queue! */
+    if (pm->bytes_msg + vl->outbound_fc_window_size_used >
+        vl->outbound_fc_window_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Stalled transmision on VL %s due to flow control: %llu < 
%llu\n",
+                  GNUNET_i2s (&vl->target),
+                  (unsigned long long) vl->outbound_fc_window_size,
+                  (unsigned long long) (pm->bytes_msg +
+                                        vl->outbound_fc_window_size_used));
+      consider_sending_fc (vl);
+      return; /* We have a message, but flow control says "nope" */
+    }
+    elig = GNUNET_YES;
+    break;
+  }
+  if (GNUNET_NO == elig)
+    return;
+
+  /* Notify queues at direct neighbours that we are interested */
+  now = GNUNET_TIME_absolute_get ();
+  if (NULL != n)
+  {
+    for (struct Queue *queue = n->queue_head; NULL != queue;
+         queue = queue->next_neighbour)
+      if ((GNUNET_YES == queue->idle) &&
+          (queue->validated_until.abs_value_us > now.abs_value_us))
+        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  }
+  /* Notify queues via DV that we are interested */
+  if (NULL != dv)
+  {
+    /* Do DV with lower scheduler priority, which effectively means that
+       IF a neighbour exists and is available, we prefer it. */
+    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+         pos = pos->next_dv)
+    {
+      struct Neighbour *nh = pos->next_hop;
+
+      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+        continue; /* skip this one: path not validated */
+      for (struct Queue *queue = nh->queue_head; NULL != queue;
+           queue = queue->next_neighbour)
+        if ((GNUNET_YES == queue->idle) &&
+            (queue->validated_until.abs_value_us > now.abs_value_us))
+          schedule_transmit_on_queue (queue,
+                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+    }
+  }
+}
+
+
+/**
+ * Client asked for transmission to a peer.  Process the request.
+ *
+ * @param cls the client
+ * @param obm the send message that was sent
+ */
+static void
+handle_client_send (void *cls, const struct OutboundMessage *obm)
+{
+  struct TransportClient *tc = cls;
+  struct PendingMessage *pm;
+  const struct GNUNET_MessageHeader *obmm;
+  uint32_t bytes_msg;
+  struct VirtualLink *vl;
+  enum GNUNET_MQ_PriorityPreferences pp;
+
+  GNUNET_assert (CT_CORE == tc->type);
+  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+  bytes_msg = ntohs (obmm->size);
+  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
+  vl = lookup_virtual_link (&obm->peer);
+  if (NULL == vl)
+  {
+    /* Failure: don't have this peer as a neighbour (anymore).
+       Might have gone down asynchronously, so this is NOT
+       a protocol violation by CORE. Still count the event,
+       as this should be rare. */
+    GNUNET_SERVICE_client_continue (tc->client);
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# messages dropped (neighbour unknown)",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
+  pm->logging_uuid = logging_uuid_gen++;
+  pm->prefs = pp;
+  pm->client = tc;
+  pm->vl = vl;
+  pm->bytes_msg = bytes_msg;
+  memcpy (&pm[1], obmm, bytes_msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending %u bytes as <%llu> to %s\n",
+              bytes_msg,
+              pm->logging_uuid,
+              GNUNET_i2s (&obm->peer));
+  GNUNET_CONTAINER_MDLL_insert (client,
+                                tc->details.core.pending_msg_head,
+                                tc->details.core.pending_msg_tail,
+                                pm);
+  GNUNET_CONTAINER_MDLL_insert (vl,
+                                vl->pending_msg_head,
+                                vl->pending_msg_tail,
+                                pm);
+  check_vl_transmission (vl);
+}
+
+
 /**
  * Communicator requests backchannel transmission.  Process the request.
  * Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
@@ -5113,7 +5161,7 @@ core_env_sent_cb (void *cls)
   GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
   vl->incoming_fc_window_size_ram -= ctx->size;
   vl->incoming_fc_window_size_used += ctx->isize;
-  /* TODO-M1 */
+  consider_sending_fc (vl);
   GNUNET_free (ctx);
 }
 
@@ -6046,6 +6094,7 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
                   &vl->target,
                   vl,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  consider_sending_fc (vl);
   /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
   cores_send_connect_info (&dv->target);
@@ -8031,6 +8080,7 @@ handle_validation_response (
                   &vl->target,
                   vl,
                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  consider_sending_fc (vl);
   /* We lacked a confirmed connection to the target
      before, so tell CORE about it (finally!) */
   cores_send_connect_info (&n->pid);
@@ -8059,6 +8109,76 @@ handle_incoming_msg (void *cls,
 }
 
 
+/**
+ * Communicator gave us a transport flow control message.  Process the
+ * message.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param fc the message that was received
+ */
+static void
+handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  struct VirtualLink *vl;
+  uint32_t seq;
+  struct GNUNET_TIME_Absolute st;
+  uint64_t os;
+  uint64_t wnd;
+
+  vl = lookup_virtual_link (&cmc->im.sender);
+  if (NULL == vl)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# FC dropped: virtual link unknown",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+  st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
+  if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
+  {
+    /* out of order, drop */
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# FC dropped: message out of order",
+                              1,
+                              GNUNET_NO);
+    finish_cmc_handling (cmc);
+    return;
+  }
+  seq = ntohl (fc->seq);
+  if (seq < vl->last_fc_seq)
+  {
+    /* Wrap-around/reset of other peer; start all counters from zero */
+    vl->outbound_fc_window_size_used = 0;
+  }
+  vl->last_fc_seq = seq;
+  vl->last_fc_timestamp = st;
+  vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size);
+  os = GNUNET_ntohll (fc->outbound_sent);
+  vl->incoming_fc_window_size_loss =
+    (int64_t) (os - vl->incoming_fc_window_size_used);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received FC from %s, seq %u, new window %llu (loss at %lld)\n",
+              GNUNET_i2s (&vl->target),
+              (unsigned int) seq,
+              (unsigned long long) vl->outbound_fc_window_size,
+              (long long) vl->incoming_fc_window_size_loss);
+  wnd = GNUNET_ntohll (fc->outbound_window_size);
+  if (wnd < vl->incoming_fc_window_size)
+  {
+    /* Consider re-sending our FC message, as clearly the
+       other peer's idea of the window is not up-to-date */
+    consider_sending_fc (vl);
+  }
+  /* FC window likely increased, check transmission possibilities! */
+  check_vl_transmission (vl);
+  finish_cmc_handling (cmc);
+}
+
+
 /**
  * Given an inbound message @a msg from a communicator @a cmc,
  * demultiplex it based on the type calling the right handler.
@@ -8100,6 +8220,10 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
        struct TransportValidationChallengeMessage,
        &cmc),
+     GNUNET_MQ_hd_fixed_size (flow_control,
+                              GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL,
+                              struct TransportFlowControlMessage,
+                              &cmc),
      GNUNET_MQ_hd_fixed_size (
        validation_response,
        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]