[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r36380 - in gnunet/src: include multicast psyc psycstore so
From: |
gnunet |
Subject: |
[GNUnet-SVN] r36380 - in gnunet/src: include multicast psyc psycstore social util |
Date: |
Sun, 27 Sep 2015 23:04:34 +0200 |
Author: tg
Date: 2015-09-27 23:04:34 +0200 (Sun, 27 Sep 2015)
New Revision: 36380
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/multicast/gnunet-service-multicast.c
gnunet/src/multicast/multicast_api.c
gnunet/src/multicast/test_multicast.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/test_psyc.c
gnunet/src/psycstore/psyc_util_lib.c
gnunet/src/social/social_api.c
gnunet/src/util/client_manager.c
Log:
multicast/psyc/social: message acks & scheduling
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2015-09-27 21:04:23 UTC (rev
36379)
+++ gnunet/src/include/gnunet_protocols.h 2015-09-27 21:04:34 UTC (rev
36380)
@@ -2439,19 +2439,24 @@
#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
/**
+ * C->S: Acknowledgement of a message or request fragment for the client.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK 759
+
+/**
* C<->S<->T: Replay request from a group member to another member.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 760
/**
* C<->S<->T: Replay response from a group member to another member.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 761
/**
* C<->S: End of replay response.
*/
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 762
Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c 2015-09-27 21:04:23 UTC
(rev 36379)
+++ gnunet/src/multicast/gnunet-service-multicast.c 2015-09-27 21:04:34 UTC
(rev 36380)
@@ -181,6 +181,11 @@
int8_t join_status;
/**
+ * Number of messages waiting to be sent to CADET.
+ */
+ uint8_t msgs_pending;
+
+ /**
* Channel direction.
* @see enum ChannelDirection
*/
@@ -619,8 +624,10 @@
/**
* Send message to all origin and member clients connected to the group.
*
- * @param grp The group to send @a msg to.
- * @param msg Message to send.
+ * @param pub_key_hash
+ * H(key_pub) of the group.
+ * @param msg
+ * Message to send.
*/
static int
client_send_all (struct GNUNET_HashCode *pub_key_hash,
@@ -660,8 +667,10 @@
/**
* Send message to all origin clients connected to the group.
*
- * @param grp The group to send @a msg to.
- * @param msg Message to send.
+ * @param pub_key_hash
+ * H(key_pub) of the group.
+ * @param msg
+ * Message to send.
*/
static int
client_send_origin (struct GNUNET_HashCode *pub_key_hash,
@@ -676,6 +685,33 @@
/**
+ * Send fragment acknowledgement to all clients of the channel.
+ *
+ * @param pub_key_hash
+ * H(key_pub) of the group.
+ */
+static void
+client_send_ack (struct GNUNET_HashCode *pub_key_hash)
+{
+ static struct GNUNET_MessageHeader *msg = NULL;
+ if (NULL == msg)
+ {
+ msg = GNUNET_malloc (sizeof (*msg));
+ msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
+ msg->size = htons (sizeof (*msg));
+ }
+ client_send_all (pub_key_hash, msg);
+}
+
+
+struct CadetTransmitClosure
+{
+ struct Channel *chn;
+ const struct GNUNET_MessageHeader *msg;
+};
+
+
+/**
* CADET is ready to transmit a message.
*/
size_t
@@ -686,10 +722,21 @@
/* FIXME: connection closed */
return 0;
}
- const struct GNUNET_MessageHeader *msg = cls;
- uint16_t msg_size = ntohs (msg->size);
+ struct CadetTransmitClosure *tcls = cls;
+ struct Channel *chn = tcls->chn;
+ uint16_t msg_size = ntohs (tcls->msg->size);
GNUNET_assert (msg_size <= buf_size);
- memcpy (buf, msg, msg_size);
+ memcpy (buf, tcls->msg, msg_size);
+ GNUNET_free (tcls);
+
+ if (0 == chn->msgs_pending)
+ {
+ GNUNET_break (0);
+ }
+ else if (0 == --chn->msgs_pending)
+ {
+ client_send_ack (&chn->group_key_hash);
+ }
return msg_size;
}
@@ -703,6 +750,11 @@
static void
cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader
*msg)
{
+ struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
+ tcls->chn = chn;
+ tcls->msg = msg;
+
+ chn->msgs_pending++;
chn->tmit_handle
= GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1132,7 +1184,10 @@
}
client_send_all (&grp->pub_key_hash, &out->header);
- cadet_send_children (&grp->pub_key_hash, &out->header);
+ if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
+ {
+ client_send_ack (&grp->pub_key_hash);
+ }
GNUNET_free (out);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1174,11 +1229,13 @@
GNUNET_assert (0);
}
+ uint8_t send_ack = GNUNET_YES;
if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
{ /* No local origins, send to remote origin */
if (NULL != mem->origin_channel)
{
cadet_send_channel (mem->origin_channel, &out->header);
+ send_ack = GNUNET_NO;
}
else
{
@@ -1188,6 +1245,10 @@
return;
}
}
+ if (GNUNET_YES == send_ack)
+ {
+ client_send_ack (&grp->pub_key_hash);
+ }
GNUNET_free (out);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2015-09-27 21:04:23 UTC (rev
36379)
+++ gnunet/src/multicast/multicast_api.c 2015-09-27 21:04:34 UTC (rev
36380)
@@ -102,6 +102,11 @@
uint8_t in_transmit;
/**
+ * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+ */
+ uint8_t acks_pending;
+
+ /**
* Is this the origin or a member?
*/
uint8_t is_origin;
@@ -185,6 +190,13 @@
};
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
/**
* Send first message to the service after connecting.
*/
@@ -274,6 +286,38 @@
/**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+group_recv_fragment_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+ grp, grp->in_transmit, grp->acks_pending);
+
+ if (0 == grp->acks_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Ignoring extraneous fragment ACK.\n", grp);
+ return;
+ }
+ grp->acks_pending--;
+
+ if (GNUNET_YES != grp->in_transmit)
+ return;
+
+ if (GNUNET_YES == grp->is_origin)
+ origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+ else
+ member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
+
+/**
* Origin receives uniquest request from a member.
*/
static void
@@ -447,6 +491,10 @@
GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
{ group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +518,10 @@
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
{ group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -577,6 +629,7 @@
memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+ GNUNET_free (hdcsn);
GNUNET_free (join);
return NULL;
}
@@ -774,9 +827,10 @@
static void
origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +840,8 @@
|| GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "OriginTransmitNotify() returned error or invalid message size.\n");
+ "%p OriginTransmitNotify() returned error or invalid message size.\n",
+ orig);
/* FIXME: handle error */
GNUNET_free (msg);
return;
@@ -794,6 +849,8 @@
if (GNUNET_NO == ret && 0 == buf_size)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p OriginTransmitNotify() - transmission paused.\n", orig);
GNUNET_free (msg);
return; /* Transmission paused. */
}
@@ -805,7 +862,12 @@
msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*msg) + buf_size;
+ grp->acks_pending++;
GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+ GNUNET_free (msg);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
@@ -834,11 +896,10 @@
GNUNET_MULTICAST_OriginTransmitNotify notify,
void *notify_cls)
{
-/* FIXME
- if (GNUNET_YES == orig->grp.in_transmit)
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+ if (GNUNET_YES == grp->in_transmit)
return NULL;
- orig->grp.in_transmit = GNUNET_YES;
-*/
+ grp->in_transmit = GNUNET_YES;
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
tmit->origin = orig;
@@ -861,6 +922,9 @@
void
GNUNET_MULTICAST_origin_to_all_resume (struct
GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
origin_to_all (th->origin);
}
@@ -874,6 +938,7 @@
void
GNUNET_MULTICAST_origin_to_all_cancel (struct
GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ th->origin->grp.in_transmit = GNUNET_NO;
}
@@ -1094,6 +1159,7 @@
LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1190,10 @@
tmit->fragment_offset += sizeof (*req) + buf_size;
GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+ GNUNET_free (req);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
@@ -1147,11 +1217,9 @@
GNUNET_MULTICAST_MemberTransmitNotify
notify,
void *notify_cls)
{
-/* FIXME
if (GNUNET_YES == mem->grp.in_transmit)
return NULL;
mem->grp.in_transmit = GNUNET_YES;
-*/
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
tmit->member = mem;
@@ -1173,6 +1241,9 @@
void
GNUNET_MULTICAST_member_to_origin_resume (struct
GNUNET_MULTICAST_MemberTransmitHandle *th)
{
+ struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
member_to_origin (th->member);
}
@@ -1186,6 +1257,7 @@
void
GNUNET_MULTICAST_member_to_origin_cancel (struct
GNUNET_MULTICAST_MemberTransmitHandle *th)
{
+ th->member->grp.in_transmit = GNUNET_NO;
}
Modified: gnunet/src/multicast/test_multicast.c
===================================================================
--- gnunet/src/multicast/test_multicast.c 2015-09-27 21:04:23 UTC (rev
36379)
+++ gnunet/src/multicast/test_multicast.c 2015-09-27 21:04:34 UTC (rev
36380)
@@ -183,7 +183,7 @@
struct TransmitClosure *tmit = cls;
if (NULL != tmit->orig_tmit)
GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit);
- else
+ else if (NULL != tmit->mem_tmit)
GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit);
}
@@ -453,7 +453,7 @@
*tmit = (struct TransmitClosure) {};
tmit->data[0] = "abc def";
tmit->data[1] = "ghi jkl mno";
- tmit->data_delay[1] = 1;
+ tmit->data_delay[1] = 2;
tmit->data[2] = "pqr stuw xyz";
tmit->data_count = 3;
@@ -460,7 +460,8 @@
origin_cls.n = 0;
origin_cls.msgs_expected = 1;
- GNUNET_MULTICAST_member_to_origin (member, 1, tmit_notify, tmit);
+ tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1,
+ tmit_notify, tmit);
}
@@ -533,15 +534,19 @@
struct TransmitClosure *tmit = &tmit_cls;
*tmit = (struct TransmitClosure) {};
tmit->data[0] = "ABC DEF";
- tmit->data[1] = "GHI JKL MNO";
- tmit->data_delay[1] = 1;
- tmit->data[2] = "PQR STUW XYZ";
- tmit->data_count = 3;
+ tmit->data[1] = GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1);
+ for (uint16_t i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++)
+ tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
+ tmit->data[2] = "GHI JKL MNO";
+ tmit->data_delay[2] = 2;
+ tmit->data[3] = "PQR STUW XYZ";
+ tmit->data_count = 4;
origin_cls.n = member_cls.n = 0;
- origin_cls.msgs_expected = member_cls.msgs_expected = 1;
+ origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count;
- GNUNET_MULTICAST_origin_to_all (origin, 1, 1, tmit_notify, tmit);
+ tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1,
+ tmit_notify, tmit);
}
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2015-09-27 21:04:23 UTC (rev
36379)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2015-09-27 21:04:34 UTC (rev
36380)
@@ -107,17 +107,6 @@
*/
uint16_t last_ptype;
- /**
- * @see enum MessageState
- */
- uint8_t state;
-
- /**
- * Whether a message ACK has already been sent to the client.
- * #GNUNET_YES or #GNUNET_NO
- */
- uint8_t ack_sent;
-
/* Followed by message */
};
@@ -281,11 +270,6 @@
uint32_t tmit_mod_value_size;
/**
- * @see enum MessageState
- */
- uint8_t tmit_state;
-
- /**
* Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
*/
uint8_t is_master;
@@ -438,6 +422,15 @@
message_queue_drop (struct Channel *chn);
+static void
+schedule_transmit_message (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Channel *chn = cls;
+ transmit_message (chn);
+}
+
+
/**
* Task run during shutdown.
*
@@ -1145,8 +1138,8 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->header_size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->header_size);
}
}
@@ -1159,8 +1152,8 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->size);
+ chn, GNUNET_ntohll (mmsg->message_id),
+ frag_offset, fragq->size);
break;
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
@@ -1486,17 +1479,26 @@
uint16_t size = ntohs (mmsg->header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received multicast message of size %u.\n",
- chn, size);
+ "%p Received multicast message of size %u. "
+ "fragment_id=%" PRIu64 ", message_id=%" PRIu64
+ ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
+ chn, size,
+ GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_offset),
+ GNUNET_ntohll (mmsg->flags));
GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
&store_recv_fragment_store_result, chn);
uint16_t first_ptype = 0, last_ptype = 0;
- if (GNUNET_SYSERR
- == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
- (const char *) &mmsg[1],
- &first_ptype, &last_ptype))
+ int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
+ (const char *) &mmsg[1],
+ &first_ptype, &last_ptype);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Message check result %d, first part type %u, last part type
%u\n",
+ chn, check, first_ptype, last_ptype);
+ if (GNUNET_SYSERR == check)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Dropping incoming multicast message with invalid parts.\n",
@@ -1505,10 +1507,6 @@
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message parts: first: type %u, last: type %u\n",
- first_ptype, last_ptype);
-
fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
message_queue_run (chn);
}
@@ -1965,6 +1963,8 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p transmit_notify: nothing to send.\n", chn);
+ if (NULL != tmit_msg && *data_size < tmit_msg->size)
+ GNUNET_break (0);
*data_size = 0;
return GNUNET_NO;
}
@@ -1975,9 +1975,13 @@
*data_size = tmit_msg->size;
memcpy (data, &tmit_msg[1], *data_size);
- int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ int ret
+ = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ ? GNUNET_NO
+ : GNUNET_YES;
- if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
+ /* FIXME: handle disconnecting clients */
+ if (NULL != tmit_msg->client)
send_message_ack (chn, tmit_msg->client);
GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
@@ -1985,7 +1989,7 @@
if (NULL != chn->tmit_head)
{
- transmit_message (chn);
+ GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
}
else if (GNUNET_YES == chn->is_disconnected
&& tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
@@ -2037,10 +2041,12 @@
static void
master_transmit_message (struct Master *mst)
{
+ if (NULL == mst->chn.tmit_head)
+ return;
if (NULL == mst->tmit_handle)
{
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
mst->max_group_generation,
master_transmit_notify, mst);
}
@@ -2057,10 +2063,12 @@
static void
slave_transmit_message (struct Slave *slv)
{
+ if (NULL == slv->chn.tmit_head)
+ return;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+ = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
slave_transmit_notify, slv);
}
else
@@ -2090,6 +2098,9 @@
if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
tmit_msg->id = ++mst->max_message_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p master_queue_message: message_id=%" PRIu64 "\n",
+ mst, tmit_msg->id);
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
@@ -2159,7 +2170,6 @@
memcpy (&tmit_msg[1], data, data_size);
tmit_msg->client = client;
tmit_msg->size = data_size;
- tmit_msg->state = chn->tmit_state;
tmit_msg->first_ptype = first_ptype;
tmit_msg->last_ptype = last_ptype;
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/psyc/test_psyc.c 2015-09-27 21:04:34 UTC (rev 36380)
@@ -225,7 +225,8 @@
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %" PRIu64 "\n", message_id);
+ "Test #%d: Error while master is receiving part of message #%"
PRIu64 ".\n",
+ test, message_id);
return;
}
@@ -243,7 +244,8 @@
if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Unexpected request flags: %x" PRIu32 "\n", flags);
+ "Test #%d: Unexpected request flags: %x" PRIu32 "\n",
+ test, flags);
GNUNET_assert (0);
return;
}
@@ -297,7 +299,8 @@
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message " PRIu64 "\n", message_id);
+ "Test #%d: Error while slave is receiving part of message #%"
PRIu64 ".\n",
+ test, message_id);
return;
}
@@ -322,7 +325,7 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Test #%d: Unexpected flags for historic message: %x" PRIu32
"\n",
- flags);
+ test, flags);
GNUNET_assert (0);
return;
}
@@ -575,9 +578,9 @@
uint16_t size = strlen (tmit->data[tmit->n]);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmit notify data: %u bytes available, "
+ "Test #%d: Transmit notify data: %u bytes available, "
"processing fragment %u/%u (size %u).\n",
- *data_size, tmit->n + 1, tmit->data_count, size);
+ test, *data_size, tmit->n + 1, tmit->data_count, size);
if (*data_size < size)
{
*data_size = 0;
@@ -587,7 +590,8 @@
if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Test #%d: Transmission paused.\n", test);
tmit->paused = GNUNET_YES;
GNUNET_SCHEDULER_add_delayed (
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -611,9 +615,9 @@
{
struct TransmitClosure *tmit = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmit notify modifier: %lu bytes available, "
+ "Test #%d: Transmit notify modifier: %lu bytes available, "
"%u modifiers left to process.\n",
- *data_size, GNUNET_ENV_environment_get_count (tmit->env));
+ test, *data_size, GNUNET_ENV_environment_get_count (tmit->env));
uint16_t name_size = 0;
size_t value_size = 0;
@@ -688,9 +692,9 @@
void
slave_transmit ()
{
-
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
test = TEST_SLAVE_TRANSMIT;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Slave sending request to master.\n", test);
tmit = GNUNET_new (struct TransmitClosure);
tmit->env = GNUNET_ENV_environment_create ();
@@ -772,7 +776,7 @@
const struct GNUNET_PSYC_Message *join_msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got join decision: %d\n", is_admitted);
+ "Test #%d: Slave got join decision: %d\n", test, is_admitted);
switch (test)
{
@@ -804,8 +808,8 @@
struct GNUNET_HashCode slave_key_hash;
GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got join request #%u from %s.\n",
- join_req_count, GNUNET_h2s (&slave_key_hash));
+ "Test #%d: Got join request #%u from %s.\n",
+ test, join_req_count, GNUNET_h2s (&slave_key_hash));
/* Reject first request */
int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -817,8 +821,8 @@
slave_connect_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave connected: %d, max_message_id: %" PRIu64 "\n",
- result, max_message_id);
+ "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n",
+ test, result, max_message_id);
GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT ==
test);
GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
}
@@ -827,8 +831,8 @@
static void
slave_join (int t)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
test = t;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Joining slave.\n");
struct GNUNET_PeerIdentity origin = this_peer;
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
@@ -852,8 +856,9 @@
void
master_transmit ()
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
test = TEST_MASTER_TRANSMIT;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Master sending message to all.\n", test);
end_count = 0;
uint32_t i, j;
@@ -907,8 +912,8 @@
master_start_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Master started: %d, max_message_id: %" PRIu64 "\n",
- result, max_message_id);
+ "Test #%d: Master started: %d, max_message_id: %" PRIu64 "\n",
+ test, result, max_message_id);
GNUNET_assert (TEST_MASTER_START == test);
GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
slave_join (TEST_SLAVE_JOIN_REJECT);
@@ -918,8 +923,8 @@
void
master_start ()
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
test = TEST_MASTER_START;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Starting master.\n", test);
mst = GNUNET_PSYC_master_start (cfg, channel_key,
GNUNET_PSYC_CHANNEL_PRIVATE,
&master_start_cb, &join_request_cb,
&master_message_cb, &master_message_part_cb,
Modified: gnunet/src/psycstore/psyc_util_lib.c
===================================================================
--- gnunet/src/psycstore/psyc_util_lib.c 2015-09-27 21:04:23 UTC (rev
36379)
+++ gnunet/src/psycstore/psyc_util_lib.c 2015-09-27 21:04:34 UTC (rev
36380)
@@ -101,6 +101,12 @@
* Are we currently transmitting a message?
*/
uint8_t in_transmit;
+
+ /**
+ * Notify callback is currently being called.
+ */
+ uint8_t in_notify;
+
};
@@ -334,20 +340,20 @@
* Transmission handle.
* @param msg
* Message part, or NULL.
- * @param end
- * End of message?
+ * @param tmit_now
+ * Transmit message now, or wait for buffer to fill up?
* #GNUNET_YES or #GNUNET_NO.
*/
static void
transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
const struct GNUNET_MessageHeader *msg,
- uint8_t end)
+ uint8_t tmit_now)
{
uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message part of type %u and size %u (end: %u)).\n",
- NULL != msg ? ntohs (msg->type) : 0, size, end);
+ "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
+ NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
if (NULL != tmit->msg)
{
@@ -380,7 +386,7 @@
}
if (NULL != tmit->msg
- && (GNUNET_YES == end
+ && (GNUNET_YES == tmit_now
|| (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
< tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
{
@@ -391,9 +397,6 @@
tmit->msg = NULL;
tmit->acks_pending++;
}
-
- if (GNUNET_YES == end)
- tmit->in_transmit = GNUNET_NO;
}
@@ -414,7 +417,9 @@
if (NULL != tmit->notify_data)
{
data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
+ tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size,
&msg[1]);
+ tmit->in_notify = GNUNET_NO;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"transmit_data (ret: %d, size: %u): %.*s\n",
@@ -442,6 +447,7 @@
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
transmit_queue_insert (tmit, msg, GNUNET_YES);
+ tmit->in_transmit = GNUNET_NO;
return;
}
@@ -458,6 +464,8 @@
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
msg->size = htons (sizeof (*msg));
transmit_queue_insert (tmit, msg, GNUNET_YES);
+ /* FIXME: wait for ACK before setting in_transmit to no */
+ tmit->in_transmit = GNUNET_NO;
}
}
@@ -489,8 +497,10 @@
{
max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
data_size = max_data_size;
+ tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
&mod->oper, &mod->value_size);
+ tmit->in_notify = GNUNET_NO;
}
mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
@@ -520,8 +530,10 @@
{
max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
data_size = max_data_size;
+ tmit->in_notify = GNUNET_YES;
notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
&data_size, &msg[1], NULL, NULL);
+ tmit->in_notify = GNUNET_NO;
}
tmit->mod_value_remaining -= data_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -558,8 +570,8 @@
tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
-
transmit_queue_insert (tmit, msg, GNUNET_YES);
+ tmit->in_transmit = GNUNET_NO;
return;
}
@@ -748,6 +760,9 @@
void
GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
{
+ if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
+ return;
+
if (0 == tmit->acks_pending)
{
tmit->paused = GNUNET_NO;
@@ -800,13 +815,11 @@
{
case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
- if (GNUNET_NO == tmit->paused)
- transmit_mod (tmit);
+ transmit_mod (tmit);
break;
case GNUNET_PSYC_MESSAGE_STATE_DATA:
- if (GNUNET_NO == tmit->paused)
- transmit_data (tmit);
+ transmit_data (tmit);
break;
case GNUNET_PSYC_MESSAGE_STATE_END:
Modified: gnunet/src/social/social_api.c
===================================================================
--- gnunet/src/social/social_api.c 2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/social/social_api.c 2015-09-27 21:04:34 UTC (rev 36380)
@@ -1837,8 +1837,10 @@
{
if (GNUNET_OK ==
GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env,
- NULL, notify_data, notify_data_cls,
flags));
- return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit;
+ NULL, notify_data, notify_data_cls, flags))
+ return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit;
+ else
+ return NULL;
}
@@ -2168,8 +2170,10 @@
if (GNUNET_OK ==
GNUNET_PSYC_transmit_message (plc->tmit, method_name, env,
- NULL, notify_data, notify_data_cls,
flags));
- return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit;
+ NULL, notify_data, notify_data_cls, flags))
+ return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit;
+ else
+ return NULL;
}
Modified: gnunet/src/util/client_manager.c
===================================================================
--- gnunet/src/util/client_manager.c 2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/util/client_manager.c 2015-09-27 21:04:34 UTC (rev 36380)
@@ -328,7 +328,7 @@
mgr->client_tmit
= GNUNET_CLIENT_notify_transmit_ready (mgr->client,
- ntohs (mgr->tmit_head->msg->size),
+ GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_NO,
&send_next_message,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r36380 - in gnunet/src: include multicast psyc psycstore social util,
gnunet <=