gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: massive refactoring to intr


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: massive refactoring to intruce data structure
Date: Mon, 22 Apr 2019 16:51:46 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new f1834a5f1 massive refactoring to intruce  data structure
f1834a5f1 is described below

commit f1834a5f1c8c2b06f5c140c4aaefe27e474d16d6
Author: Christian Grothoff <address@hidden>
AuthorDate: Mon Apr 22 16:51:36 2019 +0200

    massive refactoring to intruce  data structure
---
 src/include/gnunet_protocols.h     |    7 +-
 src/transport/gnunet-service-tng.c | 1162 ++++++++++++++++++++++--------------
 2 files changed, 730 insertions(+), 439 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index e402460c0..27a7034b0 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2121,7 +2121,7 @@ extern "C" {
 
 /** M<->S<->C: PSYC message which contains a header and one or more message
  * parts. */
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER                                \
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER \
   692 // FIXME: start using this where appropriate
 
 /** Message part: method */
@@ -3107,11 +3107,6 @@ extern "C" {
  */
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT 1214
 
-/**
- * Acknowledgement generated for a fragment.
- */
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK 1215
-
 /**
  * Wrapper around non-fragmented CORE message used to measure RTT
  * and ensure reliability.
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index a35357d9b..5c51ed59a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -117,6 +117,11 @@
 #include "gnunet_signatures.h"
 #include "transport.h"
 
+/**
+ * Maximum number of messages we acknowledge together in one
+ * cummulative ACK.  Larger values may save a bit of bandwidth.
+ */
+#define MAX_CUMMULATIVE_ACKS 64
 
 /**
  * What is the size we assume for a read operation in the
@@ -211,6 +216,14 @@
 #define MAX_VALIDATION_CHALLENGE_FREQ \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
 
+/**
+ * How long until we forget about historic accumulators and thus
+ * reset the ACK counter? Should exceed the maximum time an
+ * active connection experiences without an ACK.
+ */
+#define ACK_CUMMULATOR_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
 /**
  * What is the non-randomized base frequency at which we
  * would initiate DV learn messages?
@@ -278,13 +291,19 @@ struct MessageUUIDP
    * Unique value, generated by incrementing the
    * `message_uuid_ctr` of `struct Neighbour`.
    */
-  uint32_t uuid GNUNET_PACKED;
+  uint64_t uuid GNUNET_PACKED;
+};
 
+
+/**
+ * Unique identifier to map an acknowledgement to a transmission.
+ */
+struct AcknowledgementUUIDP
+{
   /**
-   * UUID of the queue that was used to transmit this message.
-   * Used to map acknowledgements back to the respective queue.
+   * The UUID value.  Not actually a hash, but a random value.
    */
-  uint32_t queue_uuid GNUNET_PACKED;
+  struct GNUNET_ShortHashCode value;
 };
 
 
@@ -474,7 +493,27 @@ struct TransportReliabilityBoxMessage
    * messages sent over possibly unreliable channels.  Should
    * be a random.
    */
-  struct MessageUUIDP msg_uuid;
+  struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayloadP
+{
+  /**
+   * How long was the ACK delayed for generating cummulative ACKs?
+   * Used to calculate the correct network RTT by taking the receipt
+   * time of the ack minus the transmission time of the sender minus
+   * this value.
+   */
+  struct GNUNET_TIME_RelativeNBO ack_delay;
+
+  /**
+   * UUID of a message being acknowledged.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
 };
 
 
@@ -493,19 +532,12 @@ struct TransportReliabilityAckMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Reserved. Zero.
+   * Counter of ACKs transmitted by the sender to us. Incremented
+   * by one for each ACK, used to detect how many ACKs were lost.
    */
-  uint32_t reserved GNUNET_PACKED;
+  uint32_t ack_counter GNUNET_PACKED;
 
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the messages being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
-   */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
-  /* followed by any number of `struct MessageUUIDP`
+  /* followed by any number of `struct TransportCummulativeAckPayloadP`
      messages providing ACKs */
 };
 
@@ -523,16 +555,15 @@ struct TransportFragmentBoxMessage
   /**
    * Unique ID of this fragment (and fragment transmission!). Will
    * change even if a fragement is retransmitted to make each
-   * transmission attempt unique! Should be incremented by one for
-   * each fragment transmission. If a client receives a duplicate
-   * fragment (same @e frag_off), it must send
-   * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
+   * transmission attempt unique! If a client receives a duplicate
+   * fragment (same @e frag_off for same @a msg_uuid), it must send
+   * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
    */
-  struct FragmentUUIDP frag_uuid;
+  struct AcknowledgementUUIDP ack_uuid;
 
   /**
-   * Original message ID for of the message that all the
-   * fragments belong to.  Must be the same for all fragments.
+   * Original message ID for of the message that all the fragments
+   * belong to.  Must be the same for all fragments.
    */
   struct MessageUUIDP msg_uuid;
 
@@ -548,54 +579,6 @@ struct TransportFragmentBoxMessage
 };
 
 
