gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated (1e664eddb -> 1d22f56d1)


From: gnunet
Subject: [gnunet] branch master updated (1e664eddb -> 1d22f56d1)
Date: Wed, 20 Dec 2023 09:04:13 +0100

This is an automated email from the git hooks/post-receive script.

t3sserakt pushed a change to branch master
in repository gnunet.

    from 1e664eddb build: dist scripts
     new 36a9952f0 Transport: Added cleanup task to remove QueueEntry we got no ACK for.
     new 164badbe1 Transport: Changed logic to first create a queue before starting validation.
     new 1d22f56d1 Merge branch 'master' of ssh://git.gnunet.org/gnunet

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/service/transport/gnunet-service-transport.c | 260 ++++++++++++++++-------
 1 file changed, 183 insertions(+), 77 deletions(-)

diff --git a/src/service/transport/gnunet-service-transport.c b/src/service/transport/gnunet-service-transport.c
index d0d605465..f48455868 100644
--- a/src/service/transport/gnunet-service-transport.c
+++ b/src/service/transport/gnunet-service-transport.c
@@ -318,6 +318,12 @@
  */
 #define QUEUE_LENGTH_LIMIT 32
 
+/**
+ *
+ */
+#define QUEUE_ENTRY_TIMEOUT \
+        GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
@@ -1820,6 +1826,11 @@ struct QueueEntry
    * Message ID used for this message with the queue used for transmission.
    */
   uint64_t mid;
+
+  /**
+   * Timestamp this QueueEntry was created.
+   */
+  struct GNUNET_TIME_Absolute creation_timestamp;
 };
 
 
@@ -2546,6 +2557,11 @@ struct TransportClient
        */
       unsigned int total_queue_length;
 
+      /**
+       * Task to check for timed out QueueEntry.
+       */
+      struct GNUNET_SCHEDULER_Task *free_queue_entry_task;
+
       /**
        * Characteristics of this communicator.
        */
@@ -4025,6 +4041,8 @@ client_disconnect_cb (void *cls,
       struct Queue *q;
       struct AddressListEntry *ale;
 
+      if (NULL != tc->details.communicator.free_queue_entry_task)
+        GNUNET_SCHEDULER_cancel (tc->details.communicator.free_queue_entry_task);
       while (NULL != (q = tc->details.communicator.queue_head))
         free_queue (q);
       while (NULL != (ale = tc->details.communicator.addr_head))
@@ -4482,6 +4500,38 @@ sign_ephemeral (struct DistanceVector *dv)
 }
 
 
