[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r5126 - GNUnet/src/applications/fs/module
From: |
gnunet |
Subject: |
[GNUnet-SVN] r5126 - GNUnet/src/applications/fs/module |
Date: |
Sat, 23 Jun 2007 03:41:27 -0600 (MDT) |
Author: grothoff
Date: 2007-06-23 03:41:27 -0600 (Sat, 23 Jun 2007)
New Revision: 5126
Modified:
GNUnet/src/applications/fs/module/dht_push.c
GNUnet/src/applications/fs/module/migration.c
Log:
Experimental (!) patch to reduce CPU load by improving
migration efficiency. To keep the DHT part out of the
measurement, the DHT push thread is deactivated for now.
The goal is to find out just how much this patch improves
things. Feedback welcome.
Modified: GNUnet/src/applications/fs/module/dht_push.c
===================================================================
--- GNUnet/src/applications/fs/module/dht_push.c 2007-06-22 09:39:24 UTC (rev 5125)
+++ GNUnet/src/applications/fs/module/dht_push.c 2007-06-23 09:41:27 UTC (rev 5126)
@@ -31,6 +31,16 @@
#include "gnunet_sqstore_service.h"
/**
+ * Disable DHT pushing? Set to 1 to essentially disable
+ * the code in this file. Used to study its performance
+ * impact. Useful also for users that do not want to
+ * use non-anonymous file-sharing (since it eliminates
+ * some of the processing cost which would otherwise go
+ * to waste).
+ */
+#define NO_PUSH 1
+
+/**
* DHT service. Set to NULL to terminate
*/
static DHT_ServiceAPI * dht;
@@ -75,6 +85,8 @@
delay = 6 * cronHOURS / total;
if (delay < 5 * cronSECONDS)
delay = 5 * cronSECONDS;
+ if (delay > 60 * cronSECONDS)
+ delay = 60 * cronSECONDS;
PTHREAD_SLEEP(delay);
if (dht == NULL)
return SYSERR;
@@ -121,9 +133,11 @@
if (stats != NULL)
stat_push_count
= stats->create(gettext_noop("# blocks pushed into DHT"));
- thread = PTHREAD_CREATE(&push_thread,
- NULL,
- 1024 * 64);
+ if (! NO_PUSH) {
+ thread = PTHREAD_CREATE(&push_thread,
+ NULL,
+ 1024 * 64);
+ }
}
void done_dht_push(void) {
Modified: GNUnet/src/applications/fs/module/migration.c
===================================================================
--- GNUnet/src/applications/fs/module/migration.c 2007-06-22 09:39:24 UTC (rev 5125)
+++ GNUnet/src/applications/fs/module/migration.c 2007-06-23 09:41:27 UTC (rev 5126)
@@ -36,6 +36,24 @@
#define DEBUG_MIGRATION NO
/**
+ * To how many peers may we migrate the same piece of content during
+ * one iteration? Higher values mean less IO, but migration also
+ * quickly becomes much less effective (everyone has the same
+ * content!). Also, values larger than the number of connections
+ * simply waste memory.
+ */
+#define MAX_RECEIVERS 16
+
+/**
+ * How many migration records do we keep in memory
+ * at the same time? Each record is about 32k, so
+ * 32 records will use about 1 MB of memory.
+ * We might want to allow users to specify larger
+ * values in the configuration file some day.
+ */
+#define MAX_RECORDS 32
+
+/**
* Datastore service.
*/
static Datastore_ServiceAPI * datastore;
@@ -64,6 +82,8 @@
static int stat_migration_count;
+static int stat_migration_factor;
+
static int stat_on_demand_migration_attempts;
/**
@@ -71,14 +91,15 @@
*/
static struct MUTEX * lock;
-/**
- * The content that we are currently trying
- * to migrate (used to give us more than one
- * chance should we fail for some reason the
- * first time).
- */
-static Datastore_Value * content;
-
+struct MigrationRecord {
+ Datastore_Value * value;
+ HashCode512 key;
+ unsigned int receiverIndices[MAX_RECEIVERS];
+ unsigned int sentCount;
+};
+
+static struct MigrationRecord content[MAX_RECORDS];
+
static struct GE_Context * ectx;
/**
@@ -100,30 +121,65 @@
activeMigrationCallback(const PeerIdentity * receiver,
void * position,
unsigned int padding) {
- /** key corresponding to content (if content != NULL);
- yes, must be static! */
- static HashCode512 key;
unsigned int ret;
GapWrapper * gw;
unsigned int size;
cron_t et;
cron_t now;
unsigned int anonymity;
- Datastore_Value *enc;
+ Datastore_Value * enc;
+ Datastore_Value * value;
+ unsigned int index;
+ int entry;
+ int discard_entry;
+ int discard_match;
+ int i;
+ int j;
+ int match;
+ index = coreAPI->computeIndex(receiver);
MUTEX_LOCK(lock);
- if (content != NULL) {
- size = sizeof(GapWrapper) + ntohl(content->size) - sizeof(Datastore_Value);
- if (size > padding) {
- FREE(content);
- content = NULL;
+ entry = -1;
+ discard_entry = -1;
+ discard_match = -1;
+ for (i=0;i<MAX_RECORDS;i++) {
+ if (content[i].value == NULL) {
+ discard_entry = i;
+ discard_match = MAX_RECEIVERS + 1;
+ continue;
}
+ if (ntohl(content[i].value->size) + sizeof(GapWrapper) - sizeof(Datastore_Value) <= padding) {
+ match = 0;
+ for (j=0;j<content[i].sentCount;j++) {
+ if (content[i].receiverIndices[j] == index) {
+ match = 1;
+ break;
+ }
+ }
+ if (match == 0) {
+ /* TODO: consider key proximity in matching as well! */
+ entry = i;
+ break;
+ } else {
+ if (content[i].sentCount > discard_match) {
+ discard_match = content[i].sentCount;
+ discard_entry = i;
+ }
+ }
+ }
}
- if (content == NULL) {
+ if (entry == -1) {
+ entry = discard_entry;
+ GE_ASSERT(NULL,
+ entry != -1);
+ FREENONNULL(content[entry].value);
+ content[entry].value = NULL;
+ content[entry].sentCount = 0;
if (OK != datastore->getRandom(&receiver->hashPubKey,
padding,
- &key,
- &content,
+ &content[entry].key,
+ &content[entry].value,
0)) {
MUTEX_UNLOCK(lock);
#if DEBUG_MIGRATION
@@ -132,68 +188,73 @@
"Migration: random lookup in datastore failed.\n");
#endif
return 0;
- }
+ }
+ if (stats != NULL)
+ stats->change(stat_migration_factor, 1);
}
-
+ value = content[entry].value;
+ if (value == NULL) {
+ GE_ASSERT(NULL, 0);
+ MUTEX_UNLOCK(lock);
+ return 0;
+ }
+ size = sizeof(GapWrapper) + ntohl(value->size) - sizeof(Datastore_Value);
+ if (size > padding) {
+ MUTEX_UNLOCK(lock);
+ return 0;
+ }
#if DEBUG_MIGRATION
GE_LOG(ectx,
GE_DEBUG | GE_BULK | GE_USER,
"Migration: random lookup in datastore returned type %d.\n",
- ntohl(content->type));
+ ntohl(value->type));
#endif
- if (ntohl(content->type) == ONDEMAND_BLOCK) {
+ if (ntohl(value->type) == ONDEMAND_BLOCK) {
if (ONDEMAND_getIndexed(datastore,
- content,
- &key,
+ value,
+ &content[entry].key,
&enc) != OK) {
- FREE(content);
- content = NULL;
+ FREENONNULL(value);
+ content[entry].value = NULL;
MUTEX_UNLOCK(lock);
return 0;
}
if (stats != NULL)
stats->change(stat_on_demand_migration_attempts, 1);
-
- FREE(content);
- content = enc;
+ content[entry].value = enc;
+ FREE(value);
+ value = enc;
}
- size = sizeof(GapWrapper) + ntohl(content->size) - sizeof(Datastore_Value);
+ size = sizeof(GapWrapper) + ntohl(value->size) - sizeof(Datastore_Value);
if (size > padding) {
MUTEX_UNLOCK(lock);
-#if DEBUG_MIGRATION
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Available content of size %u too big for available space (%u)\n",
- size,
- padding);
-#endif
return 0;
}
- et = ntohll(content->expirationTime);
+ et = ntohll(value->expirationTime);
now = get_time();
if (et > now) {
et -= now;
et = et % MAX_MIGRATION_EXP;
et += now;
}
- anonymity = ntohl(content->anonymityLevel);
+ anonymity = ntohl(value->anonymityLevel);
ret = 0;
if (anonymity == 0) {
- /* ret > 0; (if DHT succeeds) fixme for DHT */
+ value->anonymityLevel = htonl(1);
+ anonymity = 1;
}
- if ( (ret == 0) &&
- (OK == checkCoverTraffic(ectx,
- traffic,
- anonymity)) ) {
+ if (OK == checkCoverTraffic(ectx,
+ traffic,
+ anonymity)) {
gw = MALLOC(size);
gw->dc.size = htonl(size);
gw->timeout = htonll(et);
memcpy(&gw[1],
- &content[1],
+ &value[1],
size - sizeof(GapWrapper));
ret = gap->tryMigrate(&gw->dc,
- &key,
+ &content[entry].key,
position,
padding);
FREE(gw);
@@ -203,21 +264,14 @@
"gap's tryMigrate returned %u\n",
ret);
#endif
- } else {
-#if DEBUG_MIGRATION
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Migration: anonymity requirements not satisfied.\n");
-#endif
+ if (ret != 0) {
+ content[entry].receiverIndices[content[entry].sentCount++] = index;
+ }
}
- if (ret > 0) {
- FREE(content);
- content = NULL;
- }
MUTEX_UNLOCK(lock);
if ( (ret > 0)&&
(stats != NULL) )
- stats->change(stat_migration_count, 1);
+ stats->change(stat_migration_count, 1);
GE_BREAK(NULL, ret <= padding);
return ret;
}
@@ -240,6 +294,8 @@
if (stats != NULL) {
stat_migration_count
= stats->create(gettext_noop("# blocks migrated"));
+ stat_migration_factor
+ = stats->create(gettext_noop("# blocks fetched for migration"));
stat_on_demand_migration_attempts
= stats->create(gettext_noop("# on-demand block migration attempts"));
}
@@ -247,6 +303,7 @@
}
void doneMigration() {
+ int i;
coreAPI->unregisterSendCallback(GAP_ESTIMATED_DATA_SIZE,
&activeMigrationCallback);
if (stats != NULL) {
@@ -258,8 +315,10 @@
dht = NULL;
coreAPI = NULL;
traffic = NULL;
- FREENONNULL(content);
- content = NULL;
+ for (i=0;i<MAX_RECORDS;i++) {
+ FREENONNULL(content[i].value);
+ content[i].value = NULL;
+ }
MUTEX_DESTROY(lock);
lock = NULL;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r5126 - GNUnet/src/applications/fs/module,
gnunet <=