-/**
- * Outer layer of an fragmented application message sent over a queue
- * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
- * received, the receiver has two RTTs or 64 further fragments with
- * the same basic message time to send an acknowledgement, possibly
- * acknowledging up to 65 fragments in one ACK.  ACKs must also be
- * sent immediately once all fragments were sent.
- */
-struct TransportFragmentAckMessage
-{
-  /**
-   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Unique ID of the lowest fragment UUID being acknowledged.
-   */
-  struct FragmentUUIDP frag_uuid;
-
-  /**
-   * Bitfield of up to 64 additional fragments following the
-   * @e msg_uuid being acknowledged by this message.
-   */
-  uint64_t extra_acks GNUNET_PACKED;
-
-  /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
-   */
-  struct MessageUUIDP msg_uuid;
-
-  /**
-   * How long was the ACK delayed relative to the average time of
-   * receipt of the fragments being acknowledged?  Used to calculate
-   * the average RTT by taking the receipt time of the ack minus the
-   * average transmission time of the sender minus this value.
-   */
-  struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
-  /**
-   * How long until the receiver will stop trying reassembly
-   * of this message?
-   */
-  struct GNUNET_TIME_RelativeNBO reassembly_timeout;
-};
-
-
 /**
  * Content signed by the initator during DV learning.
  *
@@ -1025,19 +1008,129 @@ struct EphemeralCacheEntry
  */
 struct TransportClient;
 
-
 /**
  * A neighbour that at least one communicator is connected to.
  */
 struct Neighbour;
 
-
 /**
  * Entry in our #dv_routes table, representing a (set of) distance
  * vector routes to a particular peer.
  */
 struct DistanceVector;
 
+/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
+
+/**
+ * One possible hop towards a DV target.
+ */
+struct DistanceVectorHop;
+
+
+/**
+ * Data structure kept when we are waiting for an acknowledgement.
+ */
+struct PendingAcknowledgement
+{
+
+  /**
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
+   */
+  struct PendingAcknowledgement *next_pm;
+
+  /**
+   * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to its pending message.
+   */
+  struct PendingAcknowledgement *prev_pm;
+
+  /**
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *next_queue;
+
+  /**
+   * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the queue that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *prev_queue;
+
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *next_dvh;
+
+  /**
+   * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+   * is kept in relation to the DVH that was used to transmit the
+   * @a pm.
+   */
+  struct PendingAcknowledgement *prev_dvh;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *next_pa;
+
+  /**
+   * Pointers for the DLL of all pending acknowledgements.
+   * This list is sorted by @e transmission time.  If the list gets too
+   * long, the oldest entries are discarded.
+   */
+  struct PendingAcknowledgement *prev_pa;
+
+  /**
+   * Unique identifier for this transmission operation.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
+
+  /**
+   * Message that was transmitted, may be NULL if the message was ACKed
+   * via another channel.
+   */
+  struct PendingMessage *pm;
+
+  /**
+   * Distance vector path chosen for this transmission, NULL if transmission
+   * was to a direct neighbour OR if the path was forgotten in the meantime.
+   */
+  struct DistanceVectorHop *dvh;
+
+  /**
+   * Queue used for transmission, NULL if the queue has been destroyed
+   * (which may happen before we get an acknowledgement).
+   */
+  struct Queue *queue;
+
+  /**
+   * Time of the transmission, for RTT calculation.
+   */
+  struct GNUNET_TIME_Absolute transmission_time;
+
+  /**
+   * Number of bytes of the original message (to calculate bandwidth).
+   */
+  uint16_t message_size;
+};
+
+
 /**
  * One possible hop towards a DV target.
  */
@@ -1064,6 +1157,16 @@ struct DistanceVectorHop
    */
   struct DistanceVectorHop *prev_neighbour;
 
+  /**
+   * Head of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used our @a path.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * What would be the next hop to @e target?
    */
@@ -1161,17 +1264,6 @@ struct DistanceVector
 };
 
 
-/**
- * A queue is a message queue provided by a communicator
- * via which we can reach a particular neighbour.
- */
-struct Queue;
-
-/**
- * Message awaiting transmission. See detailed comments below.
- */
-struct PendingMessage;
-
 /**
  * Entry identifying transmission in one of our `struct
  * Queue` which still awaits an ACK.  This is used to
@@ -1237,6 +1329,16 @@ struct Queue
    */
   struct Queue *next_client;
 
+  /**
+   * Head of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs that used this queue.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * Head of DLL of unacked transmission requests.
    */
@@ -1299,14 +1401,6 @@ struct Queue
    */
   uint32_t qid;
 
-  /**
-   * UUID used to map acknowledgements back to the queue that
-   * was used for transmission. Note that @e queue_uuid-s are
-   * only unique per neighbour (generated via `queue_uuid_gen`
-   * of `struct Neighbour`).
-   */
-  uint32_t queue_uuid;
-
   /**
    * Maximum transmission unit supported by this queue.
    */
