[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30655 - in gnunet/src: multicast psyc
From: gnunet
Subject: [GNUnet-SVN] r30655 - in gnunet/src: multicast psyc
Date: Sun, 10 Nov 2013 00:12:27 +0100
Author: tg
Date: 2013-11-10 00:12:27 +0100 (Sun, 10 Nov 2013)
New Revision: 30655
Modified:
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/test_psyc.c
Log:
psyc: handling messages from multicast and passing them to clients;
pause/resume fixes
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/multicast/multicast_api.c 2013-11-09 23:12:27 UTC (rev 30655)
@@ -363,11 +363,12 @@
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
struct GNUNET_MULTICAST_MessageHeader *msg
- = GNUNET_malloc (sizeof (*msg) + buf_size);
+ = GNUNET_malloc (buf_size);
+ buf_size -= sizeof (*msg);
int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
+ || sizeof (*msg) + buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"MasterTransmitNotify() returned error or invalid message size.\n");
@@ -379,15 +380,15 @@
return; /* Transmission paused. */
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
- msg->header.size = htons (buf_size);
+ msg->header.size = htons (sizeof (*msg) + buf_size);
msg->message_id = mh->message_id;
msg->group_generation = mh->group_generation;
/* FIXME: add fragment ID and signature in the service instead of here */
msg->fragment_id = orig->next_fragment_id++;
msg->fragment_offset = mh->fragment_offset;
- mh->fragment_offset += buf_size;
- msg->purpose.size = htonl (buf_size
+ mh->fragment_offset += sizeof (*msg) + buf_size;
+ msg->purpose.size = htonl (sizeof (*msg) + buf_size
- sizeof (msg->header)
- sizeof (msg->hop_counter)
- sizeof (msg->signature));
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2013-11-09 23:12:27 UTC (rev 30655)
@@ -56,7 +56,7 @@
static struct GNUNET_PSYCSTORE_Handle *store;
/**
- * channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Channel
*/
static struct GNUNET_CONTAINER_MultiHashMap *clients;
@@ -70,6 +70,9 @@
char *buf;
uint16_t size;
+ /**
+ * enum GNUNET_PSYC_DataStatus
+ */
uint8_t status;
};
@@ -83,15 +86,17 @@
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
- char *tmit_buf;
GNUNET_SCHEDULER_TaskIdentifier tmit_task;
uint32_t tmit_mod_count;
uint32_t tmit_mod_recvd;
- uint16_t tmit_size;
+ /**
+ * enum GNUNET_PSYC_DataStatus
+ */
uint8_t tmit_status;
uint8_t in_transmit;
uint8_t is_master;
+ uint8_t disconnected;
};
/**
@@ -142,6 +147,10 @@
};
+static void
+transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
+
+
/**
* Task run during shutdown.
*
@@ -163,6 +172,30 @@
}
}
+
+static void
+client_cleanup (struct Channel *ch)
+{
+ if (ch->is_master)
+ {
+ struct Master *mst = (struct Master *) ch;
+ if (NULL != mst->origin)
+ GNUNET_MULTICAST_origin_stop (mst->origin);
+ }
+ else
+ {
+ struct Slave *slv = (struct Slave *) ch;
+ if (NULL != slv->join_req)
+ GNUNET_free (slv->join_req);
+ if (NULL != slv->relays)
+ GNUNET_free (slv->relays);
+ if (NULL != slv->member)
+ GNUNET_MULTICAST_member_part (slv->member);
+ }
+
+ GNUNET_free (ch);
+}
+
/**
* Called whenever a client is disconnected.
* Frees our resources associated with that client.
@@ -188,30 +221,17 @@
return;
}
- if (NULL != ch->tmit_buf)
- {
- GNUNET_free (ch->tmit_buf);
- ch->tmit_buf = NULL;
- }
+ ch->disconnected = GNUNET_YES;
- if (ch->is_master)
+ /* Send pending messages to multicast before cleanup. */
+ if (NULL != ch->tmit_head)
{
- struct Master *mst = (struct Master *) ch;
- if (NULL != mst->origin)
- GNUNET_MULTICAST_origin_stop (mst->origin);
+ transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
}
else
{
- struct Slave *slv = (struct Slave *) ch;
- if (NULL != slv->join_req)
- GNUNET_free (slv->join_req);
- if (NULL != slv->relays)
- GNUNET_free (slv->relays);
- if (NULL != slv->member)
- GNUNET_MULTICAST_member_part (slv->member);
+ client_cleanup (ch);
}
-
- GNUNET_free (ch);
}
void
@@ -259,14 +279,98 @@
}
+
void
+fragment_store_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "fragment_store() returned %l (%s)\n", result, err_msg);
+}
+
+/**
+ * Send PSYC messages in an incoming multicast message to a client.
+ */
+int
+send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan)
+{
+ const struct GNUNET_MULTICAST_MessageHeader *msg = cls;
+ struct Channel *ch = chan;
+
+ uint16_t size = ntohs (msg->header.size);
+ uint16_t pos = 0;
+
+ while (sizeof (*msg) + pos < size)
+ {
+ const struct GNUNET_MessageHeader *pmsg
+ = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+ uint16_t psize = ntohs (pmsg->size);
+ if (sizeof (*msg) + pos + psize > size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Ignoring message of type %u with invalid size. "
+ "(%u + %u + %u > %u)\n", ntohs (pmsg->type),
+ sizeof (*msg), pos, psize, size);
+ break;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %u and size %u to client.\n",
+ ntohs (pmsg->type), psize);
+
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg,
+ GNUNET_NO);
+ pos += psize;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to all clients of the channel.
+ */
+void
message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from multicast.\n",
- ntohs (msg->type));
+ "Received message of type %u and size %u from multicast.\n",
+ type, size);
+
+ struct Channel *ch = cls;
+ struct Master *mst = cls;
+ struct Slave *slv = cls;
+
+ struct GNUNET_CRYPTO_EddsaPublicKey *ch_key
+ = ch->is_master ? &mst->pub_key : &slv->chan_key;
+ struct GNUNET_HashCode *ch_key_hash
+ = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+ GNUNET_PSYCSTORE_fragment_store (store, ch_key,
+ (const struct
+ GNUNET_MULTICAST_MessageHeader *) msg,
+ 0, NULL, NULL);
+ GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash,
+ send_to_client, (void *) msg);
+ break;
+
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Ignoring unknown message of type %u and size %u.\n",
+ type, size);
+ }
}
+
+/**
+ * Response from PSYCstore with the current counter values for a channel master.
+ */
void
master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
uint64_t max_message_id, uint64_t max_group_generation,
@@ -299,6 +403,9 @@
}
+/**
+ * Response from PSYCstore with the current counter values for a channel slave.
+ */
void
slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
uint64_t max_message_id, uint64_t max_group_generation,
@@ -332,6 +439,9 @@
}
+/**
+ * Handle a connecting client starting a channel master.
+ */
static void
handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
@@ -357,6 +467,9 @@
}
+/**
+ * Handle a connecting client joining as a channel slave.
+ */
static void
handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
@@ -389,14 +502,27 @@
}
+/**
+ * Send transmission acknowledgement to a client.
+ *
+ * Sent after the last GNUNET_PSYC_MessageModifier and after each
+ * GNUNET_PSYC_MessageData.
+ *
+ * @param ch The channel struct for the client.
+ */
static void
send_transmit_ack (struct Channel *ch)
{
struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
res->header.size = htons (sizeof (*res));
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
- res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
+ res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+ struct TransmitMessage *tmit_msg = ch->tmit_tail;
+ if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status)
+ res->buf_avail -= tmit_msg->size;
+ res->buf_avail = htons (res->buf_avail);
+
GNUNET_SERVER_notification_context_add (nc, ch->client);
GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
GNUNET_NO);
@@ -404,30 +530,53 @@
}
+/**
+ * Callback for the transmit functions of multicast.
+ */
static int
transmit_notify (void *cls, size_t *data_size, void *data)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
struct Channel *ch = cls;
struct TransmitMessage *msg = ch->tmit_head;
- if (NULL == msg || *data_size < ntohs (msg->size))
+ if (NULL == msg || *data_size < msg->size)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
*data_size = 0;
return GNUNET_NO;
}
- *data_size = ntohs (msg->size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "transmit_notify: sending %u bytes.\n", msg->size);
+
+ *data_size = msg->size;
memcpy (data, msg->buf, *data_size);
- GNUNET_free (ch->tmit_buf);
- ch->tmit_buf = NULL;
GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
+ GNUNET_free (msg);
- return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+ int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+
+ if (0 == ch->tmit_task)
+ {
+ if (NULL != ch->tmit_head)
+ {
+ transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+ }
+ else if (ch->disconnected)
+ {
+ /* FIXME: handle partial message (when still in_transmit) */
+ client_cleanup (ch);
+ }
+ }
+
+ return ret;
}
+/**
+ * Transmit a message from a channel master to the multicast group.
+ */
static void
master_transmit_message (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -449,6 +598,9 @@
}
+/**
+ * Transmit a message from a channel slave to the multicast group.
+ */
static void
slave_transmit_message (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -468,50 +620,90 @@
}
+/**
+ * Schedule message transmission from a channel to the multicast group.
+ *
+ * @param ch The channel.
+ * @param delay Transmission delay.
+ */
+static void
+transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
+{
+ if (0 != ch->tmit_task)
+ GNUNET_SCHEDULER_cancel (ch->tmit_task);
+
+ ch->tmit_task
+ = ch->is_master
+ ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
+ : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
+}
+
+/**
+ * Queue incoming message parts from a client for transmission, and send them to
+ * the multicast group when the buffer is full or reached the end of message.
+ *
+ * @param ch Channel struct for the client.
+ * @param msg Message from the client.
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
+ */
static int
-buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
+queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
{
uint16_t size = ntohs (msg->size);
struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
+ struct TransmitMessage *tmit_msg = ch->tmit_tail;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing message of type %u and size %u "
+ "for transmission to multicast.\n",
+ ntohs (msg->type), size);
+
if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
return GNUNET_SYSERR;
- if (0 == ch->tmit_size)
+ if (NULL == tmit_msg
+ || tmit_msg->status != GNUNET_PSYC_DATA_CONT
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size)
{
- ch->tmit_buf = GNUNET_malloc (size);
- memcpy (ch->tmit_buf, msg, size);
- ch->tmit_size = size;
- }
- else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
- {
- ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
- memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
- ch->tmit_size += size;
- }
-
- if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
- < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
- {
- struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
- tmit_msg->buf = (char *) msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Appending message qto new buffer.\n");
+ /* Start filling up new buffer */
+ tmit_msg = GNUNET_new (struct TransmitMessage);
+ tmit_msg->buf = GNUNET_malloc (size);
+ memcpy (tmit_msg->buf, msg, size);
tmit_msg->size = size;
tmit_msg->status = ch->tmit_status;
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- tmit_delay = GNUNET_TIME_UNIT_ZERO;
}
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Appending message to existing buffer.\n");
+ /* Append to existing buffer */
+ tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
+ memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
+ tmit_msg->size += size;
+ tmit_msg->status = ch->tmit_status;
+ }
- if (0 != ch->tmit_task)
- GNUNET_SCHEDULER_cancel (ch->tmit_task);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
- ch->tmit_task
- = ch->is_master
- ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
- : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
+ /* Wait a bit for the remaining message parts from the client
+ if there's still some space left in the buffer. */
+ if (GNUNET_PSYC_DATA_CONT == tmit_msg->status
+ && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData)
+ < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE))
+ tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
+ transmit_message (ch, tmit_delay);
+
return GNUNET_OK;
}
+/**
+ * Incoming method from a client.
+ */
static void
handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
@@ -524,18 +716,16 @@
if (GNUNET_NO != ch->in_transmit)
{
- // FIXME: already transmitting a message, send back error message.
+ /* FIXME: already transmitting a message, send back error message. */
return;
}
ch->in_transmit = GNUNET_YES;
- ch->tmit_buf = NULL;
- ch->tmit_size = 0;
ch->tmit_mod_recvd = 0;
ch->tmit_mod_count = ntohl (meth->mod_count);
ch->tmit_status = GNUNET_PSYC_DATA_CONT;
- buffer_message (ch, msg);
+ queue_message (ch, msg);
if (0 == ch->tmit_mod_count)
send_transmit_ack (ch);
@@ -544,18 +734,23 @@
};
+/**
+ * Incoming modifier from a client.
+ */
static void
handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
+ /*
const struct GNUNET_PSYC_MessageModifier *mod
= (const struct GNUNET_PSYC_MessageModifier *) msg;
+ */
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
ch->tmit_mod_recvd++;
- buffer_message (ch, msg);
+ queue_message (ch, msg);
if (ch->tmit_mod_recvd == ch->tmit_mod_count)
send_transmit_ack (ch);
@@ -564,6 +759,9 @@
};
+/**
+ * Incoming data from a client.
+ */
static void
handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
@@ -575,7 +773,7 @@
GNUNET_assert (NULL != ch);
ch->tmit_status = ntohs (data->status);
- buffer_message (ch, msg);
+ queue_message (ch, msg);
send_transmit_ack (ch);
if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/psyc/psyc_api.c 2013-11-09 23:12:27 UTC (rev 30655)
@@ -69,12 +69,12 @@
/**
* Head of operations to transmit.
*/
- struct OperationHandle *transmit_head;
+ struct OperationHandle *tmit_head;
/**
* Tail of operations to transmit.
*/
- struct OperationHandle *transmit_tail;
+ struct OperationHandle *tmit_tail;
/**
* Message to send on reconnect.
@@ -116,6 +116,16 @@
* Buffer space available for transmitting the next data fragment.
*/
uint16_t tmit_buf_avail;
+
+ /**
+ * Is transmission paused?
+ */
+ uint8_t tmit_paused;
+
+ /**
+ * Are we still waiting for a PSYC_TRANSMIT_ACK?
+ */
+ uint8_t tmit_ack_pending;
};
@@ -243,6 +253,11 @@
transmit_next (struct GNUNET_PSYC_Channel *ch);
+/**
+ * Request data from client to transmit.
+ *
+ * @param mst Master handle.
+ */
static void
master_transmit_data (struct GNUNET_PSYC_Master *mst)
{
@@ -268,12 +283,13 @@
default:
mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
data_size = 0;
- LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
+ LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n");
}
if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
{
/* Transmission paused, nothing to send. */
+ ch->tmit_paused = GNUNET_YES;
GNUNET_free (op);
}
else
@@ -281,7 +297,8 @@
GNUNET_assert (data_size <= ch->tmit_buf_avail);
pdata->header.size = htons (sizeof (*pdata) + data_size);
pdata->status = htons (mst->tmit->status);
- GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ ch->tmit_ack_pending = GNUNET_YES;
transmit_next (ch);
}
}
@@ -305,7 +322,6 @@
struct CountersResult *cres;
struct TransmitAck *tack;
-
if (NULL == msg)
{
reschedule_connect (ch);
@@ -317,7 +333,8 @@
uint16_t type = ntohs (msg->type);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %d from PSYC service\n", type);
+ "Received message of type %d and size %u from PSYC service\n",
+ type, size);
switch (type)
{
@@ -328,10 +345,16 @@
case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
size_eq = sizeof (struct TransmitAck);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ size_min = sizeof (struct GNUNET_PSYC_MessageData);
}
if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size >= size_min)))
+ || (0 < size_min && size_min <= size)))
{
GNUNET_break (0);
reschedule_connect (ch);
@@ -370,7 +393,9 @@
else
{
ch->tmit_buf_avail = ntohs (tack->buf_avail);
- master_transmit_data (mst);
+ ch->tmit_ack_pending = GNUNET_NO;
+ if (GNUNET_NO == ch->tmit_paused)
+ master_transmit_data (mst);
}
}
else
@@ -378,6 +403,18 @@
/* TODO: slave */
}
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+
+ break;
}
GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
@@ -397,9 +434,9 @@
send_next_message (void *cls, size_t size, void *buf)
{
struct GNUNET_PSYC_Channel *ch = cls;
- struct OperationHandle *op = ch->transmit_head;
+ struct OperationHandle *op = ch->tmit_head;
size_t ret;
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
ch->th = NULL;
if (NULL == op->msg)
return 0;
@@ -409,15 +446,12 @@
reschedule_connect (ch);
return 0;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message of type %d to PSYC service\n",
- ntohs (op->msg->type));
memcpy (buf, op->msg, ret);
- GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
GNUNET_free (op);
- if (NULL != ch->transmit_head)
+ if (NULL != ch->tmit_head)
transmit_next (ch);
if (GNUNET_NO == ch->in_receive)
@@ -438,10 +472,11 @@
static void
transmit_next (struct GNUNET_PSYC_Channel *ch)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
if (NULL != ch->th || NULL == ch->client)
return;
- struct OperationHandle *op = ch->transmit_head;
+ struct OperationHandle *op = ch->tmit_head;
if (NULL == op)
return;
@@ -472,14 +507,14 @@
ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
GNUNET_assert (NULL != ch->client);
- if (NULL == ch->transmit_head ||
- ch->transmit_head->msg->type != ch->reconnect_msg->type)
+ if (NULL == ch->tmit_head ||
+ ch->tmit_head->msg->type != ch->reconnect_msg->type)
{
uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
memcpy (&op[1], ch->reconnect_msg, reconn_size);
op->msg = (struct GNUNET_MessageHeader *) &op[1];
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
}
transmit_next (ch);
}
@@ -496,7 +531,7 @@
struct GNUNET_PSYC_Channel *ch = c;
GNUNET_assert (NULL != ch);
- if (ch->transmit_head != ch->transmit_tail)
+ if (ch->tmit_head != ch->tmit_tail)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"Disconnecting while there are still outstanding messages!\n");
@@ -654,7 +689,7 @@
memcpy (&pmod[1], mod->name, name_size);
memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
return GNUNET_YES;
}
@@ -699,7 +734,7 @@
pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
memcpy (&pmeth[1], method_name, size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
GNUNET_ENV_environment_iterate (env, send_modifier, master);
transmit_next (ch);
@@ -720,7 +755,12 @@
void
GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
{
- master_transmit_data (th->master);
+ struct GNUNET_PSYC_Channel *ch = &th->master->ch;
+ if (GNUNET_NO == ch->tmit_ack_pending)
+ {
+ ch->tmit_paused = GNUNET_NO;
+ master_transmit_data (th->master);
+ }
}
@@ -938,8 +978,8 @@
slvadd->header.size = htons (sizeof (*slvadd));
slvadd->announced_at = GNUNET_htonll (announced_at);
slvadd->effective_since = GNUNET_htonll (effective_since);
- GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head,
- channel->transmit_tail,
+ GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
+ channel->tmit_tail,
op);
transmit_next (channel);
}
@@ -979,8 +1019,8 @@
slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
slvrm->header.size = htons (sizeof (*slvrm));
slvrm->announced_at = GNUNET_htonll (announced_at);
- GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head,
- channel->transmit_tail,
+ GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
+ channel->tmit_tail,
op);
transmit_next (channel);
}
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/psyc/test_psyc.c 2013-11-09 23:12:27 UTC (rev 30655)
@@ -144,10 +144,12 @@
return GNUNET_OK;
}
+
struct TransmitClosure
{
struct GNUNET_PSYC_MasterTransmitHandle *handle;
uint8_t n;
+ uint8_t paused;
uint8_t fragment_count;
char *fragments[16];
uint16_t fragment_sizes[16];
@@ -157,8 +159,9 @@
static void
transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
struct TransmitClosure *tmit = cls;
+ tmit->paused = GNUNET_NO;
GNUNET_PSYC_master_transmit_resume (tmit->handle);
}
@@ -167,33 +170,36 @@
transmit_notify (void *cls, size_t *data_size, void *data)
{
struct TransmitClosure *tmit = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Transmit notify: %lu bytes\n", *data_size);
-
- if (tmit->fragment_count <= tmit->n)
- return GNUNET_YES;
-
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmit notify: %lu bytes available, "
+ "processing fragment %u/%u.\n",
+ *data_size, tmit->n + 1, tmit->fragment_count);
GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
- *data_size = tmit->fragment_sizes[tmit->n];
- memcpy (data, tmit->fragments[tmit->n], *data_size);
- tmit->n++;
-
- if (tmit->n == tmit->fragment_count - 1)
+ if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1)
{
/* Send last fragment later. */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume,
- tmit);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+ tmit->paused = GNUNET_YES;
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 3),
+ &transmit_resume, tmit);
*data_size = 0;
return GNUNET_NO;
}
- return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
+
+ GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
+ *data_size = tmit->fragment_sizes[tmit->n];
+ memcpy (data, tmit->fragments[tmit->n], *data_size);
+
+ return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
}
void
master_started (void *cls, uint64_t max_message_id)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Master started: %lu\n", max_message_id);
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
@@ -202,11 +208,13 @@
"_foo_bar", "foo bar baz", 11);
struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
- tmit->fragment_count = 2;
- tmit->fragments[0] = "foo bar";
- tmit->fragment_sizes[0] = 7;
- tmit->fragments[1] = "baz!";
- tmit->fragment_sizes[1] = 4;
+ tmit->fragment_count = 3;
+ tmit->fragments[0] = "foo";
+ tmit->fragment_sizes[0] = 4;
+ tmit->fragments[1] = "foo bar";
+ tmit->fragment_sizes[1] = 7;
+ tmit->fragments[2] = "foo bar baz";
+ tmit->fragment_sizes[2] = 11;
tmit->handle
= GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r30655 - in gnunet/src: multicast psyc, gnunet <=