+static void
+free_queue_entry (struct QueueEntry *qe,
+                  struct TransportClient *tc);
+
+
+static void
+free_timedout_queue_entry (void *cls)
+{
+  struct TransportClient *tc = cls;
+  struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+
+  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
+       queue = queue->next_client)
+  {
+    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+      qep = qep->next)
+    {
+      struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference (qep->creation_timestamp, now);
+      if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Freeing timed out QueueEntry with MID %" PRIu64
+                " and QID %u\n",
+                qep->mid,
+                queue->qid);
+        free_queue_entry(qep, tc);
+      }
+    }
+  }
+}
+
+
 /**
  * Send the message @a payload on @a queue.
  *
@@ -4522,6 +4572,7 @@ queue_send_msg (struct Queue *queue,
     struct QueueEntry *qe;
 
     qe = GNUNET_new (struct QueueEntry);
+    qe->creation_timestamp = GNUNET_TIME_absolute_get ();
     qe->mid = queue->mid_gen;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Create QueueEntry with MID %" PRIu64
@@ -4552,11 +4603,14 @@ queue_send_msg (struct Queue *queue,
     {
       // Messages without FC or fragments can get here.
       if (NULL != pm)
+      {
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "Message %" PRIu64
                     " (pm type %u) was not send because queue has no capacity.\n",
                     pm->logging_uuid,
                     pm->pmt);
+        pm->qe = NULL;
+      }
       GNUNET_free (env);
       GNUNET_free (qe);
       return;
@@ -4579,6 +4633,15 @@ queue_send_msg (struct Queue *queue,
     if (0 == queue->q_capacity)
       queue->idle = GNUNET_NO;
 
+    if (GNUNET_NO == queue->idle)
+    {
+      struct TransportClient *tc = queue->tc;
+
+      if (NULL == tc->details.communicator.free_queue_entry_task)
+        tc->details.communicator.free_queue_entry_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                                               
        &free_timedout_queue_entry,
+                                                                               
        tc);
+    }
     if (NULL != pm && NULL != (pa = pm->pa_head))
     {
       while (pm != pa->pm)
@@ -8679,12 +8742,21 @@ start_address_validation (const struct GNUNET_PeerIdentity *pid,
 }
 
 
+static struct Queue *
+find_queue (const struct GNUNET_PeerIdentity *pid, const char *address);
+
+
+static void
+suggest_to_connect (const struct GNUNET_PeerIdentity *pid, const char *address);
+
+
 static void
 hello_for_incoming_cb (void *cls,
                        const struct GNUNET_PeerIdentity *pid,
                        const char *uri)
 {
   (void) cls;
+  struct Queue *q;
   int pfx_len;
   const char *eou;
   char *address;
@@ -8702,8 +8774,13 @@ hello_for_incoming_cb (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "helo for client %s\n",
               address);
-
-  start_address_validation (pid, address);
+  q = find_queue (pid, address);
+  if (NULL == q)
+  {
+    suggest_to_connect (pid, address);
+  }
+  else
+    start_address_validation (pid, address);
   GNUNET_free (address);
 }
 
@@ -10368,77 +10445,12 @@ handle_del_queue_message (void *cls,
 }
 
 
-/**
- * Message was transmitted.  Process the request.
- *
- * @param cls the client
- * @param sma the send message that was sent
- */
 static void
-handle_send_message_ack (void *cls,
-                         const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+free_queue_entry (struct QueueEntry *qe,
+                  struct TransportClient *tc)
 {
-  struct TransportClient *tc = cls;
-  struct QueueEntry *qe;
   struct PendingMessage *pm;
 
-  if (CT_COMMUNICATOR != tc->type)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (tc->client);
-    return;
-  }
-
-  /* find our queue entry matching the ACK */
-  qe = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Looking for queue for PID %s\n",
-              GNUNET_i2s (&sma->receiver));
-  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
-       queue = queue->next_client)
-  {
-    if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
-      continue;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Found PID %s\n",
-                GNUNET_i2s (&queue->neighbour->pid));
-
-
-    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
-         qep = qep->next)
-    {
-      if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
-            sma->qid))
-        continue;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
-                  PRIu64 " Ack QID %u\n",
-                  qep->mid,
-                  queue->qid,
-                  GNUNET_ntohll (sma->mid),
-                  ntohl (sma->qid));
-      qe = qep;
-      if ((NULL != qe->pm) && (qe->pm->qe != qe))
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "For pending message %" PRIu64 " we had retransmissions.\n",
-                    qe->pm->logging_uuid);
-      break;
-    }
-  }
-  if (NULL == qe)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
-                GNUNET_ntohll (sma->mid),
-                ntohl (sma->qid));
-    // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue.
-    // Update: Maybe QueueEntry was accidentally freed during freeing PendingMessage.
-    /* this should never happen */
-    // GNUNET_break (0);
-    // GNUNET_SERVICE_client_drop (tc->client);
-    GNUNET_SERVICE_client_continue (tc->client);
-    return;
-  }
   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
                                qe->queue->queue_tail,
                                qe);
@@ -10451,7 +10463,6 @@ handle_send_message_ack (void *cls,
               GNUNET_i2s (&qe->queue->neighbour->pid),
               qe->queue->queue_length,
               tc->details.communicator.total_queue_length);