@@ -1356,8 +1450,8 @@ struct ReassemblyContext
 {
 
   /**
-   * Original message ID for of the message that all the
-   * fragments belong to.
+   * Original message ID for of the message that all the fragments
+   * belong to.
    */
   struct MessageUUIDP msg_uuid;
 
@@ -1392,36 +1486,12 @@ struct ReassemblyContext
    */
   struct GNUNET_TIME_Absolute reassembly_timeout;
 
-  /**
-   * Average delay of all acks in @e extra_acks and @e frag_uuid.
-   * Should be reset to zero when @e num_acks is set to 0.
-   */
-  struct GNUNET_TIME_Relative avg_ack_delay;
-
   /**
    * Time we received the last fragment.  @e avg_ack_delay must be
    * incremented by now - @e last_frag multiplied by @e num_acks.
    */
   struct GNUNET_TIME_Absolute last_frag;
 
-  /**
-   * Bitfield of up to 64 additional fragments following @e frag_uuid
-   * to be acknowledged in the next cummulative ACK.
-   */
-  uint64_t extra_acks;
-
-  /**
-   * Unique ID of the lowest fragment UUID to be acknowledged in the
-   * next cummulative ACK.  Only valid if @e num_acks > 0.
-   */
-  uint32_t frag_uuid;
-
-  /**
-   * Number of ACKs we have accumulated so far.  Reset to 0
-   * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
-   */
-  unsigned int num_acks;
-
   /**
    * How big is the message we are reassembling in total?
    */
@@ -1526,26 +1596,6 @@ struct Neighbour
    */
   struct GNUNET_TIME_Absolute earliest_timeout;
 
-  /**
-   * Incremented by one for each queue to generate
-   * unique queue identifiers. Initially set to a random value.
-   *
-   * FIXME: Deal with wrap around (might be triggered by very
-   * persistent adversary).
-   */
-  uint32_t queue_uuid_gen;
-
-  /**
-   * Incremented by one for each message sent to this neighbour, to
-   * uniquely identify that message in replies (note that fragments
-   * use another additional counter). Initially set to a random value.
-   *
-   * It should be safe to assume that by the time this value may wrap
-   * around, the original message is long "gone" and no longer
-   * relevant.
-   */
-  uint32_t message_uuid_ctr;
-
   /**
    * Do we have a confirmed working queue and are thus visible to
    * CORE?
@@ -1681,6 +1731,16 @@ struct PendingMessage
    */
   struct PendingMessage *prev_frag;
 
+  /**
+   * Head of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_head;
+
+  /**
+   * Tail of DLL of PAs for this pending message.
+   */
+  struct PendingAcknowledgement *pa_tail;
+
   /**
    * This message, reliability boxed. Only possibly available if @e pmt is
    * #PMT_CORE.
@@ -1738,11 +1798,6 @@ struct PendingMessage
    */
   struct MessageUUIDP msg_uuid;
 
-  /**
-   * Counter incremented per generated fragment.
-   */
-  uint32_t frag_uuidgen;
-
   /**
    * Type of the pending message.
    */
@@ -1767,6 +1822,66 @@ struct PendingMessage
 };
 
 
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayload
+{
+  /**
+   * When did we receive the message we are ACKing?  Used to calculate
+   * the delay we introduced by cummulating ACKs.
+   */
+  struct GNUNET_TIME_Absolute receive_time;
+
+  /**
+   * UUID of a message being acknowledged.
+   */
+  struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Data structure in which we track acknowledgements still to
+ * be sent to the @e target peer.
+ */
+struct AcknowledgementCummulator
+{
+  /**
+   * Target peer for which we are accumulating ACKs here.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * ACK data being accumulated.  Only @e num_acks slots are valid.
+   */
+  struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
+
+  /**
+   * Task scheduled either to transmit the cummulative ACK message,
+   * or to clean up this data structure after extended periods of
+   * inactivity (if @e num_acks is zero).
+   */
+  struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * When is @e task run (only used if @e num_acks is non-zero)?
+   */
+  struct GNUNET_TIME_Absolute min_transmission_time;
+
+  /**
+   * Counter to produce the `ack_counter` in the `struct
+   * TransportReliabilityAckMessage`.  Allows the receiver to detect
+   * lost ACK messages.  Incremented by @e num_acks upon transmission.
+   */
+  uint32_t ack_counter;
+
+  /**
+   * Number of entries used in @e ack_uuids.  Reset to 0 upon transmission.
+   */
+  unsigned int num_acks;
+};
+
+
 /**
  * One of the addresses of this peer.
  */
@@ -2164,6 +2279,18 @@ static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
 
+/**
+ * Map from PIDs to `struct AcknowledgementCummulator`s.
+ * Here we track the cummulative ACKs for transmission.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
+
+/**
+ * Map of pending acknowledgements, mapping `struct AcknowledgementUUIDP` to
+ * a `struct PendingAcknowledgement`.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+
 /**
  * Map from PIDs to `struct DistanceVector` entries describing
  * known paths to the peer.
@@ -2233,6 +2360,61 @@ static struct GNUNET_SCHEDULER_Task *dvlearn_task;
  */
 static struct GNUNET_SCHEDULER_Task *validation_task;
 
+/**
+ * The most recent PA we have created, head of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_head;
+
+/**
+ * The oldest PA we have created, tail of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_tail;
+
+/**
+ * Number of entries in the #pa_head/#pa_tail DLL.  Used to
+ * limit the size of the data structure.
+ */
+static unsigned int pa_count;
+
+
+/**
+ * Release @a pa data structure.
+ *
+ * @param pa data structure to release
+ */
+static void
+free_pending_acknowledgement (struct PendingAcknowledgement *pa)
+{
+  struct Queue *q = pa->queue;
+  struct PendingMessage *pm = pa->pm;
+  struct DistanceVectorHop *dvh = pa->dvh;
+
+  GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
+  pa_count--;
+  if (NULL != q)
+  {
+    GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  if (NULL != pm)
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+  if (NULL != dvh)
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->queue = NULL;
+  }
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multishortmap_remove (pending_acks,
+                                                        &pa->ack_uuid.value,
+                                                        pa));
+  GNUNET_free (pa);
+}
+
 
 /**
  * Free cached ephemeral key.
@@ -2329,7 +2511,13 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
 {
   struct Neighbour *n = dvh->next_hop;
   struct DistanceVector *dv = dvh->dv;
+  struct PendingAcknowledgement *pa;
 
+  while (NULL != (pa = dvh->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+    pa->dvh = NULL;
+  }
   GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
   GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
   GNUNET_free (dvh);
@@ -2733,6 +2921,7 @@ free_queue (struct Queue *queue)
                             .rtt = GNUNET_TIME_UNIT_FOREVER_REL};
   struct QueueEntry *qe;
   int maxxed;
+  struct PendingAcknowledgement *pa;
 
   if (NULL != queue->transmit_task)
   {
@@ -2744,6 +2933,12 @@ free_queue (struct Queue *queue)
     GNUNET_SCHEDULER_cancel (queue->visibility_task);
     queue->visibility_task = NULL;
   }
+  while (NULL != (pa = queue->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
+    pa->queue = NULL;
+  }
+
   GNUNET_CONTAINER_MDLL_remove (neighbour,
                                 neighbour->queue_head,
                                 neighbour->queue_tail,
@@ -3006,6 +3201,9 @@ check_client_send (void *cls, const struct OutboundMessage *obm)
 
 /**
  * Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
  *
  * @param root root of the tree to free
  */
