[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r35327 - gnunet/src/fs
From: gnunet
Subject: [GNUnet-SVN] r35327 - gnunet/src/fs
Date: Sun, 1 Mar 2015 02:12:22 +0100
Author: grothoff
Date: 2015-03-01 02:12:22 +0100 (Sun, 01 Mar 2015)
New Revision: 35327
Modified:
gnunet/src/fs/gnunet-service-fs_cp.c
Log:
count number of pending replies and refuse to process queries if queue gets too big
Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c 2015-02-28 21:12:03 UTC (rev 35326)
+++ gnunet/src/fs/gnunet-service-fs_cp.c 2015-03-01 01:12:22 UTC (rev 35327)
@@ -86,7 +86,7 @@
/**
* Task called on timeout, or 0 for none.
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Function to call to get the actual message.
@@ -155,7 +155,7 @@
/**
* Task for the delay.
*/
- struct GNUNET_SCHEDULER_Task * delay_task;
+ struct GNUNET_SCHEDULER_Task *delay_task;
/**
* Size of the message.
@@ -184,7 +184,7 @@
/**
* Task for asynchronous stopping of this request.
*/
- struct GNUNET_SCHEDULER_Task * kill_task;
+ struct GNUNET_SCHEDULER_Task *kill_task;
};
@@ -209,7 +209,7 @@
/**
* Task scheduled to revive migration to this peer.
*/
- struct GNUNET_SCHEDULER_Task * mig_revive_task;
+ struct GNUNET_SCHEDULER_Task *mig_revive_task;
/**
* Messages (replies, queries, content migration) we would like to
@@ -248,7 +248,7 @@
/**
* Task scheduled if we need to retry bandwidth reservation later.
*/
- struct GNUNET_SCHEDULER_Task * rc_delay_task;
+ struct GNUNET_SCHEDULER_Task *rc_delay_task;
/**
* Active requests from this neighbour, map of query to 'struct PeerRequest'.
@@ -276,6 +276,11 @@
unsigned int cth_in_progress;
/**
+ * Number of entries in @e delayed_head DLL.
+ */
+ unsigned int delay_queue_size;
+
+ /**
* Respect rating for this peer on disk.
*/
uint32_t disk_respect;
@@ -298,8 +303,8 @@
unsigned int last_request_times_off;
/**
- * GNUNET_YES if we did successfully reserve 32k bandwidth,
- * GNUNET_NO if not.
+ * #GNUNET_YES if we did successfully reserve 32k bandwidth,
+ * #GNUNET_NO if not.
*/
int did_reserve;
@@ -439,10 +444,13 @@
GNUNET_assert (NULL == cp->cth);
cp->cth_in_progress++;
cp->cth =
- GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
+ GNUNET_CORE_notify_transmit_ready (GSF_core,
+ GNUNET_YES,
GNUNET_CORE_PRIO_BACKGROUND,
GNUNET_TIME_absolute_get_remaining
- (pth->timeout), &target, pth->size,
+ (pth->timeout),
+ &target,
+ pth->size,
&peer_transmit_ready_cb, cp);
GNUNET_assert (NULL != cp->cth);
GNUNET_assert (0 < cp->cth_in_progress--);
@@ -458,7 +466,9 @@
* @return number of bytes copied to @a buf
*/
static size_t
-peer_transmit_ready_cb (void *cls, size_t size, void *buf)
+peer_transmit_ready_cb (void *cls,
+ size_t size,
+ void *buf)
{
struct GSF_ConnectedPeer *cp = cls;
struct GSF_PeerTransmitHandle *pth = cp->pth_head;
@@ -478,7 +488,9 @@
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
{
cp->ppd.last_request_times[(cp->last_request_times_off++) %
@@ -511,7 +523,8 @@
* @param tc scheduler context
*/
static void
-retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+retry_reservation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GSF_ConnectedPeer *cp = cls;
struct GNUNET_PeerIdentity target;
@@ -519,7 +532,9 @@
GNUNET_PEER_resolve (cp->ppd.pid, &target);
cp->rc_delay_task = NULL;
cp->rc =
- GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
+ GNUNET_ATS_reserve_bandwidth (GSF_ats,
+ &target,
+ DBLOCK_SIZE,
&ats_reserve_callback, cp);
}
@@ -736,7 +751,9 @@
* @return number of bytes copied to @a buf, can be 0 (without indicating an error)
*/
static size_t
-copy_reply (void *cls, size_t buf_size, void *buf)
+copy_reply (void *cls,
+ size_t buf_size,
+ void *buf)
{
struct PutMessage *pm = cls;
size_t size;
@@ -845,15 +862,23 @@
struct GSF_DelayedHandle *dh = cls;
struct GSF_ConnectedPeer *cp = dh->cp;
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
{
GNUNET_free (dh->pm);
GNUNET_free (dh);
return;
}
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
dh->msize, &copy_reply, dh->pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ dh->msize,
+ &copy_reply,
+ dh->pm);
GNUNET_free (dh);
}
@@ -967,8 +992,9 @@
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&pm[1], data, data_len);
- if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
- (GNUNET_YES == GSF_enable_randomized_delays))
+ if ( (UINT32_MAX != reply_anonymity_level) &&
+ (0 != reply_anonymity_level) &&
+ (GNUNET_YES == GSF_enable_randomized_delays) )
{
struct GSF_DelayedHandle *dh;
@@ -976,15 +1002,24 @@
dh->cp = cp;
dh->pm = pm;
dh->msize = msize;
- GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size++;
dh->delay_task =
GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
- &transmit_delayed_now, dh);
+ &transmit_delayed_now,
+ dh);
}
else
{
- (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
&copy_reply, pm);
+ (void) GSF_peer_transmit_ (cp,
+ GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ msize,
+ &copy_reply,
+ pm);
}
if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
return;
@@ -1164,7 +1199,6 @@
enum GNUNET_BLOCK_Type type;
GNUNET_PEER_Id spid;
- GNUNET_assert (other != NULL);
msize = ntohs (message->size);
if (msize < sizeof (struct GetMessage))
{
@@ -1173,7 +1207,8 @@
}
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
- ("# GET requests received (from other peers)"), 1,
+ ("# GET requests received (from other peers)"),
+ 1,
GNUNET_NO);
gm = (const struct GetMessage *) message;
type = ntohl (gm->type);
@@ -1219,25 +1254,36 @@
{
if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (&opt[bits - 1]));
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to find peer `%4s' in connection set. Dropping query.\n",
+ "Failed to find peer `%s' in connection set. Dropping query.\n",
GNUNET_i2s (other));
-#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requests dropped due to missing reverse route"),
- 1, GNUNET_NO);
-#endif
+ 1,
+ GNUNET_NO);
return NULL;
}
+ if (cp->ppd.pending_replies + cp->delay_queue_size > 128)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%s' has too many replies queued already. Dropping query.\n",
+ GNUNET_i2s (other));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# requests dropped due to full reply queue"),
+ 1,
+ GNUNET_NO);
+ return NULL;
+ }
/* note that we can really only check load here since otherwise
* peers could find out that we are overloaded by not being
* disconnected after sending us a malformed query... */
- priority = bound_priority (ntohl (gm->priority), cps);
+ priority = bound_priority (ntohl (gm->priority),
+ cps);
if (priority < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1246,7 +1292,7 @@
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
+ "Received request for `%s' of type %u from peer `%s' with flags %u\n",
GNUNET_h2s (&gm->query),
(unsigned int) type,
GNUNET_i2s (other),
@@ -1359,7 +1405,9 @@
"Timeout trying to transmit to other peer\n");
pth->timeout_task = NULL;
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
@@ -1419,13 +1467,18 @@
prev = pos;
pos = pos->next;
}
- GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
+ GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
+ cp->pth_tail,
+ prev,
+ pth);
if (GNUNET_YES == is_query)
cp->ppd.pending_queries++;
else if (GNUNET_NO == is_query)
cp->ppd.pending_replies++;
- pth->timeout_task =
- GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
+ pth->timeout_task
+ = GNUNET_SCHEDULER_add_delayed (timeout,
+ &peer_transmit_timeout,
+ pth);
schedule_transmission (pth);
return pth;
}
@@ -1447,7 +1500,9 @@
pth->timeout_task = NULL;
}
cp = pth->cp;
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
else if (GNUNET_NO == pth->is_query)
@@ -1614,13 +1669,22 @@
GNUNET_SCHEDULER_cancel (pth->timeout_task);
pth->timeout_task = NULL;
}
- GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
+ GNUNET_CONTAINER_DLL_remove (cp->pth_head,
+ cp->pth_tail,
+ pth);
+ if (GNUNET_YES == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_queries--);
+ else if (GNUNET_NO == pth->is_query)
+ GNUNET_assert (0 < cp->ppd.pending_replies--);
pth->gmc (pth->gmc_cls, 0, NULL);
GNUNET_free (pth);
}
while (NULL != (dh = cp->delayed_head))
{
- GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ cp->delay_queue_size--;
GNUNET_SCHEDULER_cancel (dh->delay_task);
GNUNET_free (dh->pm);
GNUNET_free (dh);
@@ -1631,6 +1695,8 @@
GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
cp->mig_revive_task = NULL;
}
+ GNUNET_break (0 == cp->ppd.pending_queries);
+ GNUNET_break (0 == cp->ppd.pending_replies);
GNUNET_free (cp);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r35327 - gnunet/src/fs,
gnunet <=