[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r4128 - in GNUnet: . src/applications/dht/module
From: grothoff
Subject: [GNUnet-SVN] r4128 - in GNUnet: . src/applications/dht/module
Date: Sat, 30 Dec 2006 07:10:20 -0800 (PST)
Author: grothoff
Date: 2006-12-30 07:10:17 -0800 (Sat, 30 Dec 2006)
New Revision: 4128
Modified:
GNUnet/src/applications/dht/module/routing.c
GNUnet/todo
Log:
addressing routing issues
Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c 2006-12-30 14:51:14 UTC (rev 4127)
+++ GNUnet/src/applications/dht/module/routing.c 2006-12-30 15:10:17 UTC (rev 4128)
@@ -23,16 +23,11 @@
* @brief state for active DHT routing operations
* @author Christian Grothoff
*
- * RC:
- * - add support for GET retry (or delayed initial GET ops)
- * - fix problem with possible GET/PUT routing loops!
- * (not convinced that current design even probabilistically
- * prevents loops; also check for routing loops by
- * checking pending queries)
- *
* LATER:
* - prioritization
* - delay selection
+ * - implement extra_get_callback
+ * - add similar callback for discovery in table.c
*/
#include "platform.h"
@@ -47,9 +42,14 @@
/**
* @brief record used for sending response back
*/
-typedef struct {
+typedef struct DHT_Source_Route {
/**
+ * This is a linked list.
+ */
+ struct DHT_Source_Route * next;
+
+ /**
* Source of the request. Replies should be forwarded to
* this peer.
*/
@@ -73,19 +73,19 @@
MESSAGE_HEADER header;
/**
- * Type of the requested content
+ * Type of the requested content (NBO)
*/
unsigned int type;
/**
- * Priority of requested content
+ * Priority of requested content (NBO)
*/
unsigned int prio;
/**
- * Reserved (for now, always zero)
+ * Relative time to live in cronMILLIS (NBO)
*/
- unsigned int reserved;
+ int ttl;
/**
* Search key.
@@ -104,12 +104,12 @@
MESSAGE_HEADER header;
/**
- * Type of the content
+ * Type of the content (NBO)
*/
unsigned int type;
/**
- * When to discard the content (relative time)
+ * When to discard the content (relative time, NBO)
*/
cron_t timeout;
@@ -130,7 +130,7 @@
MESSAGE_HEADER header;
/**
- * Type of the content
+ * Type of the content (NBO)
*/
unsigned int type;
@@ -144,12 +144,17 @@
/**
* Entry in the DHT routing table.
*/
-typedef struct {
+typedef struct DHTQueryRecord {
/**
+ * When do we stop forwarding this request?
+ */
+ cron_t expires;
+
+ /**
* Information about where to send the results back to.
*/
- DHT_Source_Route source;
+ DHT_Source_Route * sources;
/**
* GET message of this record.
@@ -166,19 +171,26 @@
* Number of entries in results.
*/
unsigned int result_count;
-
} DHTQueryRecord;
-static unsigned int rt_size;
+/**
+ * How far into the future can requests continue?
+ * Note that this also caps the frequency of how
+ * often peers will re-issue requests.
+ */
+#define MAX_TTL (5 * cronMINUTES)
-static unsigned int rt_pos;
-
/**
- * rt_size records of active queries
+ * Linked list of active records.
*/
static DHTQueryRecord ** records;
/**
+ * Size of records
+ */
+static unsigned int rt_size;
+
+/**
* Statistics service.
*/
static Stats_ServiceAPI * stats;
@@ -214,6 +226,7 @@
DHT_RESULT_MESSAGE * result;
unsigned int routed;
unsigned int tracked;
+ DHT_Source_Route * pos;
if (cls != NULL) {
result = cls;
@@ -258,33 +271,37 @@
q->result_count + 1);
routed++;
q->results[q->result_count-1] = hc;
- if (0 != memcmp(&q->source.source,
- coreAPI->myIdentity,
- sizeof(PeerIdentity))) {
+ pos = q->sources;
+ while (pos != NULL) {
+ if (0 != memcmp(&pos->source,
+ coreAPI->myIdentity,
+ sizeof(PeerIdentity))) {
#if DEBUG_ROUTING
- GE_LOG(coreAPI->ectx,
- GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "Routing result to other peer\n");
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+ "Routing result to other peer\n");
#endif
- coreAPI->unicast(&q->source.source,
- &result->header,
- 0, /* FIXME: priority */
- 5 * cronSECONDS); /* FIXME */
- if (stats != NULL)
- stats->change(stat_replies_routed, 1);
- } else if (q->source.receiver != NULL) {
+ coreAPI->unicast(&pos->source,
+ &result->header,
+ 0, /* FIXME: priority */
+ 5 * cronSECONDS); /* FIXME */
+ if (stats != NULL)
+ stats->change(stat_replies_routed, 1);
+ } else if (pos->receiver != NULL) {
#if DEBUG_ROUTING
- GE_LOG(coreAPI->ectx,
- GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "Routing result to local client\n");
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+ "Routing result to local client\n");
#endif
- q->source.receiver(key,
- type,
- size,
- data,
- q->source.receiver_closure);
- if (stats != NULL)
- stats->change(stat_replies_routed, 1);
+ pos->receiver(key,
+ type,
+ size,
+ data,
+ pos->receiver_closure);
+ if (stats != NULL)
+ stats->change(stat_replies_routed, 1);
+ }
+ pos = pos->next;
}
}
MUTEX_UNLOCK(lock);
@@ -299,35 +316,84 @@
FREE(result);
}
-static void addRoute(const PeerIdentity * sender,
- ResultHandler handler,
- void * cls,
- const DHT_GET_MESSAGE * get) {
+/**
+ * @return OK if route was added, SYSERR if not
+ */
+static int addRoute(const PeerIdentity * sender,
+ ResultHandler handler,
+ void * cls,
+ const DHT_GET_MESSAGE * get) {
DHTQueryRecord * q;
+ unsigned int i;
+ unsigned int rt_pos;
+ cron_t expire;
+ cron_t now;
+ int ttl;
+ struct DHT_Source_Route * pos;
+ ttl = ntohl(get->ttl);
+ if (ttl > MAX_TTL)
+ ttl = 0; /* implausibly high */
+ now = get_time();
+ expire = now + ttl;
MUTEX_LOCK(lock);
- if (records[rt_pos] != NULL) {
+ rt_pos = rt_size;
+ for (i=0;i<rt_size;i++) {
+ if ( (sender != NULL) &&
+ (records[i] != NULL) &&
+ (0 == memcmp(&records[i]->get->key,
+ &get->key,
+ sizeof(HashCode512))) &&
+ (records[i]->get->type == get->type) &&
+ (records[i]->expires > now - MAX_TTL) ) {
+ /* do not route, same request already (recently)
+ active (possibly from other initiator) */
+ /* FIXME: support sending replies back to
+ multiple peers!? */
+ MUTEX_UNLOCK(lock);
+ return SYSERR;
+ }
+ if (records[i] == NULL) {
+ records[i] = MALLOC(sizeof(DHTQueryRecord));
+ records[i]->get = NULL;
+ rt_pos = i;
+ expire = 0;
+ } else if (records[i]->expires < expire) {
+ expire = records[i]->expires;
+ rt_pos = i;
+ }
+ }
+ if (rt_pos == rt_size) {
+ /* do not route, expiration time too high */
+ MUTEX_UNLOCK(lock);
+ return SYSERR;
+ }
+ if (records[rt_pos]->get != NULL) {
FREE(records[rt_pos]->get);
- GROW(records[rt_pos]->results,
- records[rt_pos]->result_count,
- 0);
- } else {
- records[rt_pos] = MALLOC(sizeof(DHTQueryRecord));
+ while (records[rt_pos]->sources != NULL) {
+ pos = records[rt_pos]->sources;
+ records[rt_pos]->sources = pos->next;
+ FREE(pos);
+ }
}
q = records[rt_pos];
memset(q,
0,
sizeof(DHTQueryRecord));
+ q->expires = now + ttl;
q->get = MALLOC(ntohs(get->header.size));
memcpy(q->get,
get,
ntohs(get->header.size));
+ pos = MALLOC(sizeof(DHT_Source_Route));
+ pos->next = q->sources;
+ q->sources = pos;
if (sender != NULL)
- q->source.source = *sender;
+ pos->source = *sender;
else
- q->source.source = *coreAPI->myIdentity;
- q->source.receiver = handler;
- q->source.receiver_closure = cls;
+ pos->source = *coreAPI->myIdentity;
+ pos->receiver = handler;
+ pos->receiver_closure = cls;
#if DEBUG_ROUTING
GE_LOG(coreAPI->ectx,
GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
@@ -338,6 +404,7 @@
MUTEX_UNLOCK(lock);
if (stats != NULL)
stats->change(stat_requests_routed, 1);
+ return OK;
}
/**
@@ -354,7 +421,9 @@
const MESSAGE_HEADER * msg) {
PeerIdentity next[GET_TRIES];
const DHT_GET_MESSAGE * get;
+ DHT_GET_MESSAGE aget;
int total;
+ int ttl;
int i;
#if DEBUG_ROUTING
EncName enc;
@@ -374,11 +443,12 @@
#endif
if (stats != NULL)
stats->change(stat_get_requests_received, 1);
- if (sender != NULL)
- addRoute(sender,
- NULL,
- NULL,
- get);
+ if ( (sender != NULL) &&
+ (OK != addRoute(sender,
+ NULL,
+ NULL,
+ get)) )
+ return OK; /* could not route */
total = dht_store_get(&get->key,
ntohl(get->type),
&routeResult,
@@ -393,6 +463,7 @@
#endif
return OK;
}
+ total = 0;
for (i=0;i<GET_TRIES;i++) {
if (OK != select_dht_peer(&next[i],
&get->key,
@@ -402,6 +473,15 @@
if (-1 == hashCodeCompareDistance(&next[i].hashPubKey,
&coreAPI->myIdentity->hashPubKey,
&get->key)) {
+ if (total == 0) {
+ aget = *get;
+ ttl = ntohl(get->ttl);
+ if (ttl > MAX_TTL)
+ ttl = MAX_TTL;
+ ttl -= 5 * cronSECONDS;
+ aget.ttl = htonl(ttl);
+ total = 1;
+ }
coreAPI->unicast(&next[i],
msg,
0, /* FIXME: priority */
@@ -441,7 +521,8 @@
stats->change(stat_put_requests_received, 1);
put = (const DHT_PUT_MESSAGE*) msg;
#if DEBUG_ROUTING
- hash2enc(&put->key, &enc);
+ hash2enc(&put->key,
+ &enc);
GE_LOG(coreAPI->ectx,
GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"Received DHT PUT for key `%s'.\n",
@@ -539,14 +620,14 @@
get.header.type = htons(P2P_PROTO_DHT_GET);
get.type = htonl(type);
get.prio = htonl(0); /* FIXME */
- get.reserved = htonl(0);
+ get.ttl = htonl(MAX_TTL); /* FIXME? */
get.key = *key;
- addRoute(NULL,
- handler,
- cls,
- &get);
- handleGet(NULL,
- &get.header);
+ if (OK == addRoute(NULL,
+ handler,
+ cls,
+ &get))
+ handleGet(NULL,
+ &get.header);
}
/**
@@ -558,21 +639,39 @@
ResultHandler handler,
void * cls) {
int i;
+ struct DHT_Source_Route * pos;
+ struct DHT_Source_Route * prev;
+ int done;
+ done = NO;
MUTEX_LOCK(lock);
for (i=0;i<rt_size;i++) {
if (records[i] == NULL)
continue;
- if ( (records[i]->source.receiver == handler) &&
- (records[i]->source.receiver_closure == cls) &&
- (0 == memcmp(key,
- &records[i]->get->key,
- sizeof(HashCode512))) ) {
+ prev = NULL;
+ pos = records[i]->sources;
+ while (pos != NULL) {
+ if ( (pos->receiver == handler) &&
+ (pos->receiver_closure == cls) &&
+ (0 == memcmp(key,
+ &records[i]->get->key,
+ sizeof(HashCode512))) ) {
+ if (prev == NULL)
+ records[i]->sources = pos->next;
+ else
+ prev->next = pos->next;
+ FREE(pos);
+ done = YES;
+ break;
+ }
+ }
+ if (records[i]->sources == NULL) {
FREE(records[i]->get);
FREE(records[i]);
records[i] = NULL;
+ }
+ if (done == YES)
break;
- }
}
MUTEX_UNLOCK(lock);
}
@@ -608,6 +707,21 @@
}
/**
+ * We have additional "free" bandwidth available.
+ * Possibly find a good query to add to the message
+ * to the given receiver.
+ *
+ * @param padding maximum number of bytes available
+ * @return number of bytes added at position
+ */
+static unsigned int
+extra_get_callback(const PeerIdentity * receiver,
+ void * position,
+ unsigned int padding) {
+ return 0;
+}
+
+/**
* Initialize routing DHT component.
*
* @param capi the core API
@@ -650,6 +764,8 @@
&handlePut);
coreAPI->registerHandler(P2P_PROTO_DHT_RESULT,
&handleResult);
+ coreAPI->registerSendCallback(sizeof(DHT_GET_MESSAGE),
+ &extra_get_callback);
return OK;
}
@@ -661,6 +777,8 @@
int done_dht_routing() {
unsigned int i;
+ coreAPI->unregisterSendCallback(sizeof(DHT_GET_MESSAGE),
+ &extra_get_callback);
coreAPI->unregisterHandler(P2P_PROTO_DHT_GET,
&handleGet);
coreAPI->unregisterHandler(P2P_PROTO_DHT_PUT,
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-12-30 14:51:14 UTC (rev 4127)
+++ GNUnet/todo 2006-12-30 15:10:17 UTC (rev 4128)
@@ -21,12 +21,9 @@
0.7.2 [3'07]:
- new features:
* XFS / support for location URIs [CG]
- + dht/routing: handle routing loops [RC]
+ ECRS-URI: toString/fromString for loc URIs [RC]
- + dht/gap integration [RC]
- + fsui/location URI support [RC]
- + dht/routing: GET retries (optimization)
- + dstore bloomfilter (optimization)
+ + dht/gap integration (search routing) [RC]
+ + fsui/fs/location URI support (download routing) [RC]
* HTTP transport (libcurl, libmicrohttpd)
- minor improvements:
* directories can be compacted -- add heuristic to determine
@@ -47,11 +44,11 @@
- insert meta-data under hash (md5? sha1? sha-512? GNUnet-URI?)
as keyword (to allow getting meta-data from URI only)
- Chat support basics [RC]
-- better NAT traversal:
- * NAT-PMP (in addition to UPnP)
- old/new features:
* SMTP transport (libesmtp)
* SMTP logger
+ * support NAT-PMP (in addition to UPnP)?
+ * add bloomfilter to dstore?
- Documentation:
* LJ article
- Testcases:
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r4128 - in GNUnet: . src/applications/dht/module, grothoff <=