@@ -3016,7 +3214,14 @@ free_fragment_tree (struct PendingMessage *root)
 
   while (NULL != (frag = root->head_frag))
   {
+    struct PendingAcknowledgement *pa;
+
     free_fragment_tree (frag);
+    while (NULL != (pa = frag->pa_head))
+    {
+      GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+      pa->pm = NULL;
+    }
     GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
     GNUNET_free (frag);
   }
@@ -3035,6 +3240,7 @@ free_pending_message (struct PendingMessage *pm)
 {
   struct TransportClient *tc = pm->client;
   struct Neighbour *target = pm->target;
+  struct PendingAcknowledgement *pa;
 
   if (NULL != tc)
   {
@@ -3047,6 +3253,12 @@ free_pending_message (struct PendingMessage *pm)
                                 target->pending_msg_head,
                                 target->pending_msg_tail,
                                 pm);
+  while (NULL != (pa = pm->pa_head))
+  {
+    GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+    pa->pm = NULL;
+  }
+
   free_fragment_tree (pm);
   if (NULL != pm->qe)
   {
@@ -4260,32 +4472,108 @@ check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 
 
 /**
- * Generate a fragment acknowledgement for an @a rc.
+ * Clean up an idle cummulative acknowledgement data structure.
  *
- * @param rc context to generate ACK for, @a rc ACK state is reset
+ * @param cls a `struct AcknowledgementCummulator *`
  */
 static void
-send_fragment_ack (struct ReassemblyContext *rc)
+destroy_ack_cummulator (void *cls)
 {
-  struct TransportFragmentAckMessage *ack;
+  struct AcknowledgementCummulator *ac = cls;
 
-  ack = GNUNET_new (struct TransportFragmentAckMessage);
-  ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
-  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
-  ack->frag_uuid.uuid = htonl (rc->frag_uuid);
-  ack->extra_acks = GNUNET_htonll (rc->extra_acks);
-  ack->msg_uuid = rc->msg_uuid;
-  ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
-  if (0 == rc->msg_missing)
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+  ac->task = NULL;
+  GNUNET_assert (0 == ac->num_acks);
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
+  GNUNET_free (ac);
+}
+
+
+/**
+ * Do the transmission of a cummulative acknowledgement now.
+ *
+ * @param cls a `struct AcknowledgementCummulator *`
+ */
+static void
+transmit_cummulative_ack_cb (void *cls)
+{
+  struct AcknowledgementCummulator *ac = cls;
+  struct TransportReliabilityAckMessage *ack;
+  struct TransportCummulativeAckPayloadP *ap;
+
+  ac->task = NULL;
+  GNUNET_assert (0 < ac->ack_counter);
+  ack = GNUNET_malloc (sizeof (*ack) +
+                       ac->ack_counter *
+                         sizeof (struct TransportCummulativeAckPayloadP));
+  ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+  ack->header.size =
+    htons (sizeof (*ack) +
+           ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+  ack->ack_counter = htonl (ac->ack_counter++);
+  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
+  for (unsigned int i = 0; i < ac->ack_counter; i++)
+  {
+    ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
+    ap[i].ack_delay = GNUNET_TIME_relative_hton (
+      GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
+  }
+  route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+  ac->num_acks = 0;
+  ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
+                                           &destroy_ack_cummulator,
+                                           ac);
+}
+
+
+/**
+ * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
+ * transmission by at most @a max_delay.
+ *
+ * @param pid target peer
+ * @param ack_uuid UUID to ack
+ * @param max_delay how long can the ACK wait
+ */
+static void
+cummulative_ack (const struct GNUNET_PeerIdentity *pid,
+                 const struct AcknowledgementUUIDP *ack_uuid,
+                 struct GNUNET_TIME_Absolute max_delay)
+{
+  struct AcknowledgementCummulator *ac;
+
+  ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
+  if (NULL == ac)
+  {
+    ac = GNUNET_new (struct AcknowledgementCummulator);
+    ac->target = *pid;
+    ac->min_transmission_time = max_delay;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multipeermap_put (
+                     ack_cummulators,
+                     &ac->target,
+                     ac,
+                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  }
   else
-    ack->reassembly_timeout = GNUNET_TIME_relative_hton (
-      GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
-  route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED);
-  rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
-  rc->num_acks = 0;
-  rc->extra_acks = 0LLU;
+  {
+    if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
+    {
+      /* must run immediately, ack buffer full! */
+      GNUNET_SCHEDULER_cancel (ac->task);
+      transmit_cummulative_ack_cb (ac);
+    }
+    GNUNET_SCHEDULER_cancel (ac->task);
+    ac->min_transmission_time =
+      GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
+  }
+  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
+  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
+  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
+  ac->num_acks++;
+  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
+                                      &transmit_cummulative_ack_cb,
+                                      ac);
 }
 
 
@@ -4348,10 +4636,8 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
   uint16_t msize;
   uint16_t fsize;
   uint16_t frag_off;
-  uint32_t frag_uuid;
   char *target;
   struct GNUNET_TIME_Relative cdelay;
-  int ack_now;
   struct FindByMessageUuidContext fc;
 
   n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
@@ -4417,6 +4703,12 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 
   /* reassemble */
   fsize = ntohs (fb->header.size) - sizeof (*fb);
+  if (0 == fsize)
+  {
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    return;
+  }
   frag_off = ntohs (fb->frag_off);
   memcpy (&target[frag_off], &fb[1], fsize);
   /* update bitfield and msg_missing */
@@ -4430,60 +4722,17 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
   }
 
   /* Compute cummulative ACK */
-  frag_uuid = ntohl (fb->frag_uuid.uuid);
   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
-  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks);
+  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
+  if (0 == rc->msg_missing)
+    cdelay = GNUNET_TIME_UNIT_ZERO;
+  cummulative_ack (&cmc->im.sender,
+                   &fb->ack_uuid,
+                   GNUNET_TIME_relative_to_absolute (cdelay));
   rc->last_frag = GNUNET_TIME_absolute_get ();
-  rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
-  ack_now = GNUNET_NO;
-  if (0 == rc->num_acks)
-  {
-    /* case one: first ack */
-    rc->frag_uuid = frag_uuid;
-    rc->extra_acks = 0LLU;
-    rc->num_acks = 1;
-  }
-  else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64))
-  {
-    /* case two: ack fits after existing min UUID */
-    if ((frag_uuid == rc->frag_uuid) ||
-        (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))))
-    {
-      /* duplicate fragment, ack now! */
-      ack_now = GNUNET_YES;
-    }
-    else
-    {
-      rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
-      rc->num_acks++;
-    }
-  }
-  else if ((rc->frag_uuid > frag_uuid) &&
-           (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) ||
-            ((rc->frag_uuid < frag_uuid + 64) &&
-             (rc->extra_acks ==
-              (rc->extra_acks &
-               ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))))))
-  {
-    /* can fit ack by shifting extra acks and starting at
-       frag_uid, test above esured that the bits we will
-       shift 'extra_acks' by are all zero. */
-    rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
-    rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
-    rc->frag_uuid = frag_uuid;
-    rc->num_acks++;
-  }
-  if (65 == rc->num_acks) /* OPTIMIZE-FIXME: maybe use smaller threshold? This
-                             is very aggressive. */
-    ack_now = GNUNET_YES; /* maximum acks received */
-  // FIXME: possibly also ACK based on RTT (but for that we'd need to
-  // determine the queue used for the ACK first!)
-
   /* is reassembly complete? */
   if (0 != rc->msg_missing)
   {
-    if (ack_now)
-      send_fragment_ack (rc);
     finish_cmc_handling (cmc);
     return;
   }
