[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] branch master updated: working on channel passing
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] branch master updated: working on channel passing data to clients |
Date: |
Fri, 20 Jan 2017 20:39:52 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new f52fc4b00 working on channel passing data to clients
f52fc4b00 is described below
commit f52fc4b001758430bb911759c755d0f06d3eb693
Author: Christian Grothoff <address@hidden>
AuthorDate: Fri Jan 20 20:39:51 2017 +0100
working on channel passing data to clients
---
src/cadet/cadet.h | 2 +-
src/cadet/cadet_api.c | 8 +-
src/cadet/gnunet-service-cadet-new.c | 2 +-
src/cadet/gnunet-service-cadet-new.h | 1 -
src/cadet/gnunet-service-cadet-new_channel.c | 215 ++++++++++++++++++++++++---
src/cadet/gnunet-service-cadet_local.c | 4 +-
6 files changed, 205 insertions(+), 27 deletions(-)
diff --git a/src/cadet/cadet.h b/src/cadet/cadet.h
index c16fb2917..9d154fb99 100644
--- a/src/cadet/cadet.h
+++ b/src/cadet/cadet.h
@@ -198,7 +198,7 @@ struct GNUNET_CADET_LocalData
/**
* ID of the channel
*/
- struct GNUNET_CADET_ClientChannelNumber id;
+ struct GNUNET_CADET_ClientChannelNumber channel_id;
/**
* Payload follows
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 8f1274d63..5dcf43e46 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -569,7 +569,7 @@ request_data (void *cls)
env = GNUNET_MQ_msg_extra (msg,
th->size,
GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
- msg->id = th->channel->chid;
+ msg->channel_id = th->channel->chid;
osize = th->notify (th->notify_cls,
th->size,
&msg[1]);
@@ -697,7 +697,7 @@ check_local_data (void *cls,
}
ch = retrieve_channel (h,
- message->id);
+ message->channel_id);
if (NULL == ch)
{
GNUNET_break_op (0);
@@ -727,7 +727,7 @@ handle_local_data (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got a data message!\n");
- ch = retrieve_channel (h, message->id);
+ ch = retrieve_channel (h, message->channel_id);
GNUNET_assert (NULL != ch);
payload = (struct GNUNET_MessageHeader *) &message[1];
@@ -735,7 +735,7 @@ handle_local_data (void *cls,
GC_f2s (ntohl (ch->chid.channel_of_client) >=
GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
- ntohl (message->id.channel_of_client));
+ ntohl (message->channel_id.channel_of_client));
type = ntohs (payload->type);
LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type));
diff --git a/src/cadet/gnunet-service-cadet-new.c b/src/cadet/gnunet-service-cadet-new.c
index 7b4a0e95b..7801708c1 100644
--- a/src/cadet/gnunet-service-cadet-new.c
+++ b/src/cadet/gnunet-service-cadet-new.c
@@ -628,7 +628,7 @@ handle_data (void *cls,
struct CadetChannel *ch;
const struct GNUNET_MessageHeader *payload;
- chid = msg->id;
+ chid = msg->channel_id;
map = get_map_by_chid (c,
chid);
ch = GNUNET_CONTAINER_multihashmap32_get (map,
diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h
index 9f4667e23..b3bb85d85 100644
--- a/src/cadet/gnunet-service-cadet-new.h
+++ b/src/cadet/gnunet-service-cadet-new.h
@@ -220,7 +220,6 @@ extern unsigned long long ratchet_messages;
extern struct GNUNET_TIME_Relative ratchet_time;
-
/**
* Send a message to a client.
*
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index 5d2eba618..75ec81992 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,14 +25,12 @@
* @author Christian Grothoff
*
* TODO:
- * - handle CREATE_ACK
- * - handle plaintext data
- * - handle plaintext ACK
* - handle destroy
* - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
* - check that '0xFFULL' really is sufficient for flow control!
- * - what about the 'no buffer' option?
- * - what about the 'out-of-order' option?
+ * - revisit handling of 'unreliable' traffic!
+ * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
+ * - figure out flow control without ACKs (unreliable traffic!)
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -147,7 +145,8 @@ struct CadetOutOfOrderMessage
struct CadetOutOfOrderMessage *prev;
/**
- * ID of the message (ACK needed to free)
+ * ID of the message (messages up to this point needed
+ * before we give this one to the client).
*/
struct ChannelMessageIdentifier mid;
@@ -311,7 +310,6 @@ struct CadetChannel
};
-
/**
* Get the static string for identification of the channel.
*
@@ -480,8 +478,10 @@ GCCH_channel_local_new (struct CadetClient *owner,
struct CadetChannel *ch;
ch = GNUNET_new (struct CadetChannel);
- ch->max_pending_messages = 32; /* FIXME: allow control via options
- or adjust dynamically... */
+ ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
+ ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
+ ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
ch->owner = owner;
ch->lid = owner_id;
ch->port = *port;
@@ -490,9 +490,6 @@ GCCH_channel_local_new (struct CadetClient *owner,
ch->chid = GCT_add_channel (ch->t,
ch);
ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
- ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
- ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
- ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
ch);
GNUNET_STATISTICS_update (stats,
@@ -538,8 +535,6 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
struct CadetClient *c;
ch = GNUNET_new (struct CadetChannel);
- ch->max_pending_messages = 32; /* FIXME: allow control via options
- or adjust dynamically... */
ch->port = *port;
ch->t = t;
ch->chid = chid;
@@ -547,6 +542,7 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+ ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
GNUNET_STATISTICS_update (stats,
"# channels",
1,
@@ -635,6 +631,27 @@ send_connect_ack (void *cls)
/**
+ * Send a LOCAL ACK to the client to solicit more messages.
+ *
+ * @param ch channel the ack is for
+ * @param c client to send the ACK to
+ */
+static void
+send_ack_to_client (struct CadetChannel *ch,
+ struct CadetClient *c)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalAck *ack;
+
+ env = GNUNET_MQ_msg (ack,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+ ack->channel_id = ch->lid;
+ GSC_send_to_client (c,
+ env);
+}
+
+
+/**
* A client is bound to the port that we have a channel
* open to. Send the acknowledgement for the connection
* request and establish the link with the client.
@@ -672,6 +689,10 @@ GCCH_bind (struct CadetChannel *ch,
/* notify other peer that we accepted the connection */
ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
ch);
+ /* give client it's initial supply of ACKs */
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ ch->owner);
}
@@ -742,12 +763,75 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch)
void
GCCH_handle_channel_create_ack (struct CadetChannel *ch)
{
- GNUNET_break (0); // FIXME!
+ switch (ch->state)
+ {
+ case CADET_CHANNEL_NEW:
+ /* this should be impossible */
+ GNUNET_break (0);
+ break;
+ case CADET_CHANNEL_CREATE_SENT:
+ if (NULL == ch->owner)
+ {
+ /* We're not the owner, wrong direction! */
+ GNUNET_break_op (0);
+ return;
+ }
+ ch->state = CADET_CHANNEL_READY;
+ /* On first connect, send client as many ACKs as we allow messages
+ to be buffered! */
+ for (unsigned int i=0;i<ch->max_pending_messages;i++)
+ send_ack_to_client (ch,
+ ch->owner);
+ break;
+ case CADET_CHANNEL_READY:
+ /* duplicate ACK, maybe we retried the CREATE. Ignore. */
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate CREATE_ACKs",
+ 1,
+ GNUNET_NO);
+ break;
+ }
}
/**
- * We got payload data for a channel. Pass it on to the client.
+ * Test if element @a e1 comes before element @a e2.
+ *
+ * TODO: use opportunity to create generic list insertion sort
+ * logic in container!
+ *
+ * @param cls closure, our `struct CadetChannel`
+ * @param e1 an element of to sort
+ * @param e2 another element to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
+ */
+static int
+is_before (void *cls,
+ void *e1,
+ void *e2)
+{
+ struct CadetOutOfOrderMessage *m1 = e1;
+ struct CadetOutOfOrderMessage *m2 = e2;
+ uint32_t v1 = ntohl (m1->mid.mid);
+ uint32_t v2 = ntohl (m2->mid.mid);
+ uint32_t delta;
+
+ delta = v1 - v2;
+ if (delta > (uint32_t) INT_MAX)
+ {
+ /* in overflow range, we can safely assume we wrapped around */
+ return GNUNET_NO;
+ }
+ else
+ {
+ return GNUNET_YES;
+ }
+}
+
+
+/**
+ * We got payload data for a channel. Pass it on to the client
+ * and send an ACK to the other end (once flow control allows it!)
*
* @param ch channel that got data
*/
@@ -755,7 +839,70 @@ void
GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
const struct GNUNET_CADET_ChannelAppDataMessage *msg)
{
- GNUNET_break (0); // FIXME!
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *ld;
+ struct CadetOutOfOrderMessage *com;
+ size_t payload_size;
+
+ payload_size = ntohs (msg->header.size) - sizeof (*msg);
+ env = GNUNET_MQ_msg_extra (ld,
+ payload_size,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
+ ld->channel_id = ch->lid;
+ GNUNET_memcpy (&ld[1],
+ &msg[1],
+ payload_size);
+ if ( (GNUNET_YES == ch->client_ready) &&
+ ( (GNUNET_YES == ch->out_of_order) ||
+ (msg->mid.mid == ch->mid_recv.mid) ) )
+ {
+ GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
+ env);
+ ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
+ ch->mid_futures >>= 1;
+ }
+ else
+ {
+ /* FIXME-SECURITY: if the element is WAY too far ahead,
+ drop it (can't buffer too much!) */
+ com = GNUNET_new (struct CadetOutOfOrderMessage);
+ com->mid = msg->mid;
+ com->env = env;
+ /* sort into list ordered by "is_before" */
+ if ( (NULL == ch->head_recv) ||
+ (GNUNET_YES == is_before (ch,
+ com,
+ ch->head_recv)) )
+ {
+ GNUNET_CONTAINER_DLL_insert (ch->head_recv,
+ ch->tail_recv,
+ com);
+ }
+ else
+ {
+ struct CadetOutOfOrderMessage *pos;
+
+ for (pos = ch->head_recv;
+ NULL != pos;
+ pos = pos->next)
+ {
+ if (GNUNET_YES !=
+ is_before (ch,
+ pos,
+ com))
+ break;
+ }
+ if (NULL == pos)
+ GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
+ ch->tail_recv,
+ com);
+ else
+ GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
+ ch->tail_recv,
+ com,
+ pos->prev);
+ }
+ }
}
@@ -770,7 +917,37 @@ void
GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
const struct GNUNET_CADET_ChannelDataAckMessage *ack)
{
- GNUNET_break (0); // FIXME!
+ struct CadetReliableMessage *crm;
+
+ if (GNUNET_NO == ch->reliable)
+ {
+ /* not expecting ACKs on unreliable channel, odd */
+ GNUNET_break_op (0);
+ return;
+ }
+ for (crm = ch->head_sent;
+ NULL != crm;
+ crm = crm->next)
+ if (ack->mid.mid == crm->data_message.mid.mid)
+ break;
+ if (NULL == crm)
+ {
+ /* ACK for message we already dropped, might have been a
+ duplicate ACK? Ignore. */
+ GNUNET_STATISTICS_update (stats,
+ "# duplicate CHANNEL_DATA_ACKs",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+ ch->tail_sent,
+ crm);
+ ch->pending_messages--;
+ GNUNET_free (crm);
+ GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+ send_ack_to_client (ch,
+ (NULL == ch->owner) ? ch->dest : ch->owner);
}
@@ -1026,6 +1203,8 @@ send_client_buffered_data (struct CadetChannel *ch)
GNUNET_CONTAINER_DLL_remove (ch->head_recv,
ch->tail_recv,
com);
+ /* FIXME: if unreliable, this is not aggressive
+ enough, as it would be OK to have lost some! */
ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
ch->mid_futures >>= 1; /* equivalent to division by 2 */
GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
diff --git a/src/cadet/gnunet-service-cadet_local.c b/src/cadet/gnunet-service-cadet_local.c
index e1f6ac4c3..c476f6ac2 100644
--- a/src/cadet/gnunet-service-cadet_local.c
+++ b/src/cadet/gnunet-service-cadet_local.c
@@ -586,7 +586,7 @@ handle_data (void *cls, struct GNUNET_SERVER_Client *client,
return;
}
- chid = msg->id;
+ chid = msg->channel_id;
LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes (%u payload) by client %u\n",
payload_size, payload_claimed_size, c->id);
@@ -1531,7 +1531,7 @@ GML_send_data (struct CadetClient *c,
GNUNET_memcpy (&copy[1], &msg[1], size);
copy->header.size = htons (sizeof (struct GNUNET_CADET_LocalData) + size);
copy->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
- copy->id = id;
+ copy->channel_id = id;
GNUNET_SERVER_notification_context_unicast (nc, c->handle,
&copy->header, GNUNET_NO);
}
--
To stop receiving notification emails like this one, please contact
address@hidden
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] [gnunet] branch master updated: working on channel passing data to clients,
gnunet <=