[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r179 - in GNUnet: . src/applications/fs/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r179 - in GNUnet: . src/applications/fs/module |
Date: |
Fri, 4 Feb 2005 07:29:29 -0800 (PST) |
Author: grothoff
Date: 2005-02-04 07:29:26 -0800 (Fri, 04 Feb 2005)
New Revision: 179
Modified:
GNUnet/src/applications/fs/module/fs.c
GNUnet/todo
Log:
FS-DHT integration done
Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c 2005-02-04 13:00:22 UTC (rev 178)
+++ GNUnet/src/applications/fs/module/fs.c 2005-02-04 15:29:26 UTC (rev 179)
@@ -25,9 +25,6 @@
*
* FS CORE. This is the code that is plugged into the GNUnet core to
* enable File Sharing.
- *
- * TODO:
- * - DHT integration (will have to modify DHT API, too!)
*/
#include "platform.h"
@@ -86,6 +83,15 @@
*/
#define MAX_MIGRATION_EXP (1L * cronMONTHS)
+typedef struct {
+ struct DHT_GET_RECORD * rec;
+ unsigned int prio;
+} DHT_GET_CLS;
+
+typedef struct {
+ struct DHT_PUT_RECORD * rec;
+} DHT_PUT_CLS;
+
/**
* Global core API.
*/
@@ -113,8 +119,95 @@
static Mutex lock;
+/**
+ * ID of the FS table in the DHT infrastructure.
+ */
+static DHT_TableId dht_table;
/**
+ * Store an item in the datastore.
+ *
+ * @param key the key of the item
+ * @param value the value to store
+ * @param prio how much does our routing code value
+ * this datum?
+ * @return OK if the value could be stored,
+ * NO if the value verifies but is not stored,
+ * SYSERR if the value is malformed
+ */
+static int gapPut(void * closure,
+ const HashCode160 * key,
+ const DataContainer * value,
+ unsigned int prio) {
+ Datastore_Value * dv;
+ GapWrapper * gw;
+ unsigned int size;
+ int ret;
+ HashCode160 hc;
+ cron_t et;
+ cron_t now;
+
+ if (ntohl(value->size) < sizeof(GapWrapper)) {
+ BREAK();
+ return SYSERR;
+ }
+ gw = (GapWrapper*) value;
+ size = ntohl(gw->dc.size)
+ - sizeof(GapWrapper)
+ + sizeof(Datastore_Value);
+ if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
+ (char*)&gw[1],
+ &hc)) ||
+ (! equalsHashCode160(&hc, key)) ) {
+ BREAK(); /* value failed verification! */
+ return SYSERR;
+ }
+
+ dv = MALLOC(size);
+ dv->size = htonl(size);
+ dv->type = gw->type;
+ dv->prio = htonl(prio);
+ dv->anonymityLevel = htonl(0);
+ et = ntohll(gw->timeout);
+ cronTime(&now);
+ /* bound ET to MAX_MIGRATION_EXP from now */
+ if (et > now) {
+ et -= now;
+ et = et % MAX_MIGRATION_EXP;
+ et += now;
+ }
+ dv->expirationTime = htonll(et);
+ memcpy(&dv[1],
+ &gw[1],
+ size - sizeof(Datastore_Value));
+ processResponse(key, dv);
+ ret = datastore->putUpdate(key,
+ dv);
+ FREE(dv);
+ return ret;
+}
+
+static int get_result_callback(const HashCode160 * key,
+ const DataContainer * value,
+ DHT_GET_CLS * cls) {
+ gapPut(NULL,
+ key,
+ value,
+ cls->prio);
+ return OK;
+}
+
+static void get_complete_callback(DHT_GET_CLS * cls) {
+ dht->get_stop(cls->rec);
+ FREE(cls);
+}
+
+static void put_complete_callback(DHT_PUT_CLS * cls) {
+ dht->put_stop(cls->rec);
+ FREE(cls);
+}
+
+/**
* Process a query from the client. Forwards to the network.
*
* @return SYSERR if the TCP connection should be closed, otherwise OK
@@ -122,6 +215,7 @@
static int csHandleRequestQueryStart(ClientHandle sock,
const CS_HEADER * req) {
RequestSearch * rs;
+ unsigned int keyCount;
if (ntohs(req->size) < sizeof(RequestSearch)) {
BREAK();
@@ -129,14 +223,28 @@
}
rs = (RequestSearch*) req;
trackQuery(&rs->query[0], sock);
+ keyCount = 1 + (ntohs(req->size) - sizeof(RequestSearch)) / sizeof(HashCode160);
gap->get_start(ntohl(rs->type),
ntohl(rs->anonymityLevel),
- 1 + (ntohs(req->size) - sizeof(RequestSearch)) / sizeof(HashCode160),
+ keyCount,
&rs->query[0],
ntohll(rs->expiration),
ntohl(rs->prio));
- if (ntohl(rs->anonymityLevel) == 0) {
- /* FIXME: query(rs); -- pass to dht! */
+ if ( (ntohl(rs->anonymityLevel) == 0) &&
+ (dht != NULL) ) {
+ DHT_GET_CLS * cls;
+
+ cls = MALLOC(sizeof(DHT_GET_CLS));
+ cls->prio = ntohl(rs->prio);
+ cls->rec = dht->get_start(&dht_table,
+ ntohl(rs->type),
+ keyCount,
+ &rs->query[0],
+ ntohll(rs->expiration),
+ (DataProcessor) &get_result_callback,
+ cls,
+ (DHT_OP_Complete) &get_complete_callback,
+ cls);
}
return OK;
}
@@ -157,7 +265,7 @@
}
rs = (RequestSearch*) req;
if (ntohl(rs->anonymityLevel) == 0) {
- /* FIXME: cancel with dht */
+ /* FIXME 0.7.1: cancel with dht? */
}
gap->get_stop(ntohl(rs->type),
1 + (ntohs(req->size) - sizeof(RequestSearch)) / sizeof(HashCode160),
@@ -177,6 +285,7 @@
Datastore_Value * datum;
int ret;
HashCode160 query;
+ unsigned int type;
if (ntohs(req->size) < sizeof(RequestIndex)) {
BREAK();
@@ -195,14 +304,47 @@
FREE(datum);
return SYSERR;
}
- datum->type = htonl(getTypeOfBlock(ntohs(ri->header.size) - sizeof(RequestInsert),
- &ri[1]));
+ type = getTypeOfBlock(ntohs(ri->header.size) - sizeof(RequestInsert),
+ &ri[1]);
+ datum->type = htonl(type);
MUTEX_LOCK(&lock);
ret = datastore->put(&query,
datum);
MUTEX_UNLOCK(&lock);
if (ntohl(ri->anonymityLevel) == 0) {
- /* do DHT put! */
+ GapWrapper * gw;
+ unsigned int size;
+ cron_t now;
+ cron_t et;
+ DHT_PUT_CLS * cls;
+
+ size = sizeof(GapWrapper) +
+ ntohs(ri->header.size) - sizeof(RequestInsert) -
+ sizeof(Datastore_Value);
+ gw = MALLOC(size);
+ gw->dc.size = htonl(size);
+ gw->type = htonl(type);
+ et = ntohll(ri->expiration);
+ /* expiration time normalization and randomization */
+ cronTime(&now);
+ if (et > now) {
+ et -= now;
+ et = et % MAX_MIGRATION_EXP;
+ if (et > 0)
+ et = randomi(et);
+ et = et + now;
+ }
+ gw->timeout = htonll(et);
+ memcpy(&gw[1],
+ &ri[1],
+ size - sizeof(GapWrapper));
+ cls = MALLOC(sizeof(DHT_PUT_CLS));
+ cls->rec = dht->put_start(&dht_table,
+ &query,
+ 15 * cronSECONDS, /* FIXME 0.7.1: better timeout for DHT PUT operation */
+ &gw->dc,
+ (DHT_OP_Complete) &put_complete_callback,
+ cls);
}
FREE(datum);
@@ -509,69 +651,6 @@
}
/**
- * Store an item in the datastore.
- *
- * @param key the key of the item
- * @param value the value to store
- * @param prio how much does our routing code value
- * this datum?
- * @return OK if the value could be stored,
- * NO if the value verifies but is not stored,
- * SYSERR if the value is malformed
- */
-static int gapPut(void * closure,
- const HashCode160 * key,
- const DataContainer * value,
- unsigned int prio) {
- Datastore_Value * dv;
- GapWrapper * gw;
- unsigned int size;
- int ret;
- HashCode160 hc;
- cron_t et;
- cron_t now;
-
- if (ntohl(value->size) < sizeof(GapWrapper)) {
- BREAK();
- return SYSERR;
- }
- gw = (GapWrapper*) value;
- size = ntohl(gw->dc.size)
- - sizeof(GapWrapper)
- + sizeof(Datastore_Value);
- if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
- (char*)&gw[1],
- &hc)) ||
- (! equalsHashCode160(&hc, key)) ) {
- BREAK(); /* value failed verification! */
- return SYSERR;
- }
-
- dv = MALLOC(size);
- dv->size = htonl(size);
- dv->type = gw->type;
- dv->prio = htonl(prio);
- dv->anonymityLevel = htonl(0);
- et = ntohll(gw->timeout);
- cronTime(&now);
- /* bound ET to MAX_MIGRATION_EXP from now */
- if (et > now) {
- et -= now;
- et = et % MAX_MIGRATION_EXP;
- et += now;
- }
- dv->expirationTime = htonll(et);
- memcpy(&dv[1],
- &gw[1],
- size - sizeof(Datastore_Value));
- processResponse(key, dv);
- ret = datastore->putUpdate(key,
- dv);
- FREE(dv);
- return ret;
-}
-
-/**
* Remove an item from the datastore.
*
* @param key the key of the item
@@ -599,7 +678,102 @@
return SYSERR;
}
+
/**
+ * Callback that converts the Datastore_Value values
+ * from the datastore to Blockstore values for the
+ * DHT routing protocol.
+ */
+static int dhtGetConverter(const HashCode160 * key,
+ const Datastore_Value * value,
+ void * cls) {
+ GGC * ggc = (GGC*) cls;
+ GapWrapper * gw;
+ int ret;
+ unsigned int size;
+ cron_t et;
+ cron_t now;
+
+ ret = isDatumApplicable(ntohl(value->type),
+ ntohl(value->size) - sizeof(Datastore_Value),
+ (char*) &value[1],
+ ggc->keyCount,
+ ggc->keys);
+ if (ret == SYSERR)
+ return SYSERR; /* no query will ever match */
+ if (ret == NO)
+ return OK; /* Additional filtering based on type;
+ i.e., namespace request and namespace
+ in reply does not match namespace in query */
+ size = sizeof(GapWrapper) +
+ ntohl(value->size) -
+ sizeof(Datastore_Value);
+
+ if (ntohl(value->anonymityLevel) != 0)
+ return OK;
+
+ gw = MALLOC(size);
+ gw->dc.size = htonl(size);
+ gw->type = value->type;
+ et = ntohll(value->expirationTime);
+ /* expiration time normalization and randomization */
+ cronTime(&now);
+ if (et > now) {
+ et -= now;
+ et = et % MAX_MIGRATION_EXP;
+ if (et > 0)
+ et = randomi(et);
+ et = et + now;
+ }
+ gw->timeout = htonll(et);
+ memcpy(&gw[1],
+ &value[1],
+ size - sizeof(GapWrapper));
+
+ if (ggc->resultCallback != NULL)
+ ret = ggc->resultCallback(key,
+ &gw->dc,
+ ggc->resCallbackClosure);
+ else
+ ret = OK;
+ FREE(gw);
+ return ret;
+}
+
+/**
+ * Lookup an item in the datastore.
+ *
+ * @param key the value to lookup
+ * @param resultCallback function to call for each result that was found
+ * @param resCallbackClosure extra argument to resultCallback
+ * @return number of results, SYSERR on error
+ */
+static int dhtGet(void * closure,
+ unsigned int type,
+ unsigned int prio,
+ unsigned int keyCount,
+ const HashCode160 * keys,
+ DataProcessor resultCallback,
+ void * resCallbackClosure) {
+ int ret;
+ GGC myClosure;
+
+ myClosure.keyCount = keyCount;
+ myClosure.keys = keys;
+ myClosure.resultCallback = resultCallback;
+ myClosure.resCallbackClosure = resCallbackClosure;
+ ret = datastore->get(&keys[0],
+ type,
+ &dhtGetConverter,
+ &myClosure);
+ if (ret != SYSERR)
+ ret = myClosure.count; /* return number of actual
+ results (unfiltered) that
+ were found */
+ return ret;
+}
+
+/**
* Initialize the FS module. This method name must match
* the library name (libgnunet_XXX => initialize_XXX).
*
@@ -607,7 +781,11 @@
*/
int initialize_module_fs(CoreAPIForApplication * capi) {
static Blockstore dsGap;
+ static Blockstore dsDht;
+ hash("GNUNET_FS",
+ strlen("GNUNET_FS"),
+ &dht_table);
if (getConfigurationInt("AFS",
"DISKQUOTA") <= 0) {
LOG(LOG_ERROR,
@@ -627,7 +805,8 @@
capi->releaseService(datastore);
return SYSERR;
}
- dht = capi->requestService("dht");
+ // dht = capi->requestService("dht");
+ dht = NULL;
coreAPI = capi;
MUTEX_CREATE(&lock);
@@ -639,7 +818,14 @@
initQueryManager(capi);
gap->init(&dsGap);
- /* if (dht != NULL) dht->join(&dsDht, &table);*/
+ if (dht != NULL) {
+ dsDht.closure = NULL;
+ dsDht.get = &dhtGet;
+ dsDht.put = &gapPut; /* exactly the same method for gap/dht*/
+ dsDht.del = &gapDel; /* exactly the same method for gap/dht*/
+ dsDht.iterate = &gapIterate; /* exactly the same method for gap/dht*/
+ dht->join(&dsDht, &dht_table);
+ }
LOG(LOG_DEBUG,
_("'%s' registering client handlers %d %d %d %d %d %d %d %d %d\n"),
@@ -676,6 +862,15 @@
void done_module_fs() {
doneMigration();
+ if (dht != NULL) {
+ LOG(LOG_INFO,
+ "Leaving DHT (this may take a while).");
+ dht->leave(&dht_table,
+ 15 * cronSECONDS);
+ LOG(LOG_INFO,
+ "Leaving DHT complete.");
+
+ }
GNUNET_ASSERT(SYSERR != coreAPI->unregisterClientHandler(AFS_CS_PROTO_QUERY_START,
&csHandleRequestQueryStart));
GNUNET_ASSERT(SYSERR != coreAPI->unregisterClientHandler(AFS_CS_PROTO_QUERY_STOP,
@@ -692,7 +887,6 @@
&csHandleRequestTestIndexed));
GNUNET_ASSERT(SYSERR != coreAPI->unregisterClientHandler(AFS_CS_PROTO_GET_AVG_PRIORITY,
&csHandleRequestGetAvgPriority));
- /* dht->leave(&table, timeout); */
doneQueryManager();
coreAPI->releaseService(datastore);
datastore = NULL;
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-02-04 13:00:22 UTC (rev 178)
+++ GNUnet/todo 2005-02-04 15:29:26 UTC (rev 179)
@@ -34,9 +34,6 @@
- Missing Features:
* topology: do aggressive bootstrap on first start (Christian) [ easy ]
* ecrs-unindex: code cleanup [ easy ]
- * fix dht routing service
- - make dht respect new dht API (long way to go)
- - fs-dht integration [ difficult ]
* configure.ac: flags for mysql, gmp, libgcrypt should ONLY be passed when
linking the respective modules / libraries (gnunet_util, sqstore_mysql) [ tricky ]
- Features removed but to be revived:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r179 - in GNUnet: . src/applications/fs/module,
grothoff <=