@@ -4497,7 +4746,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
     return;
   }
   /* successful reassembly */
-  send_fragment_ack (rc);
   demultiplex_with_cmc (cmc, msg);
   /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
      en-route and we forget that we finished this reassembly immediately!
@@ -4508,172 +4756,149 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb)
 
 
 /**
- * Check the @a fa against the fragments associated with @a pm.
- * If it matches, remove the matching fragments from the transmission
- * list.
+ * Communicator gave us a reliability box.  Check the message.
  *
- * @param pm pending message to check against the ack
- * @param fa the ack that was received
- * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
 static int
-check_ack_against_pm (struct PendingMessage *pm,
-                      const struct TransportFragmentAckMessage *fa)
-{
-  int match;
-  struct PendingMessage *nxt;
-  uint32_t fs = ntohl (fa->frag_uuid.uuid);
-  uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
-
-  match = GNUNET_NO;
-  for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt)
-  {
-    const struct TransportFragmentBoxMessage *tfb =
-      (const struct TransportFragmentBoxMessage *) &pm[1];
-    uint32_t fu = ntohl (tfb->frag_uuid.uuid);
-
-    GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
-    nxt = frag->next_frag;
-    /* Check for exact match or match in the 'xtra' bitmask */
-    if ((fu == fs) ||
-        ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra))))
-    {
-      match = GNUNET_YES;
-      free_fragment_tree (frag);
-    }
-  }
-  return match;
+check_reliability_box (void *cls,
+                       const struct TransportReliabilityBoxMessage *rb)
+{
+  GNUNET_MQ_check_boxed_message (rb);
+  return GNUNET_YES;
 }
 
 
 /**
- * Communicator gave us a fragment acknowledgement.  Process the request.
+ * Communicator gave us a reliability box.  Process the request.
  *
  * @param cls a `struct CommunicatorMessageContext` (must call
  * #finish_cmc_handling() when done)
- * @param fa the message that was received
+ * @param rb the message that was received
  */
 static void
-handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa)
+handle_reliability_box (void *cls,
+                        const struct TransportReliabilityBoxMessage *rb)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
-  int matched;
+  const struct GNUNET_MessageHeader *inbox =
+    (const struct GNUNET_MessageHeader *) &rb[1];
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n)
-  {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+  // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
+  (void) (0 == ntohl (rb->ack_countdown));
+  /* continue with inner message */
+  demultiplex_with_cmc (cmc, inbox);
+}
 
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
-  }
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm;
-       pm = pm->prev_neighbour)
-  {
-    if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid))
-      continue;
-    matched = GNUNET_YES;
-    if (GNUNET_YES == check_ack_against_pm (pm, fa))
-    {
-      struct GNUNET_TIME_Relative avg_ack_delay =
-        GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to Box and ACK to identify queue?
-      // IDEA: generate MULTIPLE frag-uuids per fragment and track
-      //    the queue with the fragment! (-> this logic must
-      //    be moved into check_ack_against_pm!)
-      (void) avg_ack_delay;
-    }
-    else
-    {
-      GNUNET_STATISTICS_update (GST_stats,
-                                "# FRAGMENT_ACKS dropped, no matching fragment",
-                                1,
-                                GNUNET_NO);
-    }
-    if (NULL == pm->head_frag)
-    {
-      // if entire message is ACKed, handle that as well.
-      // => clean up PM, any post actions?
-      free_pending_message (pm);
-    }
-    else
-    {
-      struct GNUNET_TIME_Relative reassembly_timeout =
-        GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
-      // OPTIMIZE-FIXME: adjust retransmission strategy based on
-      // reassembly_timeout!
-      (void) reassembly_timeout;
-    }
-    break;
-  }
-  if (GNUNET_NO == matched)
-  {
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# FRAGMENT_ACKS dropped, no matching pending message",
-                              1,
-                              GNUNET_NO);
-  }
-  finish_cmc_handling (cmc);
+
+/**
+ * We have successfully transmitted data via @a q, update metrics.
+ *
+ * @param q queue to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
+ */
+static void
+update_queue_performance (struct Queue *q,
+                          struct GNUNET_TIME_Relative rtt,
+                          uint16_t bytes_transmitted_ok)
+{
+  // FIXME: implement!
 }
 
 
 /**
- * Communicator gave us a reliability box.  Check the message.
+ * We have successfully transmitted data via @a dvh, update metrics.
  *
- * @param cls a `struct CommunicatorMessageContext`
- * @param rb the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param dvh distance vector path data to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
  */
-static int
-check_reliability_box (void *cls,
-                       const struct TransportReliabilityBoxMessage *rb)
+static void
+update_dvh_performance (struct DistanceVectorHop *dvh,
+                        struct GNUNET_TIME_Relative rtt,
+                        uint16_t bytes_transmitted_ok)
 {
-  GNUNET_MQ_check_boxed_message (rb);
-  return GNUNET_YES;
+  // FIXME: implement!
 }
 
 
 /**
- * Communicator gave us a reliability box.  Process the request.
+ * The @a pa was acknowledged, process the acknowledgement.
  *
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param rb the message that was received
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the other
+ * peer
  */
 static void
-handle_reliability_box (void *cls,
-                        const struct TransportReliabilityBoxMessage *rb)
+handle_acknowledged (struct PendingAcknowledgement *pa,
+                     struct GNUNET_TIME_Relative ack_delay)
 {
-  struct CommunicatorMessageContext *cmc = cls;
-  const struct GNUNET_MessageHeader *inbox =
-    (const struct GNUNET_MessageHeader *) &rb[1];
+  struct PendingMessage *pm = pa->pm;
+  struct GNUNET_TIME_Relative delay;
 
-  if (0 == ntohl (rb->ack_countdown))
-  {
-    struct TransportReliabilityAckMessage *ack;
+  delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+  if (delay.rel_value_us > ack_delay.rel_value_us)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else
+    delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
+  if (NULL != pa->queue)
+    update_queue_performance (pa->queue, delay, pa->message_size);
+  if (NULL != pa->dvh)
+    update_dvh_performance (pa->dvh, delay, pa->message_size);
+  if (NULL != pm)
+  {
+    if (NULL != pm->frag_parent)
+    {
+      pm = pm->frag_parent;
+      free_fragment_tree (pa->pm);
+    }
+    while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
+    {
+      struct PendingMessage *parent = pm->frag_parent;
 
-    /* FIXME-OPTIMIZE: implement cummulative ACKs and ack_countdown,
-       then setting the avg_ack_delay field below: */
-    ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct MessageUUIDP));
-    ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
-    ack->header.size = htons (sizeof (*ack) + sizeof (struct MessageUUIDP));
-    memcpy (&ack[1], &rb->msg_uuid, sizeof (struct MessageUUIDP));
-    route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED);
+      free_fragment_tree (pm);
+      pm = parent;
+    }
+    if (NULL != pm->head_frag)
+      pm = NULL; /* we are done, otherwise free 'pm' below */
   }
-  /* continue with inner message */
-  demultiplex_with_cmc (cmc, inbox);
+  if (NULL != pm)
+    free_pending_message (pm);
+  free_pending_acknowledgement (pa);
+}
+
+
+/**
+ * Communicator gave us a reliability ack.  Check it is well-formed.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (unused)
+ * @param ra the message that was received
+ * @return #GNUNET_Ok if @a ra is well-formed
+ */
+static int
+check_reliability_ack (void *cls,
+                       const struct TransportReliabilityAckMessage *ra)
+{
+  unsigned int n_acks;
+
+  (void) cls;
+  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+           sizeof (struct TransportCummulativeAckPayloadP);
+  if (0 == n_acks)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ((ntohs (ra->header.size) - sizeof (*ra)) !=
+      n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
 }
 
 
@@ -4689,70 +4914,33 @@ handle_reliability_ack (void *cls,
                         const struct TransportReliabilityAckMessage *ra)
 {
   struct CommunicatorMessageContext *cmc = cls;
-  struct Neighbour *n;
+  const struct TransportCummulativeAckPayloadP *ack;
+  struct PendingAcknowledgement *pa;
   unsigned int n_acks;
-  const struct MessageUUIDP *msg_uuids;
-  struct PendingMessage *nxt;
-  int matched;
+  uint32_t ack_counter;
 
-  n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
-  if (NULL == n)
+  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+           sizeof (struct TransportCummulativeAckPayloadP);
+  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
+  for (unsigned int i = 0; i < n_acks; i++)
   {
-    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
-
-    GNUNET_break (0);
-    finish_cmc_handling (cmc);
-    GNUNET_SERVICE_client_drop (client);
-    return;
-  }
-  n_acks =
-    (ntohs (ra->header.size) - sizeof (*ra)) / sizeof (struct MessageUUIDP);
-  msg_uuids = (const struct MessageUUIDP *) &ra[1];
-
-  /* FIXME-OPTIMIZE: maybe use another hash map here? */
-  matched = GNUNET_NO;
-  for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt)
-  {
-    int in_list;
-
-    nxt = pm->next_neighbour;
-    in_list = GNUNET_NO;
-    for (unsigned int i = 0; i < n_acks; i++)
+    pa =
+      GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+    if (NULL == pa)
     {
-      if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid))
-        continue;
-      in_list = GNUNET_YES;
-      break;
-    }
-    if (GNUNET_NO == in_list)
+      GNUNET_STATISTICS_update (
+        GST_stats,
+        "# FRAGMENT_ACKS dropped, no matching pending message",
+        1,
+        GNUNET_NO);
       continue;
