[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r11392 - in gnunet: . src/datastore src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r11392 - in gnunet: . src/datastore src/fs |
Date: |
Mon, 17 May 2010 09:17:48 +0200 |
Author: grothoff
Date: 2010-05-17 09:17:48 +0200 (Mon, 17 May 2010)
New Revision: 11392
Modified:
gnunet/TODO
gnunet/src/datastore/datastore_api.c
gnunet/src/fs/fs.h
gnunet/src/fs/fs_test_lib_data.conf
gnunet/src/fs/gnunet-service-fs.c
Log:
lc stuff
Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/TODO 2010-05-17 07:17:48 UTC (rev 11392)
@@ -1,12 +1,55 @@
0.9.0pre1:
* FS: [CG]
- - migration:
- + on-demand encoding
- + peer selection => how to consider latency/bw/etc?
- + content transmission => how often the same block?
- + testing
- - gnunet-service-fs (hot-path routing, load-based routing, nitpicks)
- + active reply route caching design & implementation of service; gap
extension!
+ - test migration
+ - TTL/priority calculations
+ - hot-path routing, load considerations
+ - statistics
+ - active reply route caching design & implementation of service; gap
extension!
+ - Indexing:
+May 16 12:49:50 fs-13737 WARNING `open' failed on file
`/home/grothoff/svn/gnunet/src/fs/H/' at disk.c:1253 with error: No such file
or directory
+May 16 12:49:50 fs-13737 WARNING Could not access indexed file `ENUTBMBR' at
offset 2064384: No such file or directory
+ NOTE: corrupted filename in open message
+ NOTE: odd directory name in open message
+
+==14995== 8 bytes in 1 blocks are definitely lost in loss record 1 of 12
+==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995== by 0x40519F5: GNUNET_DATASTORE_get_random (datastore_api.c:1102)
+==14995== by 0x804ADCF: gather_migration_blocks (gnunet-service-fs.c:969)
+==14995== by 0x40864C8: run_ready (scheduler.c:514)
+==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995== by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995==
+==14995== 8 bytes in 1 blocks are definitely lost in loss record 2 of 12
+==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995== by 0x4051ACB: GNUNET_DATASTORE_get (datastore_api.c:1160)
+==14995== by 0x804F39A: handle_start_search (gnunet-service-fs.c:3352)
+==14995== by 0x4087F9A: GNUNET_SERVER_inject (server.c:653)
+==14995== by 0x40880A8: process_client_buffer (server.c:714)
+==14995== by 0x4088529: restart_processing (server.c:848)
+==14995== by 0x40864C8: run_ready (scheduler.c:514)
+==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995== by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995==
+==14995== 120 bytes in 15 blocks are definitely lost in loss record 5 of 12
+==14995== at 0x4024C4C: malloc (vg_replace_malloc.c:195)
+==14995== by 0x4068F05: GNUNET_xmalloc_unchecked_ (common_allocation.c:92)
+==14995== by 0x4068E33: GNUNET_xmalloc_ (common_allocation.c:61)
+==14995== by 0x4050DA1: GNUNET_DATASTORE_put (datastore_api.c:695)
+==14995== by 0x804DD79: handle_p2p_put (gnunet-service-fs.c:2591)
+==14995== by 0x40588B8: main_notify_handler (core_api.c:468)
+==14995== by 0x4067DAE: receive_task (client.c:499)
+==14995== by 0x40864C8: run_ready (scheduler.c:514)
+==14995== by 0x4086970: GNUNET_SCHEDULER_run (scheduler.c:642)
+==14995== by 0x408CF1B: GNUNET_SERVICE_run (service.c:1404)
+==14995== by 0x804F725: main (gnunet-service-fs.c:3506)
+==14995==
+
* TBENCH: [MW]
- good to have for transport/DV evaluation!
* DV: [Nate]
Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c 2010-05-17 05:28:03 UTC (rev
11391)
+++ gnunet/src/datastore/datastore_api.c 2010-05-17 07:17:48 UTC (rev
11392)
@@ -255,20 +255,8 @@
}
while (NULL != (qe = h->queue_head))
{
- if (NULL != qe->response_proc)
- {
- qe->response_proc (qe, NULL);
- }
- else
- {
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- if (qe->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (h->sched,
- qe->task);
- GNUNET_free (qe);
- }
+ GNUNET_assert (NULL != qe->response_proc);
+ qe->response_proc (qe, NULL);
}
if (GNUNET_YES == drop)
{
@@ -385,15 +373,8 @@
{
if (pos->max_queue < h->queue_size)
{
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- pos);
- GNUNET_SCHEDULER_cancel (h->sched,
- pos->task);
- if (pos->response_proc != NULL)
- pos->response_proc (pos, NULL);
- GNUNET_free (pos);
- h->queue_size--;
+ GNUNET_assert (pos->response_proc != NULL);
+ pos->response_proc (pos, NULL);
break;
}
pos = pos->next;
@@ -565,6 +546,24 @@
}
+/**
+ * Release a queue entry: unlink it from the queue of its datastore
+ * handle ('qe->h'), cancel the task stored in 'qe->task' (if one is
+ * set), decrement the handle's 'queue_size' and free the entry.
+ * Since 'queue_size' is adjusted here, callers must not decrement
+ * it again for the same entry.
+ *
+ * @param qe entry to remove and free; invalid after this call
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+ struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+ GNUNET_CONTAINER_DLL_remove (h->queue_head,
+ h->queue_tail,
+ qe);
+ if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched,
+ qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ h->queue_size--;
+ GNUNET_free (qe);
+}
+
/**
* Type of a function to call when we receive a message
* from the service.
@@ -584,16 +583,7 @@
const char *emsg;
int32_t status;
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- if (qe->task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched,
- qe->task);
- qe->task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_free (qe);
+ free_queue_entry (qe);
if (msg == NULL)
{
if (NULL == h->client)
@@ -1018,10 +1008,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Failed to receive response from datastore\n"));
#endif
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- GNUNET_free (qe);
+ free_queue_entry (qe);
do_disconnect (h);
rc->iter (rc->iter_cls,
NULL, 0, NULL, 0, 0, 0,
@@ -1036,10 +1023,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received end of result set\n");
#endif
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- GNUNET_free (qe);
+ free_queue_entry (qe);
rc->iter (rc->iter_cls,
NULL, 0, NULL, 0, 0, 0,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
@@ -1052,10 +1036,7 @@
(ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct
DataMessage*)msg)->size)) )
{
GNUNET_break (0);
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- GNUNET_free (qe);
+ free_queue_entry (qe);
h->retry_time = GNUNET_TIME_UNIT_ZERO;
do_disconnect (h);
rc->iter (rc->iter_cls,
@@ -1226,10 +1207,7 @@
GNUNET_TIME_absolute_get_remaining (qe->timeout));
return;
}
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- GNUNET_free (qe);
+ free_queue_entry (qe);
h->retry_time = GNUNET_TIME_UNIT_ZERO;
do_disconnect (h);
rc->iter (rc->iter_cls,
@@ -1253,13 +1231,8 @@
h = qe->h;
reconnect = qe->was_transmitted;
- GNUNET_CONTAINER_DLL_remove (h->queue_head,
- h->queue_tail,
- qe);
- if (qe->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (h->sched,
- qe->task);
- GNUNET_free (qe);
+ free_queue_entry (qe);
+ h->queue_size--;
if (reconnect)
{
h->retry_time = GNUNET_TIME_UNIT_ZERO;
Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/fs.h 2010-05-17 07:17:48 UTC (rev 11392)
@@ -1,3 +1,4 @@
+
/*
This file is part of GNUnet.
(C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and
other contributing authors)
@@ -42,6 +43,20 @@
#define MAX_MIGRATION_QUEUE 32
/**
+ * How many peers do we select as possible
+ * targets per block obtained for migration?
+ */
+#define MIGRATION_LIST_SIZE 4
+
+/**
+ * To how many peers do we forward each migration block ultimately?
+ * This number must be smaller than or equal to MIGRATION_LIST_SIZE. Using
+ * a smaller value allows for variation in available bandwidth (for
+ * migration) between the peers.
+ */
+#define MIGRATION_TARGET_COUNT 2
+
+/**
* Ratio for moving average delay calculation. The previous
* average goes in with a factor of (n-1) into the calculation.
* Must be > 0.
Modified: gnunet/src/fs/fs_test_lib_data.conf
===================================================================
--- gnunet/src/fs/fs_test_lib_data.conf 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/fs_test_lib_data.conf 2010-05-17 07:17:48 UTC (rev 11392)
@@ -53,7 +53,7 @@
HOSTNAME = localhost
#OPTIONS = -L DEBUG
#DEBUG = YES
-#PREFIX = valgrind --tool=memcheck --leak-check=yes
+PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-fs
#PREFIX = xterm -e gdb -x cmd --args
Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c 2010-05-17 05:28:03 UTC (rev 11391)
+++ gnunet/src/fs/gnunet-service-fs.c 2010-05-17 07:17:48 UTC (rev 11392)
@@ -28,8 +28,7 @@
* TODO:
* - have non-zero preference / priority for requests we initiate!
* - implement hot-path routing decision procedure
- * - implement: bound_priority, test_load_too_high, validate_nblock
- * - add content migration support (forward from migration list)
+ * - implement: bound_priority, test_load_too_high
* - statistics
*/
#include "platform.h"
@@ -586,11 +585,22 @@
struct GNUNET_TIME_Absolute expiration;
/**
+ * Peers we would consider forwarding this
+ * block to. Zero for empty entries.
+ */
+ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+ /**
* Size of the block.
*/
size_t size;
/**
+ * Number of targets already used.
+ */
+ unsigned int used_targets;
+
+ /**
* Type of the block.
*/
enum GNUNET_BLOCK_Type type;
@@ -684,6 +694,25 @@
*/
static int active_migration;
+
+/**
+ * Transmit messages by copying them to the target buffer
+ * "buf". "buf" will be NULL and "size" zero if the socket was closed
+ * for writing in the meantime. In that case, do nothing
+ * (the disconnect or shutdown handler will take care of the rest).
+ * If we were able to transmit messages and there are still more
+ * pending, ask core again for further calls to this function.
+ *
+ * @param cls closure, pointer to the 'struct ConnectedPeer*'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_to_peer (void *cls,
+ size_t size, void *buf);
+
+
/* ******************* clean up functions ************************ */
@@ -698,16 +727,38 @@
GNUNET_CONTAINER_DLL_remove (mig_head,
mig_tail,
mb);
+ GNUNET_PEER_decrement_rcs (mb->target_list,
+ MIGRATION_LIST_SIZE);
mig_size--;
GNUNET_free (mb);
}
/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2,
+ * GNUNET_NO otherwise (including equal distance)
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+ const struct GNUNET_PeerIdentity *p1,
+ const struct GNUNET_PeerIdentity *p2)
+{
+ /* GNUNET_CRYPTO_hash_xorcmp is a three-way comparison (negative if
+ the first argument is closer to the target, positive if the second
+ is, zero if equal); returning it unchanged would read as "true"
+ whenever the two distances differ */
+ return (0 > GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+ &p2->hashPubKey,
+ key)) ? GNUNET_YES : GNUNET_NO;
+}
+
+
+/**
* Consider migrating content to a given peer.
*
- * @param cls not used
- * @param key ID of the peer (not used)
+ * @param cls 'struct MigrationReadyBlock*' to select
+ * targets for (or NULL for none)
+ * @param key ID of the peer
* @param value 'struct ConnectedPeer' of the peer
* @return GNUNET_YES (always continue iteration)
*/
@@ -716,17 +767,92 @@
const GNUNET_HashCode *key,
void *value)
{
+ struct MigrationReadyBlock *mb = cls;
struct ConnectedPeer *cp = value;
+ struct MigrationReadyBlock *pos;
+ struct GNUNET_PeerIdentity cppid;
+ struct GNUNET_PeerIdentity otherpid;
+ struct GNUNET_PeerIdentity worstpid;
+ size_t msize;
+ unsigned int i;
+ unsigned int repl;
+ /* consider 'cp' as a migration target for mb */
+ if (mb != NULL)
+ {
+ GNUNET_PEER_resolve (cp->pid,
+ &cppid);
+ repl = MIGRATION_LIST_SIZE;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (mb->target_list[i] == 0)
+ {
+ mb->target_list[i] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[i], 1);
+ repl = MIGRATION_LIST_SIZE;
+ break;
+ }
+ GNUNET_PEER_resolve (mb->target_list[i],
+ &otherpid);
+ if ( (repl == MIGRATION_LIST_SIZE) &&
+ is_closer (&mb->query,
+ &cppid,
+ &otherpid))
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ else if ( (repl != MIGRATION_LIST_SIZE) &&
+ (is_closer (&mb->query,
+ &worstpid,
+ &otherpid) ) )
+ {
+ repl = i;
+ worstpid = otherpid;
+ }
+ }
+ if (repl != MIGRATION_LIST_SIZE)
+ {
+ GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+ mb->target_list[repl] = cp->pid;
+ GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+ }
+ }
+
+ /* consider scheduling transmission to cp for content migration */
if (cp->cth != NULL)
- return GNUNET_YES; /* or what? */
- /* FIXME: not implemented! */
+ return GNUNET_YES;
+ msize = 0;
+ pos = mig_head;
+ while (pos != NULL)
+ {
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (cp->pid == pos->target_list[i])
+ {
+ if (msize == 0)
+ msize = pos->size;
+ else
+ msize = GNUNET_MIN (msize,
+ pos->size);
+ break;
+ }
+ }
+ pos = pos->next;
+ }
+ if (msize == 0)
+ return GNUNET_YES; /* no content available */
+ cp->cth
+ = GNUNET_CORE_notify_transmit_ready (core,
+ 0, GNUNET_TIME_UNIT_FOREVER_REL,
+ (const struct GNUNET_PeerIdentity*)
key,
+ msize + sizeof (struct PutMessage),
+ &transmit_to_peer,
+ cp);
return GNUNET_YES;
}
-
-
/**
* Task that is run periodically to obtain blocks for content
* migration
@@ -740,6 +866,32 @@
/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay. Does nothing
+ * while a datastore request for migration content ('mig_qe') is
+ * pending or a gathering task ('mig_task') is already scheduled.
+ * The delay is 'mig_size' seconds divided by MAX_MIGRATION_QUEUE,
+ * but never less than 'min_migration_delay'.
+ */
+static void
+consider_migration_gathering ()
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (mig_qe != NULL)
+ return;
+ if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ /* scale the product computed above; dividing the one-second unit
+ here instead would throw away the mig_size factor */
+ delay = GNUNET_TIME_relative_divide (delay,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
+ mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &gather_migration_blocks,
+ NULL);
+}
+
+
+/**
* Process content offered for migration.
*
* @param cls closure
@@ -765,26 +917,25 @@
expiration, uint64_t uid)
{
struct MigrationReadyBlock *mb;
- struct GNUNET_TIME_Relative delay;
if (key == NULL)
{
mig_qe = NULL;
if (mig_size < MAX_MIGRATION_QUEUE)
- {
- delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
- mig_size);
- delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
- MAX_MIGRATION_QUEUE);
- delay = GNUNET_TIME_relative_max (delay,
- min_migration_delay);
- mig_task = GNUNET_SCHEDULER_add_delayed (sched,
- delay,
- &gather_migration_blocks,
- NULL);
- }
+ consider_migration_gathering ();
return;
}
+ if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
+ {
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key, size, data,
+ type, priority, anonymity,
+ expiration, uid,
+ &process_migration_content,
+ NULL))
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ return;
+ }
mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
mb->query = *key;
mb->expiration = expiration;
@@ -796,10 +947,9 @@
mig_tail,
mb);
mig_size++;
- if (mig_size == 1)
- GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
- &consider_migration,
- NULL);
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ mb);
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
}
@@ -978,7 +1128,8 @@
uint32_t distance)
{
struct ConnectedPeer *cp;
-
+ struct MigrationReadyBlock *pos;
+
cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
cp->pid = GNUNET_PEER_intern (peer);
GNUNET_break (GNUNET_OK ==
@@ -986,8 +1137,13 @@
&peer->hashPubKey,
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- if (mig_size > 0)
- (void) consider_migration (NULL, &peer->hashPubKey, cp);
+
+ pos = mig_head;
+ while (NULL != pos)
+ {
+ (void) consider_migration (pos, &peer->hashPubKey, cp);
+ pos = pos->next;
+ }
}
@@ -1031,6 +1187,8 @@
struct ConnectedPeer *cp;
struct PendingMessage *pm;
unsigned int i;
+ struct MigrationReadyBlock *pos;
+ struct MigrationReadyBlock *next;
GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
&peer->hashPubKey,
@@ -1052,6 +1210,31 @@
GNUNET_CONTAINER_multihashmap_remove (connected_peers,
&peer->hashPubKey,
cp));
+ /* remove this peer from migration considerations; schedule
+ alternatives */
+ next = mig_head;
+ while (NULL != (pos = next))
+ {
+ next = pos->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if (pos->target_list[i] == cp->pid)
+ {
+ GNUNET_PEER_change_rc (pos->target_list[i], -1);
+ pos->target_list[i] = 0;
+ if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size
(connected_peers))
+ {
+ delete_migration_block (pos);
+ consider_migration_gathering ();
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ pos);
+ break;
+ }
+ }
+ }
+
GNUNET_PEER_change_rc (cp->pid, -1);
GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
if (NULL != cp->cth)
@@ -1231,7 +1414,7 @@
/**
- * Transmit the given message by copying it to the target buffer
+ * Transmit messages by copying them to the target buffer
* "buf". "buf" will be NULL and "size" zero if the socket was closed
* for writing in the meantime. In that case, do nothing
* (the disconnect or shutdown handler will take care of the rest).
@@ -1251,7 +1434,11 @@
char *cbuf = buf;
struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
+ struct MigrationReadyBlock *mb;
+ struct MigrationReadyBlock *next;
+ struct PutMessage migm;
size_t msize;
+ unsigned int i;
cp->cth = NULL;
if (NULL == buf)
@@ -1283,6 +1470,44 @@
&transmit_to_peer,
cp);
}
+ else
+ {
+ next = mig_head;
+ while (NULL != (mb = next))
+ {
+ next = mb->next;
+ for (i=0;i<MIGRATION_LIST_SIZE;i++)
+ {
+ if ( (cp->pid == mb->target_list[i]) &&
+ (mb->size + sizeof (migm) <= size) )
+ {
+ GNUNET_PEER_change_rc (mb->target_list[i], -1);
+ mb->target_list[i] = 0;
+ mb->used_targets++;
+ migm.header.size = htons (sizeof (migm) + mb->size);
+ migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+ migm.type = htonl (mb->type);
+ migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+ memcpy (&cbuf[msize], &migm, sizeof (migm));
+ msize += sizeof (migm);
+ size -= sizeof (migm);
+ memcpy (&cbuf[msize], &mb[1], mb->size);
+ msize += mb->size;
+ size -= mb->size;
+ break;
+ }
+ }
+ if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+ (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size
(connected_peers)) )
+ {
+ delete_migration_block (mb);
+ consider_migration_gathering ();
+ }
+ }
+ consider_migration (NULL,
+ &pid.hashPubKey,
+ cp);
+ }
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting %u bytes to peer %u\n",
@@ -1330,20 +1555,19 @@
cp->pending_requests++;
if (cp->pending_requests > MAX_QUEUE_PER_PEER)
destroy_pending_message (cp->pending_messages_tail, 0);
+ GNUNET_PEER_resolve (cp->pid, &pid);
+ if (NULL != cp->cth)
+ GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
+ /* need to schedule transmission */
+ cp->cth = GNUNET_CORE_notify_transmit_ready (core,
+
cp->pending_messages_head->priority,
+ MAX_TRANSMIT_DELAY,
+ &pid,
+ cp->pending_messages_head->msize,
+ &transmit_to_peer,
+ cp);
if (cp->cth == NULL)
{
- /* need to schedule transmission */
- GNUNET_PEER_resolve (cp->pid, &pid);
- cp->cth = GNUNET_CORE_notify_transmit_ready (core,
-
cp->pending_messages_head->priority,
- MAX_TRANSMIT_DELAY,
- &pid,
-
cp->pending_messages_head->msize,
- &transmit_to_peer,
- cp);
- }
- if (cp->cth == NULL)
- {
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to schedule transmission with core!\n");
@@ -2095,9 +2319,14 @@
}
else
{
+ if (NULL != prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff) %
CS2P_SUCCESS_LIST_SIZE])
+ GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+ [(prq->sender->last_client_replies_woff)
% CS2P_SUCCESS_LIST_SIZE]);
prq->sender->last_client_replies
[(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
= pr->client_request_list->client_list->client;
+ GNUNET_SERVER_client_keep
(pr->client_request_list->client_list->client);
}
}
GNUNET_CRYPTO_hash (prq->data,
@@ -2255,12 +2484,10 @@
memcpy (&pm[1], prq->data, prq->size);
add_to_pending_messages_for_peer (cp, reply, pr);
}
- // FIXME: implement hot-path routing statistics keeping!
return GNUNET_YES;
}
-
/**
* Continuation called to notify client about result of the
* operation.
@@ -2586,18 +2813,13 @@
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
return;
}
- prq.type = type;
- prq.priority = priority;
- process_reply (&prq, key, pr);
-
if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
(type == GNUNET_BLOCK_TYPE_IBLOCK) )
{
if (pr->qe != NULL)
GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
- return;
}
- if ( (pr->client_request_list == NULL) &&
+ else if ( (pr->client_request_list == NULL) &&
( (GNUNET_YES == test_load_too_high()) ||
(pr->results_found > 5 + 2 * pr->priority) ) )
{
@@ -2611,10 +2833,12 @@
GNUNET_NO);
if (pr->qe != NULL)
GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
- return;
}
- if (pr->qe != NULL)
+ else if (pr->qe != NULL)
GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ prq.type = type;
+ prq.priority = priority;
+ process_reply (&prq, key, pr);
}
@@ -3221,11 +3445,7 @@
}
/* FIXME: distinguish between sending and storing in options? */
if (active_migration)
- {
- mig_task = GNUNET_SCHEDULER_add_now (sched,
- &gather_migration_blocks,
- NULL);
- }
+ consider_migration_gathering ();
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r11392 - in gnunet: . src/datastore src/fs,
gnunet <=