-  GNUNET_SERVICE_client_continue (tc->client);
 
   /* if applicable, resume transmissions that waited on ACK */
   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
@@ -10525,6 +10536,81 @@ handle_send_message_ack (void *cls,
 }
 
 
+/**
+ * Message was transmitted.  Process the request.
+ *
+ * @param cls the client
+ * @param sma the send message that was sent
+ */
+static void
+handle_send_message_ack (void *cls,
+                         const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+{
+  struct TransportClient *tc = cls;
+  struct QueueEntry *qe;
+
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+
+  /* find our queue entry matching the ACK */
+  qe = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Looking for queue for PID %s\n",
+              GNUNET_i2s (&sma->receiver));
+  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
+       queue = queue->next_client)
+  {
+    if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
+      continue;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Found PID %s\n",
+                GNUNET_i2s (&queue->neighbour->pid));
+
+
+    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+         qep = qep->next)
+    {
+      if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
+            sma->qid))
+        continue;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
+                  PRIu64 " Ack QID %u\n",
+                  qep->mid,
+                  queue->qid,
+                  GNUNET_ntohll (sma->mid),
+                  ntohl (sma->qid));
+      qe = qep;
+      if ((NULL != qe->pm) && (qe->pm->qe != qe))
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "For pending message %" PRIu64 " we had retransmissions.\n",
+                    qe->pm->logging_uuid);
+      break;
+    }
+  }
+  if (NULL == qe)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
+                GNUNET_ntohll (sma->mid),
+                ntohl (sma->qid));
+    // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue.
+    // Update: Maybe QueueEntry was accidentally freed during freeing PendingMessage.
+    /* this should never happen */
+    // GNUNET_break (0);
+    // GNUNET_SERVICE_client_drop (tc->client);
+    GNUNET_SERVICE_client_continue (tc->client);
+    return;
+  }
+  free_queue_entry (qe, tc);
+  GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
 /**
  * Iterator telling new MONITOR client about all existing
  * queues to peers.
@@ -11150,11 +11236,17 @@ handle_add_queue_message (void *cls,
   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
   queue->idle = GNUNET_YES;
   /* check if valdiations are waiting for the queue */
-  (void)
-  GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
-                                              &aqm->receiver,
-                                              &check_validation_request_pending,
-                                              queue);
+  if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (validation_map,
+                                          &aqm->receiver))
+  {
+    if (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+                                                  &aqm->receiver,
+                                                  
&check_validation_request_pending,
+                                                                     queue))
+      start_address_validation (&aqm->receiver, queue->address);
+  }
+  else
+    start_address_validation (&aqm->receiver, queue->address);
   /* look for traffic for this queue */
   //TODO Check whether this makes any sense at all.
   /*schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
@@ -11319,6 +11411,7 @@ hello_for_client_cb (void *cls,
                      const char *uri)
 {
   (void) cls;
+  struct Queue *q;
   int pfx_len;
   const char *eou;
   char *address;
@@ -11337,7 +11430,13 @@ hello_for_client_cb (void *cls,
               "hello for client %s\n",
               address);
 
-  start_address_validation (pid, address);
+  q = find_queue (pid, address);
+  if (NULL == q)
+  {
+    suggest_to_connect (pid, address);
+  }
+  else
+    start_address_validation (pid, address);
   GNUNET_free (address);
 }
 
@@ -11459,8 +11558,15 @@ handle_request_hello_validation (void *cls,
                                  const struct RequestHelloValidationMessage *m)
 {
   struct TransportClient *tc = cls;
+  struct Queue *q;
 
-  start_address_validation (&m->peer, (const char *) &m[1]);
+  q = find_queue (&m->peer, (const char *) &m[1]);
+  if (NULL == q)
+  {
+    suggest_to_connect (&m->peer, (const char *) &m[1]);
+  }
+  else
+    start_address_validation (&m->peer, (const char *) &m[1]);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]