-
-    /* this pm was acked! */
-    matched = GNUNET_YES;
-    free_pending_message (pm);
-
-    {
-      struct GNUNET_TIME_Relative avg_ack_delay =
-        GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
-      // FIXME: update RTT and other reliability data!
-      // ISSUE: we don't know which of n's queues the message(s)
-      // took (and in fact the different messages might have gone
-      // over different queues and possibly over multiple).
-      // => track queues with PendingMessages, and update RTT only if
-      //    the queue used is unique?
-      //    -> how can we get loss rates?
-      //    -> or, add extra state to MSG and ACKs to identify queue?
-      //    -> if we do this, might just do the same for the avg_ack_delay!
-      (void) avg_ack_delay;
     }
+    handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
   }
-  if (GNUNET_NO == matched)
-  {
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# FRAGMENT_ACKS dropped, no matching pending message",
-                              1,
-                              GNUNET_NO);
-  }
+
+  ack_counter = htonl (ra->ack_counter);
+  // FIXME: track ACK losses based on ack_counter somewhere!
+  // (DV and/or Neighbour?)
   finish_cmc_handling (cmc);
 }
 
@@ -4968,7 +5156,8 @@ backtalker_monotime_cb (void *cls,
       1,
       GNUNET_NO);
     b->monotonic_time = mt;
-    /* Setting body_size to 0 prevents call to #forward_backchannel_payload() */
+    /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+     */
     b->body_size = 0;
     return;
   }
@@ -5268,13 +5457,13 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop)
  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
  *
  * @param path the path we learned, path[0] should be us,
