[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 01/02: towards proper DATA_ACK handling
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] 01/02: towards proper DATA_ACK handling |
Date: |
Wed, 25 Jan 2017 19:30:30 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
commit 80d6555ec30182b9a8a59778339f5cbe7929ce60
Author: Christian Grothoff <address@hidden>
AuthorDate: Wed Jan 25 19:29:45 2017 +0100
towards proper DATA_ACK handling
---
src/cadet/gnunet-service-cadet-new_channel.c | 263 +++++++++++++++------------
1 file changed, 150 insertions(+), 113 deletions(-)
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c
b/src/cadet/gnunet-service-cadet-new_channel.c
index dc3d4352c..e561f1992 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,6 +25,7 @@
* @author Christian Grothoff
*
* TODO:
+ * - Optimize ACKs by using 'mid_futures' properly!
* - introduce shutdown so we can have half-closed channels, modify
* destroy to include MID to have FIN-ACK equivalents, etc.
* - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
@@ -287,6 +288,8 @@ struct CadetChannel
/**
* Bitfield of already-received messages past @e mid_recv.
+ *
+ * FIXME: not yet properly used (bits here are never set!)
*/
uint64_t mid_futures;
@@ -769,6 +772,11 @@ send_channel_data_ack (struct CadetChannel *ch)
msg.futures = GNUNET_htonll (ch->mid_futures);
if (NULL != ch->last_control_qe)
GCT_send_cancel (ch->last_control_qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending DATA_ACK %u:%llX via %s\n",
+ (unsigned int) ntohl (msg.mid.mid),
+ (unsigned long long) ch->mid_futures,
+ GCCH_2s (ch));
ch->last_control_qe = GCT_send (ch->t,
&msg.header,
&send_ack_cb,
@@ -1076,23 +1084,11 @@ is_before (void *cls,
if (delta > (uint32_t) INT_MAX)
{
/* in overflow range, we can safely assume we wrapped around */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u > %u => %p > %p\n",
- (unsigned int) v1,
- (unsigned int) v2,
- m1,
- m2);
return GNUNET_NO;
}
else
{
/* result is small, thus v2 > v1, thus e1 < e2 */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%u < %u => %p < %p\n",
- (unsigned int) v1,
- (unsigned int) v2,
- m1,
- m2);
return GNUNET_YES;
}
}
@@ -1113,6 +1109,11 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel
*ch,
struct GNUNET_CADET_LocalData *ld;
struct CadetChannelClient *ccc;
size_t payload_size;
+ struct CadetOutOfOrderMessage *com;
+ int duplicate;
+ uint32_t mid_min;
+ uint32_t mid_max;
+ uint32_t mid_msg;
GNUNET_assert (GNUNET_NO == ch->is_loopback);
if ( (GNUNET_YES == ch->destroy) &&
@@ -1153,72 +1154,75 @@ GCCH_handle_channel_plaintext_data (struct CadetChannel
*ch,
env);
ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
ch->mid_futures >>= 1;
+ if (GNUNET_YES == ch->reliable)
+ send_channel_data_ack (ch);
+ return;
}
- else
+
+ /* check if message ought to be dropped because it is anicent/too
distant/duplicate */
+ mid_min = ntohl (ch->mid_recv.mid);
+ mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
+ mid_msg = ntohl (msg->mid.mid);
+ if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
+ ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
{
- struct CadetOutOfOrderMessage *com;
- int duplicate;
- uint32_t mid_min;
- uint32_t mid_max;
- uint32_t mid_msg;
-
- mid_min = ntohl (ch->mid_recv.mid);
- mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
- mid_msg = ntohl (msg->mid.mid);
- if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
- ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Duplicate ancient or future payload of %u bytes on %s (mid %u)
dropped\n",
- (unsigned int) payload_size,
- GCCH_2s (ch),
- ntohl (msg->mid.mid));
- GNUNET_STATISTICS_update (stats,
- "# duplicate DATA (ancient or future)",
- 1,
- GNUNET_NO);
- GNUNET_MQ_discard (env);
- return;
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Duplicate ancient or future payload of %u bytes on %s (mid %u)
dropped\n",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA (ancient or future)",
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_discard (env);
+ return;
+ }
- com = GNUNET_new (struct CadetOutOfOrderMessage);
- com->mid = msg->mid;
- com->env = env;
- duplicate = GNUNET_NO;
- GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
- is_before,
- &duplicate,
- ccc->head_recv,
- ccc->tail_recv,
- com);
- if (GNUNET_YES == duplicate)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
- (unsigned int) payload_size,
- GCCH_2s (ch),
- ntohl (msg->mid.mid));
- GNUNET_STATISTICS_update (stats,
- "# duplicate DATA",
- 1,
- GNUNET_NO);
- GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
- ccc->tail_recv,
- com);
- GNUNET_MQ_discard (com->env);
- GNUNET_free (com);
- return;
- }
+ /* Insert message into sorted out-of-order queue */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ duplicate = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
+ is_before,
+ &duplicate,
+ ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ if (GNUNET_YES == duplicate)
+ {
+ /* Duplicate within the queue, drop also */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n",
- (GNUNET_YES == ccc->client_ready)
- ? "out-of-order"
- : "client-not-ready",
+ "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
(unsigned int) payload_size,
GCCH_2s (ch),
- ntohl (msg->mid.mid),
- ntohl (ch->mid_recv.mid));
+ ntohl (msg->mid.mid));
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate DATA",
+ 1,
+ GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
+ ccc->tail_recv,
+ com);
+ GNUNET_MQ_discard (com->env);
+ GNUNET_free (com);
+ if (GNUNET_YES == ch->reliable)
+ send_channel_data_ack (ch);
+ return;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
+ (GNUNET_YES == ccc->client_ready)
+ ? "out-of-order"
+ : "client-not-ready",
+ (unsigned int) payload_size,
+ GCCH_2s (ch),
+ ntohl (ccc->ccn.channel_of_client),
+ ccc,
+ ntohl (msg->mid.mid),
+ ntohl (ch->mid_recv.mid));
+ send_channel_data_ack (ch);
}
@@ -1261,6 +1265,36 @@ retry_transmission (void *cls)
/**
+ * We got an ACK for a message in our queue, remove it from
+ * the queue and tell our client that it can send more.
+ *
+ * @param ch the channel that got the ACK
+ * @param crm the message that got acknowledged
+ */
+static void
+handle_matching_ack (struct CadetChannel *ch,
+ struct CadetReliableMessage *crm)
+{
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
+ GCCH_2s (ch),
+ (unsigned int) ntohl (crm->data_message->mid.mid),
+ ch->pending_messages);
+ GNUNET_free (crm->data_message);
+ GNUNET_free (crm);
+ send_ack_to_client (ch,
+ (NULL == ch->owner)
+ ? GNUNET_NO
+ : GNUNET_YES);
+}
+
+
+/**
* We got an acknowledgement for payload data for a channel.
* Possibly resume transmissions.
*
@@ -1272,7 +1306,11 @@ GCCH_handle_channel_plaintext_data_ack (struct
CadetChannel *ch,
const struct
GNUNET_CADET_ChannelDataAckMessage *ack)
{
struct CadetReliableMessage *crm;
- int was_head;
+ struct CadetReliableMessage *crmn;
+ int found;
+ uint32_t mid_base;
+ uint64_t mid_mask;
+ unsigned int delta;
GNUNET_break (GNUNET_NO == ch->is_loopback);
if (GNUNET_NO == ch->reliable)
@@ -1281,12 +1319,28 @@ GCCH_handle_channel_plaintext_data_ack (struct
CadetChannel *ch,
GNUNET_break_op (0);
return;
}
+ mid_base = ntohl (ack->mid.mid);
+ mid_mask = GNUNET_htonll (ack->futures);
+ found = GNUNET_NO;
for (crm = ch->head_sent;
NULL != crm;
- crm = crm->next)
+ crm = crmn)
+ {
+ crmn = crm->next;
if (ack->mid.mid == crm->data_message->mid.mid)
- break;
- if (NULL == crm)
+ {
+ handle_matching_ack (ch,
+ crm);
+ continue;
+ }
+ delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
+ if (delta >= 64)
+ continue;
+ if (0 != (mid_mask & (1LLU << delta)))
+ handle_matching_ack (ch,
+ crm);
+ }
+ if (GNUNET_NO == found)
{
/* ACK for message we already dropped, might have been a
duplicate ACK? Ignore. */
@@ -1299,36 +1353,16 @@ GCCH_handle_channel_plaintext_data_ack (struct
CadetChannel *ch,
GNUNET_NO);
return;
}
- was_head = (crm == ch->head_sent);
- GNUNET_CONTAINER_DLL_remove (ch->head_sent,
- ch->tail_sent,
- crm);
- GNUNET_free (crm->data_message);
- GNUNET_free (crm);
- ch->pending_messages--;
- GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
- GCCH_2s (ch),
- (unsigned int) ntohl (ack->mid.mid),
- ch->pending_messages);
- send_ack_to_client (ch,
- (NULL == ch->owner)
- ? GNUNET_NO
- : GNUNET_YES);
- if (was_head)
+ if (NULL != ch->retry_data_task)
{
- if (NULL != ch->retry_data_task)
- {
- GNUNET_SCHEDULER_cancel (ch->retry_data_task);
- ch->retry_data_task = NULL;
- }
- if (NULL != ch->head_sent)
- ch->retry_data_task
- = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
- &retry_transmission,
- ch);
+ GNUNET_SCHEDULER_cancel (ch->retry_data_task);
+ ch->retry_data_task = NULL;
}
+ if (NULL != ch->head_sent)
+ ch->retry_data_task
+ = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
+ &retry_transmission,
+ ch);
}
@@ -1591,10 +1625,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
if (NULL == com)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on
%s)!\n",
+ "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on
%s-%X(%p)!\n",
GSC_2s (ccc->c),
+ ntohl (client_ccn.channel_of_client),
+ GCCH_2s (ch),
ntohl (ccc->ccn.channel_of_client),
- GCCH_2s (ch));
+ ccc);
return; /* none pending */
}
if (GNUNET_YES == ch->is_loopback)
@@ -1637,11 +1673,12 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
return; /* missing next one in-order */
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
- GSC_2s (ccc->c),
- ntohl (ccc->ccn.channel_of_client),
- GCCH_2s (ch));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
+ ntohl (com->mid.mid),
+ GSC_2s (ccc->c),
+ ntohl (ccc->ccn.channel_of_client),
+ GCCH_2s (ch));
/* all good, pass next message to client */
GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
@@ -1663,9 +1700,9 @@ GCCH_handle_local_ack (struct CadetChannel *ch,
urgently waiting for an ACK from us. (As we have an inherent
maximum of 64 bits, and 15 is getting too close for comfort.)
So we should send one now. */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sender on %s likely blocked on flow-control, sending ACK
now.\n",
- GCCH_2s (ch));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sender on %s likely blocked on flow-control, sending ACK now.\n",
+ GCCH_2s (ch));
if (GNUNET_YES == ch->reliable)
send_channel_data_ack (ch);
}
--
To stop receiving notification emails like this one, please contact
address@hidden