[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r36211 - gnunet/src/rps
From: |
gnunet |
Subject: |
[GNUnet-SVN] r36211 - gnunet/src/rps |
Date: |
Wed, 5 Aug 2015 23:47:36 +0200 |
Author: ch3
Date: 2015-08-05 23:47:36 +0200 (Wed, 05 Aug 2015)
New Revision: 36211
Modified:
gnunet/src/rps/gnunet-service-rps.c
Log:
-keep track of messages passed to mq
Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-08-05 21:47:34 UTC (rev 36210)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-08-05 21:47:36 UTC (rev 36211)
@@ -183,6 +183,34 @@
/**
+ * List containing all messages that are yet to be send
+ */
+struct PendingMessage
+{
+ /**
+ * DLL next, prev
+ */
+ struct PendingMessage *next;
+ struct PendingMessage *prev;
+
+ /**
+ * The envelope to the corresponding message
+ */
+ struct GNUNET_MQ_Envelope *ev;
+
+ /**
+ * The corresponding context
+ */
+ struct PeerContext *peer_ctx;
+
+ /**
+ * The message type
+ */
+ const char *type;
+};
+
+
+/**
* Struct used to keep track of other peer's status
*
* This is stored in a multipeermap.
@@ -232,6 +260,12 @@
uint32_t peer_flags;
/**
+ * DLL with all messages that are yet to be sent
+ */
+ struct PendingMessage *pending_messages_head;
+ struct PendingMessage *pending_messages_tail;
+
+ /**
* This is pobably followed by 'statistical' data (when we first saw
* him, how did we get his ID, how many pushes (in a timeinterval),
* ...)
@@ -1148,6 +1182,45 @@
/**
+ * @brief Add an envelope to a message passed to mq to list of pending messages
+ *
+ * @param peer peer the message was sent to
+ * @param ev envelope to the message
+ * @param type type of the message to be sent
+ */
+static struct PendingMessage *
+insert_pending_message (const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_MQ_Envelope *ev,
+ const char *type)
+{
+ struct PendingMessage *pending_msg;
+ struct PeerContext *peer_ctx;
+
+ peer_ctx = get_peer_ctx (peer);
+ pending_msg = GNUNET_new (struct PendingMessage);
+ pending_msg->ev = ev;
+ pending_msg->peer_ctx = peer_ctx;
+ pending_msg->type = type;
+ GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
+ peer_ctx->pending_messages_tail,
+ pending_msg);
+ return pending_msg;
+}
+
+static void
+remove_pending_message (struct PendingMessage *pending_msg)
+{
+ struct PeerContext *peer_ctx;
+
+ peer_ctx = pending_msg->peer_ctx;
+ GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
+ peer_ctx->pending_messages_tail,
+ pending_msg);
+ GNUNET_free (pending_msg);
+}
+
+
+/**
* @brief This is called once a message is sent.
*
* @param cls type of the message that was sent
@@ -1155,10 +1228,11 @@
static void
mq_notify_sent_cb (void *cls)
{
- const char *type = cls;
+ struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s was sent.\n",
- type);
+ pending_msg->type);
+ remove_pending_message (pending_msg);
}
@@ -1178,6 +1252,7 @@
struct GNUNET_MQ_Handle *mq;
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
+ struct PendingMessage *pending_msg;
/* Compute actual size */
send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
@@ -1208,9 +1283,10 @@
memcpy (&out_msg[1], peer_ids,
send_size * sizeof (struct GNUNET_PeerIdentity));
+ pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY");
GNUNET_MQ_notify_sent (ev,
mq_notify_sent_cb,
- "PULL REPLY");
+ pending_msg);
GNUNET_MQ_send (mq, ev);
}
@@ -1923,11 +1999,12 @@
* @param peer_id the peer to send the pull request to.
*/
static void
-send_pull_request (struct GNUNET_PeerIdentity *peer_id)
+send_pull_request (const struct GNUNET_PeerIdentity *peer_id)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MQ_Handle *mq;
struct PeerContext *peer_ctx;
+ struct PendingMessage *pending_msg;
peer_ctx = get_peer_ctx (peer_id);
GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING));
@@ -1939,9 +2016,10 @@
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
mq = get_mq (peer_id);
+ pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST");
GNUNET_MQ_notify_sent (ev,
mq_notify_sent_cb,
- "PULL REQUEST");
+ pending_msg);
GNUNET_MQ_send (mq, ev);
}
@@ -1952,10 +2030,11 @@
* @param peer_id the peer to send the push to.
*/
static void
-send_push (struct GNUNET_PeerIdentity *peer_id)
+send_push (const struct GNUNET_PeerIdentity *peer_id)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MQ_Handle *mq;
+ struct PendingMessage *pending_msg;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Going to send PUSH to peer %s.\n",
@@ -1963,9 +2042,10 @@
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
mq = get_mq (peer_id);
+ pending_msg = insert_pending_message (peer_id, ev, "PUSH");
GNUNET_MQ_notify_sent (ev,
mq_notify_sent_cb,
- "PUSH");
+ pending_msg);
GNUNET_MQ_send (mq, ev);
}
@@ -2542,7 +2622,7 @@
GNUNET_i2s (&peer_ctx->peer_id));
/* Remove it from the sampler used for the Brahms protocol */
- RPS_sampler_reinitialise_by_value (prot_sampler, key);
+ RPS_sampler_reinitialise_by_value (prot_sampler, key);
/* If operations are still scheduled for this peer cancel those */
if (0 != peer_ctx->num_outstanding_ops)
@@ -2585,6 +2665,18 @@
if (GNUNET_YES == in_arr (pull_list, pull_list_size, key))
rem_from_list (&pull_list, &pull_list_size, key);
+ /* Cancle messages that have not been sent yet */
+ while (NULL != peer_ctx->pending_messages_head)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing unsent %s\n",
+ peer_ctx->pending_messages_head->type);
+ /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does
not
+ * set a #GNUNET_MQ_CancelImpl */
+ /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */
+ remove_pending_message (peer_ctx->pending_messages_head);
+ }
+
/* If there is still a mq destroy it */
if (NULL != peer_ctx->mq)
{
@@ -2636,19 +2728,20 @@
if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
(GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) &&
- (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
&&
(GNUNET_NO == in_arr (push_list, push_list_size, peer)) &&
- (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) )
+ (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) &&
+ (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
)
{
peer_ctx = get_peer_ctx (peer);
if ( (NULL == peer_ctx->recv_channel) &&
+ (NULL == peer_ctx->pending_messages_head) &&
(GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) )
{
#ifdef ENABLE_MALICIOUS
if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
peer_remove_cb (NULL, peer, peer_ctx);
- #else
+ #else /* ENABLE_MALICIOUS */
peer_remove_cb (NULL, peer, peer_ctx);
#endif /* ENABLE_MALICIOUS */
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r36211 - gnunet/src/rps,
gnunet <=