- *             and then path contains a valid path from us to `path[path_len-1]`
- *             path[1] should be a direct neighbour (we should check!)
+ *             and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
  * @param path_len number of entries on the @a path, at least three!
  * @param network_latency how long does the message take from us to
  * `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe be
- * zero.
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
  * @return #GNUNET_YES on success,
  *         #GNUNET_NO if we have better path(s) to the target
  *         #GNUNET_SYSERR if the path is useless and/or invalid
@@ -5603,7 +5792,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
   finish_cmc_handling (cmc);
 
   /* OPTIMIZE-FIXME: Technically, we only need to bother checking
-     the initiator signature if we send the message back to the initiator... */
+     the initiator signature if we send the message back to the initiator...
+   */
   if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator,
                                                     &dvl->challenge,
                                                     &dvl->init_sig))
@@ -6349,18 +6539,14 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
                             struct TransportFragmentBoxMessage,
                             &cmc),
-     GNUNET_MQ_hd_fixed_size (fragment_ack,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
-                              struct TransportFragmentAckMessage,
-                              &cmc),
      GNUNET_MQ_hd_var_size (reliability_box,
                             GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
                             struct TransportReliabilityBoxMessage,
                             &cmc),
-     GNUNET_MQ_hd_fixed_size (reliability_ack,
-                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
-                              struct TransportReliabilityAckMessage,
-                              &cmc),
+     GNUNET_MQ_hd_var_size (reliability_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                            struct TransportReliabilityAckMessage,
+                            &cmc),
     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
                             struct TransportBackchannelEncapsulationMessage,
@@ -6460,28 +6646,77 @@ set_pending_message_uuid (struct PendingMessage *pm)
 }
 
 
+/**
+ * Setup data structure waiting for acknowledgements.
+ *
+ * @param queue queue the @a pm will be sent over
+ * @param dvh path the message will take, may be NULL
+ * @param pm the pending message for transmission
+ * @return corresponding fresh pending acknowledgement
+ */
+static struct PendingAcknowledgement *
+prepare_pending_acknowledgement (struct Queue *queue,
+                                 struct DistanceVectorHop *dvh,
+                                 struct PendingMessage *pm)
+{
+  struct PendingAcknowledgement *pa;
+
+  pa = GNUNET_new (struct PendingAcknowledgement);
+  pa->queue = queue;
+  pa->dvh = dvh;
+  pa->pm = pm;
+  do
+  {
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                                &pa->ack_uuid,
+                                sizeof (pa->ack_uuid));
+  } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
+                           pending_acks,
+                           &pa->ack_uuid.value,
+                           pa,
+                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
+  GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
+  if (NULL != dvh)
+    GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
+  pa->transmission_time = GNUNET_TIME_absolute_get ();
+  pa->message_size = pm->bytes_msg;
+  return pa;
+}
+
+
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds
  * additional fragments to the neighbour as well. If the
  * @a mtu is too small, generates and error for the @a pm
  * and returns NULL.
  *
+ * @param queue which queue to fragment for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to fragment for transmission
- * @param mtu MTU to apply
  * @return new message to transmit
  */
 static struct PendingMessage *
-fragment_message (struct PendingMessage *pm, uint16_t mtu)
+fragment_message (struct Queue *queue,
+                  struct DistanceVectorHop *dvh,
+                  struct PendingMessage *pm)
 {
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *ff;
+  uint16_t mtu;
 
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+  mtu = (0 == queue->mtu)
+          ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+          : queue->mtu;
   set_pending_message_uuid (pm);
 
   /* This invariant is established in #handle_add_queue_message() */
   GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
 
   /* select fragment for transmission, descending the tree if it has
-     been expanded until we are at a leaf or at a fragment that is small enough
+     been expanded until we are at a leaf or at a fragment that is small
+     enough
    */
   ff = pm;
   while (((ff->bytes_msg > mtu) || (pm == ff)) &&
@@ -6527,7 +6762,7 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
     tfb.header.size =
       htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
-    tfb.frag_uuid.uuid = htonl (pm->frag_uuidgen++);
+    tfb.ack_uuid = pa->ack_uuid;
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
@@ -6558,13 +6793,18 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu)
  * @a pm).  If the @a pm is already fragmented or reliability boxed,
  * or itself an ACK, this function simply returns @a pm.
  *
+ * @param queue which queue to prepare transmission for
+ * @param dvh path the message will take, or NULL
  * @param pm pending message to box for transmission over unreliabile queue
  * @return new message to transmit
  */
 static struct PendingMessage *
-reliability_box_message (struct PendingMessage *pm)
+reliability_box_message (struct Queue *queue,
+                         struct DistanceVectorHop *dvh,
+                         struct PendingMessage *pm)
 {
   struct TransportReliabilityBoxMessage rbox;
+  struct PendingAcknowledgement *pa;
   struct PendingMessage *bpm;
   char *msg;
 
@@ -6581,6 +6821,8 @@ reliability_box_message (struct PendingMessage *pm)
     client_send_response (pm, GNUNET_NO, 0);
     return NULL;
   }
+  pa = prepare_pending_acknowledgement (queue, dvh, pm);
+
   bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
                        pm->bytes_msg);
   bpm->target = pm->target;
@@ -6593,7 +6835,8 @@ reliability_box_message (struct PendingMessage *pm)
   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
-  rbox.msg_uuid = pm->msg_uuid;
+
+  rbox.ack_uuid = pa->ack_uuid;
   msg = (char *) &bpm[1];
   memcpy (msg, &rbox, sizeof (rbox));
   memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
@@ -6698,11 +6941,7 @@ transmit_on_queue (void *cls)
        (NULL != pm->head_frag /* fragments already exist, should
                                 respect that even if MTU is 0 for
                                 this queue */) )
-    s = fragment_message (s,
-                          (0 == queue->mtu)
-                            ? UINT16_MAX -
-                                sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
-                            : queue->mtu);
+    s = fragment_message (queue, NULL /*FIXME! */, s);
   if (NULL == s)
   {
     /* Fragmentation failed, try next message... */
@@ -6710,7 +6949,7 @@ transmit_on_queue (void *cls)
     return;
   }
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    s = reliability_box_message (s);
+    s = reliability_box_message (queue, NULL /* FIXME! */, s);
   if (NULL == s)
   {
     /* Reliability boxing failed, try next message... */
@@ -7509,8 +7748,8 @@ handle_queue_create_ok (void *cls,
 
 
 /**
- * Communicator tells us that our request to create a queue failed. This usually
- * indicates that the provided address is simply invalid or that the
+ * Communicator tells us that our request to create a queue failed. This
+ * usually indicates that the provided address is simply invalid or that the
  * communicator's resources are exhausted.
  *
  * @param cls the `struct TransportClient`
@@ -7803,7 +8042,8 @@ handle_address_consider_verify (
   (void) cls;
   // OPTIMIZE-FIXME: checking that we know this address already should
   //        be done BEFORE checking the signature => HELLO API change!
-  // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?!
+  // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
+  // validation?!
   address =
     GNUNET_HELLO_extract_address (&hdr[1],
                                   ntohs (hdr->header.size) - sizeof (*hdr),
@@ -7950,6 +8190,50 @@ free_validation_state_cb (void *cls,
 }
 
 
+/**
+ * Free pending acknowledgement.
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct PendingAcknowledgement`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_pending_ack_cb (void *cls,
+                     const struct GNUNET_ShortHashCode *key,
+                     void *value)
+{
+  struct PendingAcknowledgement *pa = value;
+
+  (void) cls;
+  (void) key;
+  free_pending_acknowledgement (pa);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Free acknowledgement cummulator.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct AcknowledgementCummulator`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ack_cummulator_cb (void *cls,
+                        const struct GNUNET_PeerIdentity *pid,
+                        void *value)
+{
+  struct AcknowledgementCummulator *ac = value;
+
+  (void) cls;
+  (void) pid;
+  GNUNET_free (ac);
+  return GNUNET_OK;
+}
+
+
 /**
  * Function called when the service shuts down.  Unloads our plugins
  * and cancels pending validations.
@@ -7983,6 +8267,16 @@ do_shutdown (void *cls)
     GNUNET_free (GST_my_private_key);
     GST_my_private_key = NULL;
   }
+  GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
+                                         &free_ack_cummulator_cb,
+                                         NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
+  ack_cummulators = NULL;
+  GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
+                                          &free_pending_ack_cb,
+                                          NULL);
+  GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+  pending_acks = NULL;
   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
   neighbours = NULL;
   GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
@@ -8034,6 +8328,8 @@ run (void *cls,
   /* setup globals */
   GST_cfg = c;
   backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+  ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
   neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]