[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r4063 - in GNUnet: . src/applications/dht/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r4063 - in GNUnet: . src/applications/dht/module |
Date: |
Tue, 26 Dec 2006 22:49:01 -0800 (PST) |
Author: grothoff
Date: 2006-12-26 22:48:58 -0800 (Tue, 26 Dec 2006)
New Revision: 4063
Removed:
GNUnet/src/applications/dht/module/dht.c
Modified:
GNUnet/src/applications/dht/module/table.c
GNUnet/todo
Log:
byebye
Deleted: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2006-12-27 06:30:26 UTC (rev
4062)
+++ GNUnet/src/applications/dht/module/dht.c 2006-12-27 06:48:58 UTC (rev
4063)
@@ -1,3737 +0,0 @@
- /*
- This file is part of GNUnet
- (C) 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file module/dht.c
- * @brief definition of the entry points to the module; implements
- * the client-server application using the DHT service; the DHT
- * service is based on RPC and the DHT itself is roughly based
- * on kademlia.
- * @author Marko R�ih�, Christian Grothoff
- *
- *
- * WARNING (to self): What follows is 3.500+ lines of incomplete,
- * crazy, recursive, asynchronous, multithreaded routing code with
- * plenty of function pointers, too little documentation and not
- * enough testing. Pray to the C gods before venturing any further.
- *
- *
- * Todo:
- * 1) document (lots!)
- * 2) test & debug & complete code
- * 3) look into threading issues (deadlock? data races?)
- *
- * Desirable features:
- * 1) security: how to pick priorities? Access rights?
- * 2) performance: add optional hello messages
- * 3) allow clients to modify data that is stored/retrieved
- * on-the-fly (i.e. to implement signed paths for locations!)
- */
-
-#include "platform.h"
-#include "gnunet_util.h"
-#include "gnunet_core.h"
-#include "gnunet_rpc_service.h"
-#include "gnunet_dht_service.h"
-#include "datastore_dht_master.h"
-
-/* ********************* CONSTANTS ******************* */
-
-/**
- * Enable/disable DHT debugging output.
- */
-#define DEBUG_DHT YES
-
-#if DEBUG_DHT
-#define ENTER() GE_LOG(ectx, GE_REQUEST | GE_DEVELOPER | GE_USER | GE_DEBUG,
"Entering method %s at %s:%d.\n", __FUNCTION__, __FILE__, __LINE__)
-#else
-#define ENTER() do {} while (0)
-#endif
-
-/**
- * Number of replications / parallel requests.
- */
-#define ALPHA 7
-
-/**
- * Frequency of the DHT maintain job (trade-off between
- * more smooth traffic from the maintain job and useless
- * CPU consumption for the job going over the table doing
- * nothing).
- */
-#define DHT_MAINTAIN_FREQUENCY (15 * cronSECONDS)
-
-/**
- * How often do we do maintenance 'find' operations on
- * each table to maintain the routing table (finding
- * peers close to ourselves)?
- */
-#define DHT_MAINTAIN_FIND_FREQUENCY (2 * cronMINUTES)
-
-/**
- * How often should we notify the master-table about our
- * bucket status?
- */
-#define DHT_MAINTAIN_BUCKET_FREQUENCY (5 * cronMINUTES)
-
-/**
- * How often should we ping a peer? Only applies once we
- * are nearing the DHT_INACTIVITY_DEATH time.
- */
-#define DHT_PING_FREQUENCY (64 * DHT_MAINTAIN_FREQUENCY)
-
-/**
- * After what time do peers always expire for good?
- */
-#define DHT_INACTIVITY_DEATH (4 * DHT_PING_FREQUENCY)
-
-/**
- * For how long after the last message do we consider a peer
- * "hyperactive" and refuse to remove it from the table?
- */
-#define DHT_HYPERACTIVE_TIME (60 * cronSECONDS)
-
-/**
- * What is the trade-off factor between the number of tables that a
- * peer participates in and the additional time we give it before
- * removing it? (We may also want to take table-diversity into account
- * here, but for now just the number of tables will do). Effectively,
- * a peer with k tables more stays DHT_TABLE_FACTOR seconds longer in
- * our connection list.
- */
-#define DHT_TABLE_FACTOR (10 * cronSECONDS)
-
-/**
- * What is the CURRENT target size for buckets?
- */
-#define BUCKET_TARGET_SIZE (4 + ALPHA * tablesCount)
-
-
-/* ********************* STRUCTS ************************** */
-/* ******************and Function-Types******************** */
-
-/**
- * Per-peer information.
- */
-typedef struct {
- /**
- * What was the last time we received a message from this peer?
- */
- cron_t lastActivity;
- /**
- * What was the last time we received a table status message
- * from this peer?
- */
- cron_t lastTableRefresh;
- /**
- * What was the last time we send a PING to this peer?
- */
- cron_t lastTimePingSend;
- /**
- * In which tables do we know that peer to participate in?
- */
- DHT_TableId * tables;
- /**
- * How large is the tables array?
- */
- unsigned int tableCount;
- /**
- * What is the identity of the peer?
- */
- PeerIdentity id;
-} PeerInfo;
-
-/**
- * Peers are grouped into buckets.
- */
-typedef struct {
- /**
- * Peers in this bucket fall into the distance-range
- * (2^bstart to 2^bend].
- */
- unsigned int bstart;
-
- /**
- * Peers in this bucket fall into the distance-range
- * (2^bstart to 2^bend].
- */
- unsigned int bend;
-
- /**
- * Peers in this bucket. NULL is used if no peer is known.
- */
- struct Vector * peers; /* contains PeerInfo instances */
-} PeerBucket;
-
-/**
- * Local information about a DHT table that this peer is participating
- * in.
- */
-typedef struct {
- DHT_TableId id;
- Blockstore * store;
- /**
- * What was the last time we advertised this nodes participation in
- * this table to the master table?
- */
- cron_t lastMasterAdvertisement;
-
- /**
- * What was the last time we ran a find-node operation on
- * this table to find neighbouring peers?
- */
- cron_t lastFindOperation;
-} LocalTableData;
-
-
-/**
- * Context for callbacks used by FindNodes.
- */
-typedef struct {
- /**
- * Towards which key are we routing?
- */
- HashCode512 key;
-
- /**
- * In what table are we searching?
- */
- DHT_TableId table;
-
- /**
- * Signal used to return from findNodes when timeout has
- * expired.
- */
- struct SEMAPHORE * signal;
-
- /**
- * Number of entries in matches.
- */
- unsigned int k;
-
- /**
- * Best k matches found so far. Of size ALPHA.
- */
- HashCode512 * matches;
-
- /**
- * Number of RPCs transmitted so far (if it reaches
- * rpcRepliesExpected we can possibly abort before
- * the timeout!).
- */
- unsigned int rpcRepliesReceived;
-
- /**
- * Size of the RPC array.
- */
- unsigned int rpcRepliesExpected;
-
- /**
- * Handle for the async dht_get operation (NULL if
- * such an operation was not performed).
- */
- struct DHT_GET_RECORD * async_handle;
-
- /**
- * ASYNC RPC handles.
- */
- struct RPC_Record ** rpc;
-
- /**
- * When do we need to be done (absolute time).
- */
- cron_t timeout;
-
- /**
- * Lock for accessing this struct.
- */
- struct MUTEX * lock;
-} FindNodesContext;
-
-/**
- * Callback for findNodes that is invoked whenever a node is found.
- *
- * @param identity the identity of the node that was found
- * @return OK to continue searching, SYSERR to abort early
- */
-typedef int (*NodeFoundCallback)(const PeerIdentity * identity,
- void * closure);
-
-/**
- * Context for callbacks used by FindNodes.
- */
-typedef struct {
- /**
- * Towards which key are we routing?
- */
- HashCode512 key;
-
- /**
- * In what table are we searching?
- */
- DHT_TableId table;
-
- /**
- * Number of entries to wait for
- */
- unsigned int k;
-
- /**
- * Number of entries found so far.
- */
- unsigned int found;
-
- /**
- * Number of RPCs transmitted so far (if it reaches
- * rpcRepliesExpected we can possibly abort before
- * the timeout!).
- */
- unsigned int rpcRepliesReceived;
-
- /**
- * Size of the RPC array.
- */
- unsigned int rpcRepliesExpected;
-
- /**
- * Handle for the async dht_get operation (NULL if
- * such an operation was not performed).
- */
- struct DHT_GET_RECORD * async_handle;
-
- /**
- * ASYNC RPC handles.
- */
- struct RPC_Record ** rpc;
-
- /**
- * When do we need to be done (absolute time).
- */
- cron_t timeout;
-
- /**
- * Lock for accessing this struct.
- */
- struct MUTEX * lock;
-
- /**
- * Callback to call on the k nodes.
- */
- NodeFoundCallback callback;
-
- /**
- * Extra argument to the callback.
- */
- void * closure;
-} FindKNodesContext;
-
-/**
- * Context for async DHT_GET operation.
- */
-typedef struct DHT_GET_RECORD {
- /**
- * What is the (absolute) time of the timeout?
- */
- cron_t timeout;
-
- /**
- * In which table are we searching?
- */
- DHT_TableId table;
-
- unsigned int type;
-
- unsigned int keyCount;
-
- /**
- * What are the keys?
- */
- HashCode512 * keys;
-
- DataProcessor resultCallback;
-
- void * resultClosure;
-
- unsigned int resultsFound;
-
- /**
- * Context of findKNodes (async); NULL if the table was local.
- */
- FindKNodesContext * kfnc;
-
- DHT_OP_Complete callback;
-
- void * closure;
-
- /**
- * Size of the RPC array.
- */
- unsigned int rpcRepliesExpected;
-
- /**
- * ASYNC RPC handles.
- */
- struct RPC_Record ** rpc;
-
- /**
- * Lock for concurrent access to the record.
- */
- struct MUTEX * lock;
-
-} DHT_GET_RECORD;
-
-/**
- * Context for async DHT_PUT operation.
- */
-typedef struct DHT_PUT_RECORD {
- /**
- * What is the (absolute) time of the timeout?
- */
- cron_t timeout;
-
- /**
- * In which table are we searching?
- */
- DHT_TableId table;
-
- /**
- * What is the key?
- */
- HashCode512 key;
-
- DataContainer * value;
-
- /**
- * Context of findKNodes (async); NULL if the table was local.
- */
- FindKNodesContext * kfnc;
-
- /**
- * Callback to call upon completion.
- */
- DHT_OP_Complete callback;
-
- /**
- * Extra argument to callback.
- */
- void * closure;
-
- unsigned int confirmed_stores;
-
- /**
- * Size of the RPC array.
- */
- unsigned int rpcRepliesExpected;
-
- /**
- * ASYNC RPC handles.
- */
- struct RPC_Record ** rpc;
-
- /**
- * Lock for concurrent access to the record.
- */
- struct MUTEX * lock;
-
-} DHT_PUT_RECORD;
-
-
-/**
- * Context for async DHT_REMOVE operation.
- */
-typedef struct DHT_REMOVE_RECORD {
- /**
- * What is the (absolute) time of the timeout?
- */
- cron_t timeout;
-
- /**
- * In which table are we searching?
- */
- DHT_TableId table;
-
- /**
- * What is the key?
- */
- HashCode512 key;
-
- /**
- * Which value should be removed?
- */
- DataContainer * value;
-
- unsigned int confirmed_stores;
-
- /**
- * Context of findKNodes (async); NULL if the table was local.
- */
- FindKNodesContext * kfnc;
-
- /**
- * Callback to call upon completion.
- */
- DHT_OP_Complete callback;
-
- /**
- * Extra argument to callback.
- */
- void * closure;
-
- /**
- * Size of the RPC array.
- */
- unsigned int rpcRepliesExpected;
-
- /**
- * ASYNC RPC handles.
- */
- struct RPC_Record ** rpc;
-
- /**
- * Lock for concurrent access to the record.
- */
- struct MUTEX * lock;
-
-} DHT_REMOVE_RECORD;
-
-
-typedef struct {
-
- /**
- * Number of results currently received (size of the
- * results-array).
- */
- unsigned int count;
- /**
- * The results received so far.
- */
- DataContainer ** results;
- /**
- * RPC callback to call with the final result set.
- */
- Async_RPC_Complete_Callback callback;
- /**
- * Argument to the RPC_Complete callback.
- */
- struct CallInstance * rpc_context;
- /**
- * Argument to stop the async DHT-get operation.
- */
- DHT_GET_RECORD * get_record;
- /**
- * Did we send the final reply for this RPC? (if YES,
- * the dht-cron job or dht-shutdown will free the resources
- * of this struct).
- */
- int done;
- /**
- * Lock for accessing this struct.
- */
- struct MUTEX * lock;
-} RPC_DHT_FindValue_Context;
-
-typedef struct {
- /**
- * RPC callback to call with the final result set.
- */
- Async_RPC_Complete_Callback callback;
- /**
- * Argument to the RPC_Complete callback.
- */
- struct CallInstance * rpc_context;
- /**
- * Argument to stop the async DHT-get operation.
- */
- DHT_PUT_RECORD * put_record;
- /**
- * Did we send the final reply for this RPC? (if YES,
- * the dht-cron job or dht-shutdown will free the resources
- * of this struct).
- */
- int done;
- /**
- * Lock for accessing this struct.
- */
- struct MUTEX * lock;
-} RPC_DHT_store_Context;
-
-typedef struct {
- /**
- * RPC callback to call with the final result set.
- */
- Async_RPC_Complete_Callback callback;
- /**
- * Argument to the RPC_Complete callback.
- */
- struct CallInstance * rpc_context;
- /**
- * Argument to stop the async DHT-get operation.
- */
- DHT_REMOVE_RECORD * remove_record;
- /**
- * Did we send the final reply for this RPC? (if YES,
- * the dht-cron job or dht-shutdown will free the resources
- * of this struct).
- */
- int done;
- /**
- * Lock for accessing this struct.
- */
- struct MUTEX * lock;
-} RPC_DHT_remove_Context;
-
-/**
- * Cron-job that must be run before DHT can shutdown.
- */
-typedef struct {
- CronJob job;
- void * arg;
-} DHT_CronJobAbortEntry;
-
-
-/* ***************** prototypes ******************** */
-
-/**
- * Send an RPC 'ping' request to that node requesting DHT table
- * information. Note that this is done asynchronously.
- * This is just the prototype, the function is below.
- */
-static void request_DHT_ping(const PeerIdentity * identity,
- void * fnc);
-
-static FindKNodesContext * findKNodes_start(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- unsigned int k,
- NodeFoundCallback callback,
- void * closure);
-
-static int findKNodes_stop(FindKNodesContext * fnc);
-
-
-/* ******************* GLOBALS ********************* */
-
-/**
- * Global core API.
- */
-static CoreAPIForApplication * coreAPI;
-
-static struct GE_Context * ectx;
-
-/**
- * RPC API
- */
-static RPC_ServiceAPI * rpcAPI;
-
-/**
- * The buckets (Kademlia style routing table).
- */
-static PeerBucket * buckets;
-
-/**
- * Total number of active buckets.
- */
-static unsigned int bucketCount;
-
-/**
- * The ID of the master table.
- */
-static HashCode512 masterTableId;
-
-/**
- * List of the tables that this peer participates in.
- */
-static LocalTableData ** tables;
-
-/**
- * Number of entries in the tables array.
- */
-static unsigned int tablesCount;
-
-/**
- * Mutex to synchronize access to tables.
- */
-static struct MUTEX * lock;
-
-/**
- * Handle for the masterTable datastore that is used by this node
- * to store information about which peers participate in which
- * tables (the masterTable is another DHT, this store is just the
- * part of the masterTable that is stored at this peer).
- */
-static Blockstore * masterTableDatastore;
-
-/**
- * Table of cron-jobs (and arguments) that MUST be run
- * before the DHT module can shutdown. All of these
- * jobs are guaranteed to be triggered during the shutdown.
- */
-static DHT_CronJobAbortEntry * abortTable;
-
-static unsigned int abortTableSize;
-
-#define hostIdentityEquals(a,b) (0 == memcmp(a,b,sizeof(PeerIdentity)))
-
-/* *********************** CODE! ********************* */
-
-#if DEBUG_DHT
-static void printRoutingTable() {
- unsigned int i;
-
- MUTEX_LOCK(lock);
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT ROUTING TABLE:\n");
- for (i=0;i<bucketCount;i++) {
- if (buckets[i].peers != NULL) {
- PeerInfo * pos = NULL;
-
- pos = vectorGetFirst(buckets[i].peers);
- while (pos != NULL) {
- EncName enc;
- EncName tabs[3];
- int j;
-
- memset(tabs, 0, sizeof(EncName)*3);
- hash2enc(&pos->id.hashPubKey,
- &enc);
- for (j=0;j<pos->tableCount;j++)
- hash2enc(&pos->tables[j],
- &tabs[j]);
-
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "[%4d: %3d-%3d]: %s with %u tables (%s, %s, %s)\n",
- i,
- buckets[i].bstart, buckets[i].bend,
- &enc,
- pos->tableCount,
- &tabs[0],
- &tabs[1],
- &tabs[2]);
- pos = vectorGetNext(buckets[i].peers);
- }
- }
- }
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT ROUTING TABLE END\n");
- MUTEX_UNLOCK(lock);
-}
-#endif
-
-/**
- * we need to prevent unloading of the
- * DHT module while this cron-job is pending (or
- * rather keep track of it globally to do a proper
- * shutdown on-the-spot if needed!
- */
-static void addAbortJob(CronJob job,
- void * arg) {
- ENTER();
- MUTEX_LOCK(lock);
- GROW(abortTable,
- abortTableSize,
- abortTableSize+1);
- abortTable[abortTableSize-1].job = job;
- abortTable[abortTableSize-1].arg = arg;
- MUTEX_UNLOCK(lock);
-}
-
-/**
- * Remove a job from the abort table.
- */
-static void delAbortJob(CronJob job,
- void * arg) {
- int i;
-
- ENTER();
- MUTEX_LOCK(lock);
- for (i=0;i<abortTableSize;i++) {
- if ( (abortTable[i].job == job) &&
- (abortTable[i].arg == arg) ) {
- abortTable[i] = abortTable[abortTableSize-1];
- GROW(abortTable,
- abortTableSize,
- abortTableSize-1);
- break;
- }
- }
- MUTEX_UNLOCK(lock);
-}
-
-/**
- * Get the LocalTableData for the given table ID.
- * @return NULL if this peer does not participate in that table.
- */
-static LocalTableData * getLocalTableData(const DHT_TableId * id) {
- int i;
- for (i=tablesCount-1;i>=0;i--)
- if (equalsHashCode512(id,
- &tables[i]->id))
- return tables[i];
- return NULL;
-}
-
-/**
- * If this peer supports the given table and the
- * other peer is not closer than this peer to the
- * given key, returns YES.
- */
-static int isNotCloserThanMe(const DHT_TableId * table,
- const PeerIdentity * peer,
- const HashCode512 * key) {
- if (NULL == getLocalTableData(table))
- return NO;
- if (-1 == hashCodeCompareDistance(&peer->hashPubKey,
- &coreAPI->myIdentity->hashPubKey,
- key))
- return NO;
- else
- return YES;
-}
-
-/**
- * Find the bucket into which the given peer belongs.
- */
-static PeerBucket * findBucket(const PeerIdentity * peer) {
- unsigned int index;
- int i;
- int diff;
-#if DEBUG_DHT
- EncName enc1;
- EncName enc2;
-#endif
-
- index = sizeof(HashCode512)*8;
- for (i = sizeof(HashCode512)*8 - 1; i >= 0; --i) {
- diff = getHashCodeBit(&peer->hashPubKey, i) -
getHashCodeBit(&coreAPI->myIdentity->hashPubKey, i);
- if (diff != 0) {
- index = i;
- break;
- }
- }
-#if DEBUG_DHT
- hash2enc(&peer->hashPubKey,
- &enc1);
- hash2enc(&coreAPI->myIdentity->hashPubKey,
- &enc2);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Bit-distance from `%s' to this peer `%s' is %u bit.\n",
- &enc1,
- &enc2,
- index);
-#endif
- i = bucketCount-1;
- while ( (buckets[i].bstart >= index) &&
- (i > 0) ) {
- i--;
- }
- if ( (buckets[i].bstart < index) &&
- (buckets[i].bend >= index) ) {
- return &buckets[i];
- } else {
-#if DEBUG_DHT
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- "Index %d not in range for bucket %d which is [%d,%d[\n",
- index,
- i,
- buckets[i].bstart,
- buckets[i].bend);
-#endif
- return NULL; /* should only happen for localhost! */
- }
-}
-
-/**
- * Update the set kbest which is supposed to accumulate the k closest
- * peers to the given key. The size of the kbset set is given by
- * limit.
- *
- * @param newValue the new candidate for inclusion in the set
- * @param *k the current number of entries in the set
- */
-static void k_best_insert(unsigned int limit,
- unsigned int * k,
- const HashCode512 * key,
- HashCode512 * kbest,
- const HashCode512 * newValue) {
- int replace;
- int m;
-
- if ((*k) < limit) {
- memcpy(&kbest[*k],
- newValue,
- sizeof(HashCode512));
- (*k)++;
- } else {
- replace = -1;
- for (m=limit-1;m>=0;m--)
- if ( (1 == hashCodeCompareDistance(&kbest[m],
- newValue,
- key)) &&
- ( (replace == -1) ||
- (1 == hashCodeCompareDistance(&kbest[m],
- &kbest[replace],
- key)) ) )
- replace = m;
- if (replace != -1) {
- memcpy(&kbest[replace],
- newValue,
- sizeof(HashCode512));
- }
- }
-}
-
-/**
- * Find the PeerInfo for the given peer.
- *
- * @return NULL if the peer is not in the RT.
- */
-static PeerInfo * findPeerInfo(const PeerIdentity * peer) {
- PeerBucket * bucket;
- PeerInfo * pos;
-
- bucket = findBucket(peer);
- if (bucket == NULL)
- return NULL;
- pos = vectorGetFirst(bucket->peers);
- while (pos != NULL) {
- if (equalsHashCode512(&peer->hashPubKey,
- &pos->id.hashPubKey))
- return pos;
- pos = vectorGetNext(bucket->peers);
- }
- return NULL;
-}
-
-/**
- * We receive a message from 'responder' which may contain optional
- *
- * fields about the responder. Process those fields (if present).
- * @param results::tables list of tables the responder participates in
(optional)
- * @param results::hellos list of hellos for responder (optional)
- */
-static void processOptionalFields(const PeerIdentity * responder,
- RPC_Param * results) {
- unsigned int dataLength;
- char * data;
- unsigned int tableCount;
- DHT_TableId * tables;
- EncName enc;
- cron_t now;
- PeerBucket * bucket;
- PeerInfo * pos;
-
- if (OK == RPC_paramValueByName(results,
- "tables",
- &dataLength,
- (void**)&data)) {
- tableCount = dataLength / sizeof(DHT_TableId);
- if (tableCount * sizeof(DHT_TableId) != dataLength) {
- IF_GELOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Malformed optional field `%s' received from peer `%s'.\n"),
- "tables",
- &enc);
- return;
- }
- tables = (DHT_TableId*) data;
- now = get_time();
-
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "updating routing table after learning about peer `%s' who provides %d
tables.\n",
- &enc,
- tableCount);
-#endif
-
- /* update buckets */
- MUTEX_LOCK(lock);
- pos = findPeerInfo(responder);
- bucket = findBucket(responder);
- if (bucket == NULL) {
- IF_GELOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Could not find peer `%s' in routing table!\n"),
- &enc);
- }
- GE_ASSERT(ectx, bucket != NULL);
- if (pos == NULL) {
- PeerInfo * oldest = NULL;
-
- pos = vectorGetFirst(bucket->peers);
- while (pos != NULL) {
- if (pos->lastActivity + DHT_INACTIVITY_DEATH < now) {
- if (oldest == NULL)
- oldest = pos;
- else
- if (pos->lastActivity < oldest->lastActivity)
- oldest = pos;
- }
- if (pos->lastTableRefresh +
- (pos->tableCount - tableCount) * DHT_TABLE_FACTOR +
DHT_HYPERACTIVE_TIME < now) {
- if (oldest == NULL)
- oldest = pos;
- else if (pos->lastTableRefresh +
- (pos->tableCount - tableCount) * DHT_TABLE_FACTOR <
- oldest->lastTableRefresh +
- (oldest->tableCount - tableCount) * DHT_TABLE_FACTOR)
- oldest = pos;
- }
- pos = vectorGetNext(bucket->peers);
- }
- pos = oldest;
- }
- if ( (vectorSize(bucket->peers) < BUCKET_TARGET_SIZE) &&
- (pos == NULL) ) {
- /* create new entry */
- pos = MALLOC(sizeof(PeerInfo));
- pos->tables = NULL;
- pos->tableCount = 0;
- pos->lastTimePingSend = get_time();
- vectorInsertLast(bucket->peers, pos);
- }
- if (pos == NULL) {
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "routing table full, not adding peer `%s'.\n",
- &enc);
-#endif
- } else {
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "adding peer `%s' to routing table.\n",
- &enc);
-#endif
-
- pos->lastActivity = now;
- pos->lastTableRefresh = now;
- pos->id = *responder;
- GROW(pos->tables,
- pos->tableCount,
- tableCount);
- memcpy(pos->tables,
- tables,
- sizeof(DHT_TableId) * tableCount);
- }
- MUTEX_UNLOCK(lock);
- }
-
- /* HERE: process other optional fields (hellos) */
-
-}
-
-/**
- * We are sending out a message and have the chance to communicate
- * optional fields. Add those if we feel like it.
- *
- * @param args the argument list to which optional fields can be added
- */
-static void addOptionalFields(RPC_Param * args) {
- DHT_TableId * tabs;
- int i;
- unsigned int tc;
- size_t s;
-
- MUTEX_LOCK(lock);
- tc = tablesCount;
- tabs = MALLOC(sizeof(DHT_TableId) * tc);
- for (i=0;i<tc;i++)
- tabs[i] = tables[i]->id;
- MUTEX_UNLOCK(lock);
- s = RPC_paramSize(args) + sizeof(DHT_TableId) * tc;
- /* always add if resulting size is less than 1k;
- never generate messages > 32k;
- if greater than 1k, only add with exponentially
- decreasing probability */
- if ( (s < 1024) ||
- ( (s*s < weak_randomi(32768)*weak_randomi(32768)) &&
- (s*s < weak_randomi(32768)*weak_randomi(32768)) ) ) {
- RPC_paramAdd(args,
- "tables",
- sizeof(DHT_TableId) * tc,
- tabs);
- }
- FREE(tabs);
-
- /* FIXME: here: add other optional fields (hellos) */
-}
-
-/**
- * The given peer has responded to our find RPC callback. Update the
- * last response time in the peer list and add the peers from results
- * to the FNC. Trigger further create_find_nodes_rpc requests.
- *
- * @param responder the ID of the responding peer
- * @param results::peers serialized HostIdentities
- * @param results::tables list of tables the responder participates in
(optional)
- * @param fnc the context (used to continue iterative search)
- */
-static void create_find_nodes_rpc_complete_callback(const PeerIdentity *
responder,
- RPC_Param * results,
- FindNodesContext * fnc) {
- PeerInfo * info;
- char * value;
- unsigned int dataLength;
- unsigned int pos;
- EncName enc;
-
- ENTER();
- processOptionalFields(responder, results);
- /* update peer list */
- MUTEX_LOCK(lock);
- info = findPeerInfo(responder);
- if (info != NULL)
- info->lastActivity = get_time();
- MUTEX_UNLOCK(lock);
-
- if (OK != RPC_paramValueByName(results,
- "peer",
- &dataLength,
- (void**) &value)) {
- IF_GELOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Received malformed response to `%s' from peer `%s'.\n"),
- "DHT_findNode",
- &enc);
- return;
- }
-
- /* parse value, try to DHT-ping the new peers
- (to add it to the table; if that succeeds
- the peer will automatically trigger the ping_reply_handler
- which will in turn trigger create_find_nodes_rpc) */
- if ( (dataLength % sizeof(PeerIdentity)) != 0) {
- IF_GELOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Received malformed response to `%s' from peer `%s'.\n"),
- "DHT_findNode",
- &enc);
- return;
- }
- for (pos=0;pos<dataLength;pos+=sizeof(PeerIdentity)) {
- PeerIdentity * msg;
-
- msg = (PeerIdentity*) &value[pos];
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "processing PeerID received from peer `%s' in response to `%s' RPC.\n",
- &enc,
- "DHT_findNode");
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&msg->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to learn more about peer `%s'.\n",
- "DHT_ping",
- &enc);
-#endif
- if (hostIdentityEquals(msg,
- coreAPI->myIdentity))
- continue; /* ignore self-references! */
- request_DHT_ping(msg,
- fnc);
- }
-}
-
-/**
- * Send a find_nodes RPC to the given peer. Replies are
- * to be inserted into the FNC k-best table.
- */
-static void create_find_nodes_rpc(const PeerIdentity * peer,
- FindNodesContext * fnc) {
- RPC_Param * param;
- cron_t now;
- cron_t rel;
- LocalTableData * table;
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&peer->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_find_nodes",
- &enc);
-#endif
- ENTER();
- now = get_time();
- param = RPC_paramNew();
- MUTEX_LOCK(fnc->lock);
- if (equalsHashCode512(&fnc->key,
- &coreAPI->myIdentity->hashPubKey)) {
- table = getLocalTableData(&fnc->table);
- if (table != NULL)
- table->lastFindOperation = now;
- }
- RPC_paramAdd(param,
- "table",
- sizeof(DHT_TableId),
- &fnc->table);
-
- RPC_paramAdd(param,
- "key",
- sizeof(HashCode512),
- &fnc->key);
- GROW(fnc->rpc,
- fnc->rpcRepliesExpected,
- fnc->rpcRepliesExpected+1);
- if (fnc->timeout > now)
- rel = fnc->timeout - now;
- else
- rel = 0;
- addOptionalFields(param);
- fnc->rpc[fnc->rpcRepliesExpected-1]
- = rpcAPI->RPC_start(peer,
- "DHT_findNode",
- param,
- 0,
- rel,
- (RPC_Complete) &create_find_nodes_rpc_complete_callback,
- fnc);
- MUTEX_UNLOCK(fnc->lock);
- RPC_paramFree(param);
-}
-
-/**
- * We received a reply from a peer that we ping'ed. Update
- * the FNC's kbest list and the buckets accordingly.
- */
-static void
-ping_reply_handler(const PeerIdentity * responder,
- RPC_Param * results,
- void * cls) {
- FindNodesContext * fnc = cls;
- int i;
- EncName enc;
- PeerInfo * pos;
-
- ENTER();
- GE_ASSERT(ectx, ! hostIdentityEquals(responder,
- coreAPI->myIdentity));
- /* this processes the 'tables' field! */
- processOptionalFields(responder,
- results);
- if (fnc == NULL)
- return;
- /* update k-best list */
- MUTEX_LOCK(fnc->lock);
- pos = findPeerInfo(responder);
- /* does the peer support the table in question? */
- if (! equalsHashCode512(&fnc->table,
- &masterTableId)) {
- for (i=pos->tableCount-1;i>=0;i--)
- if (equalsHashCode512(&fnc->table,
- &pos->tables[i]))
- break;
- if (i == -1) {
- MUTEX_UNLOCK(fnc->lock);
- return; /* peer does not support table in question */
- }
- }
-
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "peer `%s' supports table in question, considering the peer for list of
%d-best matches.\n",
- &enc,
- ALPHA);
-#endif
- k_best_insert(ALPHA,
- &fnc->k,
- &fnc->key,
- fnc->matches,
- &responder->hashPubKey);
-
- /* trigger transitive request searching for more nodes! */
- create_find_nodes_rpc(responder,
- fnc);
- MUTEX_UNLOCK(fnc->lock);
-}
-
-/**
- * Send an RPC 'ping' request to that node requesting DHT table
- * information. Note that this is done asynchronously.
- */
-static void request_DHT_ping(const PeerIdentity * identity,
- void * cls) {
- FindNodesContext * fnc = cls;
- RPC_Param * request_param;
- PeerInfo * pos;
- cron_t now;
- cron_t rel;
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&identity->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_ping",
- &enc);
-#endif
- ENTER();
- if (hostIdentityEquals(identity,
- coreAPI->myIdentity)) {
- GE_BREAK(ectx, 0);
- return; /* refuse to self-ping!... */
- }
- MUTEX_LOCK(lock);
- /* test if this peer is already in buckets */
- pos = findPeerInfo(identity);
- now = get_time();
- if (pos != NULL)
- pos->lastTimePingSend = now;
- MUTEX_UNLOCK(lock);
-
- /* peer not in RPC buckets; try PINGing via RPC */
- MUTEX_LOCK(fnc->lock);
- GROW(fnc->rpc,
- fnc->rpcRepliesExpected,
- fnc->rpcRepliesExpected+1);
- request_param = vectorNew(4);
- if (fnc->timeout > now)
- rel = fnc->timeout - now;
- else
- rel = 0;
- addOptionalFields(request_param);
- fnc->rpc[fnc->rpcRepliesExpected-1]
- = rpcAPI->RPC_start(identity,
- "DHT_ping",
- request_param,
- 0,
- rel,
- (RPC_Complete) &ping_reply_handler,
- fnc);
- vectorFree(request_param);
- MUTEX_UNLOCK(fnc->lock);
-}
-
-/**
- * Find k nodes in the local buckets that are closest to the
- * given key for the given table. Return instantly, do NOT
- * attempt to query remote peers.
- *
- * @param hosts array with space for k hosts.
- * @return number of hosts found
- */
-static unsigned int findLocalNodes(const DHT_TableId * table,
- const HashCode512 * key,
- PeerIdentity * hosts,
- unsigned int k) {
- int i;
- int j;
- PeerBucket * bucket;
- PeerInfo * pos;
- unsigned int ret;
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "searching local table for peers supporting table `%s'.\n",
- &enc);
-#endif
- ENTER();
- ret = 0;
- /* find peers in local peer-list that participate in
- the given table */
- for (i=bucketCount-1;i>=0;i--) {
- bucket = &buckets[i];
- pos = vectorGetFirst(bucket->peers);
- while (pos != NULL) {
- for (j=pos->tableCount-1;j>=0;j--) {
- if (equalsHashCode512(&pos->tables[j],
- table)) {
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&pos->id.hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "local table search showed peer `%s' is supporting the table.\n",
- &enc);
-#endif
- k_best_insert(k,
- &ret,
- key,
- (HashCode512*) hosts,
- &pos->id.hashPubKey);
- }
- }
- pos = vectorGetNext(bucket->peers);
- }
- } /* end for all buckets */
- return ret;
-}
-
-/**
- * We got a reply from the DHT-get operation. Update the
- * record datastructures accordingly (and call the record's
- * callback).
- *
- * @param results::data created in rpc_DHT_findValue_abort
- */
-static void dht_findvalue_rpc_reply_callback(const PeerIdentity * responder,
- RPC_Param * results,
- DHT_GET_RECORD * record) {
- DataContainer * value;
- unsigned int i;
- unsigned int max;
- PeerInfo * pos;
- EncName enc;
-
- ENTER();
- processOptionalFields(responder, results);
- MUTEX_LOCK(lock);
- pos = findPeerInfo(responder);
- pos->lastActivity = get_time();
- MUTEX_UNLOCK(lock);
-
- max = RPC_paramCount(results);
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&responder->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "peer `%s' responded to RPC `%s' with %u results.\n",
- &enc,
- "DHT_findvalue",
- max);
-#endif
- for (i=0;i<max;i++) {
- value = RPC_paramDataContainerByPosition(results,
- i);
- if (value == NULL) {
- hash2enc(&responder->hashPubKey,
- &enc);
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Invalid response to `%s' from peer `%s'.\n"),
- "DHT_findValue",
- &enc);
- return;
- }
- MUTEX_LOCK(record->lock);
- if (record->callback != NULL)
- record->resultCallback(record->keys,
- value,
- record->resultClosure);
- MUTEX_UNLOCK(record->lock);
- FREE(value);
- }
-}
-
-/**
- * Send an (async) DHT get to the given peer. Replies are to be
- * processed by the callback in record. The RPC async handle is to be
- * stored in the records rpc list. Locking is not required.
- */
-static int
-send_dht_get_rpc(const PeerIdentity * peer,
- void * cls) {
- DHT_GET_RECORD * record = cls;
- RPC_Param * param;
- unsigned long long timeout;
- unsigned int type;
- cron_t delta;
- cron_t now;
-#if DEBUG_DHT
- EncName enc;
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&peer->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_findvalue",
- &enc);
-#endif
- if (isNotCloserThanMe(&record->table,
- peer,
- record->keys))
- return OK; /* refuse! */
- now = get_time();
- if (record->timeout > now)
- delta = (record->timeout - now) / 2;
- else
- delta = 0;
- timeout = htonll(delta);
- type = htonl(record->type);
- param = RPC_paramNew();
- RPC_paramAdd(param,
- "table",
- sizeof(DHT_TableId),
- &record->table);
- RPC_paramAdd(param,
- "keys",
- sizeof(HashCode512) * record->keyCount,
- record->keys);
- RPC_paramAdd(param,
- "timeout",
- sizeof(unsigned long long),
- &timeout);
- RPC_paramAdd(param,
- "type",
- sizeof(unsigned int),
- &type);
- GROW(record->rpc,
- record->rpcRepliesExpected,
- record->rpcRepliesExpected+1);
- addOptionalFields(param);
- record->rpc[record->rpcRepliesExpected-1]
- = rpcAPI->RPC_start(peer,
- "DHT_findValue",
- param,
- 0,
- delta,
- (RPC_Complete) &dht_findvalue_rpc_reply_callback,
- record);
- RPC_paramFree(param);
- return OK;
-}
-
-/**
- * Callback called for local results found in
- * dht_get_async_start. Calls the DHT_OP_Complete
- * callback with the results found locally.
- * A DataProcessor.
- */
-static int getLocalResultCallback(const HashCode512 * key,
- const DataContainer * val,
- DHT_GET_RECORD * rec) {
- int ret;
- if ( (equalsHashCode512(&rec->table,
- &masterTableId)) &&
- ((ntohl(val->size) - sizeof(DataContainer)) % sizeof(PeerIdentity) !=
0) )
- GE_BREAK(ectx, 0); /* assertion failed: entry in master table malformed! */
- ret = OK;
- if (rec->resultCallback != NULL)
- ret = rec->resultCallback(key,
- val,
- rec->resultClosure);
- rec->resultsFound++;
- return ret;
-}
-
-/**
- * Perform an asynchronous GET operation on the DHT identified by
- * 'table' using 'key' as the key. The peer does not have to be part
- * of the table (if so, we will attempt to locate a peer that is!)
- *
- * @param table table to use for the lookup
- * @param key the key to look up
- * @param timeout how long to wait until this operation should
- * automatically time-out
- * @param callback function to call on each result
- * @param closure extra argument to callback
- * @return handle to stop the async get
- */
-static struct DHT_GET_RECORD *
-dht_get_async_start(const DHT_TableId * table,
- unsigned int type,
- unsigned int keyCount,
- const HashCode512 * keys,
- cron_t timeout,
- DataProcessor resultCallback,
- void * cls,
- DHT_OP_Complete callback,
- void * closure) {
- int i;
- LocalTableData * ltd;
- DHT_GET_RECORD * ret;
- unsigned int count;
-#if DEBUG_DHT
- EncName enc;
- EncName enc2;
- int res;
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&keys[0],
- &enc));
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc2));
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "performing `%s' operation on key `%s' and table `%s'.\n",
- "DHT_GET",
- &enc,
- &enc2);
-#endif
-
- if (timeout > 1 * cronHOURS) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("`%s' called with timeout above 1 hour (bug?)\n"),
- __FUNCTION__);
- timeout = 1 * cronHOURS;
- }
-
- ret = MALLOC(sizeof(DHT_GET_RECORD));
- ret->timeout = get_time() + timeout;
- ret->type = type;
- ret->keyCount = keyCount;
- ret->keys = MALLOC(keyCount * sizeof(HashCode512));
- memcpy(ret->keys,
- keys,
- keyCount * sizeof(HashCode512));
- ret->table = *table;
- ret->resultCallback = resultCallback;
- ret->resultClosure = cls;
- ret->resultsFound = 0;
- ret->callback = callback;
- ret->closure = closure;
- ret->lock = MUTEX_CREATE(YES);
- ret->rpc = NULL;
- ret->rpcRepliesExpected = 0;
- ret->kfnc = NULL;
- MUTEX_LOCK(lock);
-
-
- ltd = getLocalTableData(table);
- if (ltd != NULL) {
- PeerIdentity * hosts;
-#if DEBUG_DHT
- IF_GELOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "I participate in the table `%s' for the `%s' operation.\n",
- &enc,
- "DHT_GET");
-#endif
- /* We do participate in the table, it is fair to assume
- that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
- count = findLocalNodes(table,
- &keys[0],
- hosts,
- ALPHA);
- /* try adding this peer to hosts */
- k_best_insert(ALPHA,
- &count,
- &keys[0],
- (HashCode512*) hosts,
- &coreAPI->myIdentity->hashPubKey);
- if (count == 0) {
- GE_BREAK(ectx, 0);
- /* Assertion failed: I participate in a table but findLocalNodes
returned 0! */
- MUTEX_UNLOCK(lock);
- FREE(ret->keys);
- FREE(ret);
- return NULL;
- }
- /* if this peer is in 'hosts', try local datastore lookup */
- for (i=0;i<count;i++)
- if (hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i])) {
- res = ltd->store->get(ltd->store->closure,
- type,
- 0, /* FIXME: priority */
- keyCount,
- keys,
- (DataProcessor)&getLocalResultCallback,
- ret);
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&keys[0],
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "local datastore lookup for key `%s' resulted in %d results.\n",
- &enc,
- res);
-#endif
- break;
- }
-
- if (ALPHA > ret->resultsFound) {
- /* if less than ALPHA replies were found, send
- dht_get_RPC to the other peers */
- for (i=0;i<count;i++) {
- if (! hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i])) {
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&hosts[i].hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s' that also participates in the
table.\n",
- "DHT_GET",
- &enc);
-#endif
- send_dht_get_rpc(&hosts[i],
- ret);
- }
- }
- }
- } else {
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "I do not participate in the table `%s', finding %d other nodes that
do.\n",
- &enc,
- ALPHA);
-#endif
- /* We do not particpate in the table; hence we need to use
- findKNodes to find an initial set of peers in that
- table; findKNodes tries to find k nodes and instantly
- allows us to query each node found. For each peer found,
- we then perform send_dht_get_rpc.
- */
- ret->kfnc
- = findKNodes_start(table,
- &keys[0],
- timeout,
- ALPHA,
- &send_dht_get_rpc,
- ret);
- }
- MUTEX_UNLOCK(lock);
- return ret;
-}
-
-/**
- * Stop async DHT-get. Frees associated resources.
- */
-static int dht_get_async_stop(struct DHT_GET_RECORD * record) {
- int i;
- int resultsFound;
-
- ENTER();
- if (record == NULL)
- return SYSERR;
- /* abort findKNodes (if running) - it may cause
- the addition of additional RPCs otherwise! */
- if (record->kfnc != NULL)
- findKNodes_stop(record->kfnc);
-
- for (i=0;i<record->rpcRepliesExpected;i++)
- rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(record->lock);
- resultsFound = record->resultsFound;
- FREE(record);
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' operation completed with %d results.\n",
- "DHT_GET",
- resultsFound);
-#endif
-
- if (resultsFound > 0)
- return resultsFound;
- else
- return SYSERR; /* timeout */
-}
-
-/**
- * We found a peer in the MasterTable that supports the table that
- * we're trying to find peers for. Update FNC accordingly and
- * start transitive search for peers from that new peer.
- *
- * @param value should contain a set of HeloMessages corresponding
- * to the identities of peers that support the table that we're
- * looking for; pass those Helos to the core *and* try to ping them.
- */
-static int
-findnodes_dht_master_get_callback(const HashCode512 * key,
- const DataContainer * cont,
- void * cls) {
- FindNodesContext * fnc = cls;
- unsigned int dataLength;
- const PeerIdentity * id;
- int i;
-
- ENTER();
- dataLength = ntohl(cont->size) - sizeof(DataContainer);
-
- if ( (dataLength % sizeof(PeerIdentity)) != 0) {
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Response size was %d, expected multile of %d\n",
- dataLength,
- sizeof(PeerIdentity));
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Invalid response to `%s'.\n"),
- "DHT_findValue");
- return SYSERR;
- }
- id = (const PeerIdentity*) &cont[1];
- for (i=dataLength/sizeof(PeerIdentity)-1;i>=0;i--) {
- if (!hostIdentityEquals(&id[i],
- coreAPI->myIdentity))
- request_DHT_ping(&id[i],
- fnc);
- }
- return OK;
-}
-
-
-/**
- * In the induced sub-structure for the given 'table', find the ALPHA
- * nodes closest to the given key. The code first tries to find ALPHA
- * nodes in the routing table that participate in the given table. If
- * nodes are found, the k<=ALPHA nodes closest to the key are queried
- * (using the find node RPC) to find nodes closer to the key.
- *
- * If no (zero!) participating nodes are found, the a set of introduction
- * nodes for this table is obtained from the master table (using RPC
- * get). For the master table we try to discover peers participating
- * in the DHT using broadcasts to all connected peers (relying on
- * GNUnet core peer discovery).
- *
- * If we learn about new nodes in this step, add them to the RT table;
- * if we run out of space in the RT, send pings to oldest entry; if
- * oldest entry did not respond to PING, replace it!
- *
- * This function is used periodially for each table that we have joined
- * to ensure that we're connected to our neighbours.
- *
- * @param table the table which the peers must participate in
- * @param key the target key to use for routing
- * @param timeout how long to tell the RPCs that we will wait
- * (note that the caller is supposed to call findNodes_stop
- * to finally collect the collected nodes)
- * @return context for findNodes_stop
- */
-static FindNodesContext * findNodes_start(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout) {
- FindNodesContext * fnc;
- int i;
-#if DEBUG_DHT
- EncName enc;
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "function `%s' called to look for nodes participating in table `%s'.\n",
- __FUNCTION__,
- &enc);
-#endif
- fnc = MALLOC(sizeof(FindNodesContext));
- fnc->key = *key;
- fnc->table = *table;
- fnc->k = 0;
- fnc->matches = MALLOC(sizeof(HashCode512) * ALPHA);
- fnc->signal = SEMAPHORE_CREATE(0);
- fnc->timeout = get_time() + timeout;
- fnc->rpcRepliesExpected = 0;
- fnc->rpcRepliesReceived = 0;
- fnc->async_handle = NULL;
- fnc->lock = MUTEX_CREATE(YES);
-
- /* find peers in local peer-list that participate in
- the given table */
- fnc->k = findLocalNodes(table,
- key,
- (PeerIdentity*) fnc->matches,
- ALPHA);
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "found %d participating nodes in local routing table.\n",
- fnc->k);
-#endif
- for (i=0;i<fnc->k;i++) {
- /* we found k nodes participating in the table; ask these
- k nodes to search further (in this table, with this key,
- with this timeout). Improve k-best node until timeout
- expires */
- create_find_nodes_rpc((PeerIdentity*) &fnc->matches[i],
- fnc);
- }
-
- /* also search for more peers for this table? */
- fnc->async_handle = NULL;
- if (fnc->k < ALPHA) {
- if (equalsHashCode512(table,
- &masterTableId)) {
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "broadcasting RPC ping to find other peers for master table.\n");
-#endif
- /* No or too few other DHT peers known, search
- for more by sending a PING to all connected peers
- that are not in the table already */
- coreAPI->forAllConnectedNodes(&request_DHT_ping,
- fnc);
- } else {
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "performing RPC `%s' to find other peers participating in table
`%s'.\n",
- "DHT_findValue",
- &enc);
-#endif
- /* try finding peers responsible for this table using
- the master table */
- fnc->async_handle
- = dht_get_async_start(&masterTableId,
- 0, /* type */
- 1, /* 1 key */
- table, /* key */
- timeout,
- &findnodes_dht_master_get_callback,
- fnc,
- NULL,
- NULL);
- }
- }
- return fnc;
-}
-
-/**
- * This stops the asynchronous findNodes process. The search is aborted
- * and the k-best results are passed to the callback.
- *
- * @param fnc context returned from findNodes_start
- * @param callback function to call for each peer found
- * @param closure extra argument to the callback
- * @return number of peers found, SYSERR on error
- */
-static int findNodes_stop(FindNodesContext * fnc,
- NodeFoundCallback callback,
- void * closure) {
- int i;
-
- ENTER();
- /* stop async DHT get */
- if (fnc->async_handle != NULL) {
- dht_get_async_stop(fnc->async_handle);
- fnc->async_handle = NULL;
- }
-
- /* stop all async RPCs */
- for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
- rpcAPI->RPC_stop(fnc->rpc[i]);
- SEMAPHORE_DESTROY(fnc->signal);
- MUTEX_DESTROY(fnc->lock);
-
- /* Finally perform callbacks on collected k-best nodes. */
- if (callback != NULL)
- for (i=fnc->k-1;i>=0;i--)
- callback((PeerIdentity*)&fnc->matches[i], closure);
- FREE(fnc->matches);
- i = fnc->k;
- FREE(fnc);
- return i;
-}
-
-/**
- * We found a peer in the MasterTable that supports the table that
- * we're trying to find peers for. Notify the caller about this peer.
- *
- * @param value should contain a set of HeloMessages corresponding
- * to the identities of peers that support the table that we're
- * looking for; pass those Helos to the core *and* to the callback
- * as peers supporting the table.
- */
-static int
-find_k_nodes_dht_master_get_callback(const HashCode512 * key,
- const DataContainer * cont,
- void * cls) {
- FindKNodesContext * fnc = cls;
- unsigned int pos;
- unsigned int dataLength;
- const PeerIdentity * value;
-#if DEBUG_DHT
- EncName enc;
-#endif
-
- ENTER();
- dataLength = ntohl(cont->size) - sizeof(DataContainer);
- value = (const PeerIdentity*) &cont[1];
-
- /* parse value, try to DHT-ping the new peers
- (to add it to the table; if that succeeds
- the peer will automatically trigger the ping_reply_handler
- which will in turn trigger create_find_nodes_rpc) */
- if ( (dataLength % sizeof(PeerIdentity)) != 0) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Malformed response to `%s' on master table.\n"),
- "DHT_findValue");
- return SYSERR;
- }
- for (pos = 0;pos<dataLength/sizeof(PeerIdentity);pos++) {
- const PeerIdentity * msg;
-
- msg = &value[pos];
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&msg->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "master table returned peer `%s' in `%s' operation.\n",
- &enc,
- "DHT_findValue");
-#endif
- MUTEX_LOCK(fnc->lock);
- if (fnc->k > 0) {
- if (fnc->callback != NULL)
- fnc->callback(msg,
- fnc->closure);
- fnc->k--;
- fnc->found++;
- }
- MUTEX_UNLOCK(fnc->lock);
- }
- return OK;
-}
-
-/**
- * In the induced sub-structure for the given 'table', find k nodes
- * close to the given key that participate in that table. Any node in
- * the table will do, but preference is given to nodes that are close.
- * Still, the first k nodes that were found are returned (just the
- * search goes towards the key). This function is used for lookups
- * in tables in which this peer does not participate in.
- *
- * If no (zero!) participating nodes are found locally, the a set of
- * introduction nodes for this table is obtained from the master table
- * (using RPC get). For the master table we try to discover peers
- * participating in the DHT using broadcasts to all connected peers
- * (relying on GNUnet core peer discovery).
- *
- * If we learn about new nodes in this step, add them to the RT table;
- * if we run out of space in the RT, send pings to oldest entry; if
- * oldest entry did not respond to PING, replace it!
- *
- * @param table the table which the peers must participate in,
- * for this function, this should NEVER be the master-table.
- * @param key the target key to use for routing
- * @param timeout how long to tell the RPCs that we will wait
- * (note that the caller is supposed to call findNodes_stop
- * to finally collect the collected nodes)
- * @param k number of nodes to find
- * @param callback function to call for each peer found
- * @param closure extra argument to the callback
- * @return context for findKNodes_stop
- */
-static FindKNodesContext *
-findKNodes_start(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- unsigned int k,
- NodeFoundCallback callback,
- void * closure) {
- FindKNodesContext * fnc;
- int i;
- int found;
- PeerIdentity * matches;
-#if DEBUG_DHT
- EncName enc;
-
- ENTER();
- hash2enc(table,
- &enc);
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' called to find %d nodes that participate in table `%s'.\n",
- __FUNCTION__,
- k,
- &enc);
-#endif
- fnc = MALLOC(sizeof(FindKNodesContext));
- fnc->key = *key;
- fnc->table = *table;
- fnc->k = k;
- fnc->callback = callback;
- fnc->closure = closure;
- fnc->timeout = get_time() + timeout;
- fnc->rpcRepliesExpected = 0;
- fnc->rpcRepliesReceived = 0;
- fnc->found = 0;
- fnc->lock = MUTEX_CREATE(YES);
- matches = MALLOC(sizeof(PeerIdentity) * fnc->k);
-
- /* find peers in local peer-list that participate in
- the given table */
- found = findLocalNodes(table,
- key,
- matches,
- k);
- if (callback != NULL)
- for (i=0;i<found;i++)
- callback(&matches[i],
- closure);
- if (found == k) {
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' found %d nodes in local table, no remote requests needed.\n",
- __FUNCTION__,
- k);
-#endif
- FREE(matches);
- return fnc; /* no need for anything else, we've found
- all we care about! */
- }
- fnc->k -= found;
- fnc->found = found;
- FREE(matches);
-
- /* also do 'get' to find for more peers for this table */
- fnc->async_handle = NULL;
- if (equalsHashCode512(table,
- &masterTableId)) {
- GE_BREAK(ectx, 0);
- /* findKNodes_start called for masterTable. That should not happen! */
- } else {
- #if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' sends request to find %d in master table.\n",
- __FUNCTION__,
- k);
-#endif
- /* try finding peers responsible for this table using
- the master table */
- fnc->async_handle
- = dht_get_async_start(&masterTableId,
- 0, /* type */
- 1, /* key count */
- table, /* keys */
- timeout,
- &find_k_nodes_dht_master_get_callback,
- fnc,
- NULL,
- NULL);
- }
- return fnc;
-}
-
-/**
- * This stops the asynchronous find-k-Nodes process.
- * The search is aborted.
- *
- * @param fnc context returned from findNodes_start
- * @return number of peers found, SYSERR on error
- */
-static int findKNodes_stop(FindKNodesContext * fnc) {
- int i;
- /* stop async DHT get */
- ENTER();
- if (fnc->async_handle != NULL) {
- dht_get_async_stop(fnc->async_handle);
- fnc->async_handle = NULL;
- }
-
- /* stop all async RPCs */
- for (i=fnc->rpcRepliesExpected-1;i>=0;i--)
- rpcAPI->RPC_stop(fnc->rpc[i]);
- MUTEX_DESTROY(fnc->lock);
-
- i = fnc->found;
- FREE(fnc);
- return i;
-}
-
-/**
- * We got a reply from the DHT_store operation. Update the
- * record datastructures accordingly (and call the record's
- * callback).
- *
- * @param results::peer created in rpc_DHT_store_abort
- */
-static void dht_put_rpc_reply_callback(const PeerIdentity * responder,
- RPC_Param * results,
- void * cls) {
- DHT_PUT_RECORD * record = cls;
- PeerIdentity * peer;
- unsigned int dataLength;
- PeerInfo * pos;
- unsigned int i;
- unsigned int max;
-
- ENTER();
- processOptionalFields(responder, results);
- MUTEX_LOCK(record->lock);
- pos = findPeerInfo(responder);
- pos->lastActivity = get_time();
-
- max = RPC_paramCount(results);
- for (i=0;i<max;i++) {
- if (0 != strcmp("peer",
- RPC_paramName(results, i)))
- continue; /* ignore */
- if ( (OK != RPC_paramValueByPosition(results,
- i,
- &dataLength,
- (void**)&peer)) ||
- (dataLength != sizeof(PeerIdentity)) ) {
- EncName enc;
-
- MUTEX_UNLOCK(record->lock);
- hash2enc(&responder->hashPubKey,
- &enc);
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Invalid response to `%s' from `%s'\n"),
- "DHT_put",
- &enc);
- return;
- }
- }
- MUTEX_UNLOCK(record->lock);
-}
-
-/**
- * Send an (async) DHT put to the given peer. Replies are to be
- * processed by the callback in record. The RPC async handle is to be
- * stored in the records rpc list. Locking is not required.
- */
-static int
-send_dht_put_rpc(const PeerIdentity * peer,
- void * cls) {
- DHT_PUT_RECORD * record = cls;
- RPC_Param * param;
- unsigned long long timeout;
- cron_t delta;
- cron_t now;
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&peer->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_store",
- &enc);
-#endif
- ENTER();
- if (isNotCloserThanMe(&record->table,
- peer,
- &record->key))
- return OK;
- now = get_time();
- if (record->timeout > now)
- delta = (record->timeout - now) / 2;
- else
- delta = 0;
- timeout = htonll(delta);
- param = RPC_paramNew();
- RPC_paramAdd(param,
- "table",
- sizeof(DHT_TableId),
- &record->table);
- RPC_paramAdd(param,
- "key",
- sizeof(HashCode512),
- &record->key);
- RPC_paramAdd(param,
- "timeout",
- sizeof(unsigned long long),
- &timeout);
- RPC_paramAddDataContainer(param,
- "value",
- record->value);
- GROW(record->rpc,
- record->rpcRepliesExpected,
- record->rpcRepliesExpected+1);
- addOptionalFields(param);
- record->rpc[record->rpcRepliesExpected-1]
- = rpcAPI->RPC_start(peer,
- "DHT_store",
- param,
- 0,
- delta,
- &dht_put_rpc_reply_callback,
- record);
- RPC_paramFree(param);
- return OK;
-}
-
-static void
-dht_put_async_timeout(void * cls) {
- struct DHT_PUT_RECORD * dpr = cls;
- if (dpr->callback != NULL)
- dpr->callback(dpr->closure);
- delAbortJob(&dht_put_async_timeout, cls);
-}
-
-
-/**
- * Perform an asynchronous PUT operation on the DHT identified by
- * 'table' storing a binding of 'key' to 'value'. The peer does not
- * have to be part of the table (if so, we will attempt to locate a
- * peer that is!)
- *
- * @param table table to use for the lookup
- * @param key the key to look up
- * @param timeout how long to wait until this operation should
- * automatically time-out
- * @param callback function to call on successful completion
- * @param closure extra argument to callback
- * @return handle to stop the async put
- */
-static struct DHT_PUT_RECORD *
-dht_put_async_start(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- const DataContainer * value,
- DHT_OP_Complete callback,
- void * closure) {
- int i;
- LocalTableData * ltd;
- DHT_PUT_RECORD * ret;
- unsigned int count;
-#if DEBUG_DHT
- EncName enc;
- EncName enc2;
-
- if (value == NULL) {
- GE_BREAK(ectx, 0);
- return NULL;
- }
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(key,
- &enc));
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc2));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "performing `%s' operation on key `%s' and table `%s'.\n",
- "DHT_PUT",
- &enc,
- &enc2);
-#endif
- if (timeout > 1 * cronHOURS) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("`%s' called with timeout above 1 hour (bug?)\n"),
- __FUNCTION__);
- timeout = 1 * cronHOURS;
- }
- ret = MALLOC(sizeof(DHT_PUT_RECORD));
- ret->timeout = get_time() + timeout;
- ret->key = *key;
- ret->table = *table;
- ret->callback = callback;
- ret->closure = closure;
- ret->value = MALLOC(ntohl(value->size));
- memcpy(ret->value,
- value,
- ntohl(value->size));
- ret->lock = MUTEX_CREATE(YES);
- ret->rpc = NULL;
- ret->rpcRepliesExpected = 0;
- ret->kfnc = NULL;
- MUTEX_LOCK(lock);
-
-
- ltd = getLocalTableData(table);
- if (ltd != NULL) {
- PeerIdentity * hosts;
-#if DEBUG_DHT
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(table,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "I participate in the table `%s' for the `%s' operation.\n",
- &enc,
- "DHT_PUT");
-#endif
- /* We do participate in the table, it is fair to assume
- that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
- count = findLocalNodes(table,
- key,
- hosts,
- ALPHA);
- /* try adding this peer to hosts */
- k_best_insert(ALPHA,
- &count,
- key,
- (HashCode512*) hosts,
- &coreAPI->myIdentity->hashPubKey);
- if (count == 0) {
- GE_BREAK(ectx, 0);
- /* Assertion failed: I participate in a table but findLocalNodes
returned 0! */
- MUTEX_UNLOCK(lock);
- return NULL;
- }
- /* if this peer is in 'hosts', try local datastore lookup */
- for (i=0;i<count;i++) {
- if (hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i])) {
- if (OK == ltd->store->put(ltd->store->closure,
- key,
- value,
- 0 /* FIXME: priority */))
- ret->confirmed_stores++;
- break;
- }
- }
-
- /* send dht_put_RPC to the other peers */
- for (i=0;i<count;i++)
- if (! hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i]))
- send_dht_put_rpc(&hosts[i],
- ret);
- } else {
- /* We do not particpate in the table; hence we need to use
- findKNodes to find an initial set of peers in that
- table; findKNodes tries to find k nodes and instantly
- allows us to query each node found. For each peer found,
- we then perform send_dht_put_rpc.
- */
- ret->kfnc
- = findKNodes_start(table,
- key,
- timeout,
- ALPHA,
- &send_dht_put_rpc,
- ret);
- }
-
- /* call OP_Complete callback after timeout! */
- addAbortJob(&dht_put_async_timeout,
- ret);
- cron_add_job(coreAPI->cron,
- &dht_put_async_timeout,
- timeout,
- 0,
- ret);
- MUTEX_UNLOCK(lock);
- return ret;
-}
-
-/**
- * Stop async DHT-put. Frees associated resources.
- */
-static int dht_put_async_stop(struct DHT_PUT_RECORD * record) {
- int i;
-
- ENTER();
- if (record == NULL)
- return SYSERR;
-
- /* cancel timeout cron job (if still live) */
- delAbortJob(&dht_put_async_timeout, record);
- cron_del_job(coreAPI->cron,
- &dht_put_async_timeout,
- 0,
- record);
- /* abort findKNodes (if running) - it may cause
- the addition of additional RPCs otherwise! */
- if (record->kfnc != NULL)
- findKNodes_stop(record->kfnc);
-
- for (i=0;i<record->rpcRepliesExpected;i++)
- rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(record->lock);
- i = record->confirmed_stores;
- FREE(record->value);
- FREE(record);
- if (i > 0)
- return OK;
- else
- return SYSERR;
-}
-
-/**
- * We got a reply from the DHT_remove operation. Update the
- * record datastructures accordingly (and call the record's
- * callback).
- *
- * @param results::peer created in rpc_DHT_store_abort
- */
-static void
-dht_remove_rpc_reply_callback(const PeerIdentity * responder,
- RPC_Param * results,
- void * cls) {
- DHT_REMOVE_RECORD * record = cls;
- PeerIdentity * peer;
- unsigned int dataLength;
- PeerInfo * pos;
- unsigned int i;
- unsigned int max;
-
- ENTER();
- processOptionalFields(responder, results);
- MUTEX_LOCK(record->lock);
- pos = findPeerInfo(responder);
- pos->lastActivity = get_time();
- max = RPC_paramCount(results);
- for (i=0;i<max;i++) {
- if (0 != strcmp("peer",
- RPC_paramName(results, i)))
- continue; /* ignore */
- if ( (OK != RPC_paramValueByPosition(results,
- i,
- &dataLength,
- (void**)&peer)) ||
- (dataLength != sizeof(PeerIdentity)) ) {
- EncName enc;
-
- MUTEX_UNLOCK(record->lock);
- hash2enc(&responder->hashPubKey,
- &enc);
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Invalid response to `%s' from `%s'\n"),
- "DHT_remove",
- &enc);
- return;
- }
- record->confirmed_stores++;
- }
- MUTEX_UNLOCK(record->lock);
-}
-
-/**
- * Send an (async) DHT remove to the given peer. Replies are to be
- * processed by the callback in record. The RPC async handle is to be
- * stored in the records rpc list. Locking is not required.
- */
-static int
-send_dht_remove_rpc(const PeerIdentity * peer,
- void * cls) {
- DHT_REMOVE_RECORD * record = cls;
- RPC_Param * param;
- unsigned long long timeout;
- cron_t delta;
- cron_t now;
-#if DEBUG_DHT
- EncName enc;
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&peer->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_remove",
- &enc);
-#endif
- if (isNotCloserThanMe(&record->table,
- peer,
- &record->key))
- return OK; /* refuse! */
- now = get_time();
- if (record->timeout > now)
- delta = (record->timeout - now) / 2;
- else
- delta = 0;
- timeout = htonll(delta);
- param = RPC_paramNew();
- RPC_paramAdd(param,
- "table",
- sizeof(DHT_TableId),
- &record->table);
- RPC_paramAdd(param,
- "key",
- sizeof(HashCode512),
- &record->key);
- RPC_paramAdd(param,
- "timeout",
- sizeof(unsigned long long),
- &timeout);
- if (record->value != NULL)
- RPC_paramAddDataContainer(param,
- "value",
- record->value);
- GROW(record->rpc,
- record->rpcRepliesExpected,
- record->rpcRepliesExpected+1);
- addOptionalFields(param);
- record->rpc[record->rpcRepliesExpected-1]
- = rpcAPI->RPC_start(peer,
- "DHT_remove",
- param,
- 0,
- delta,
- &dht_remove_rpc_reply_callback,
- record);
- RPC_paramFree(param);
- return OK;
-}
-
-/**
- * Perform an asynchronous REMOVE operation on the DHT identified by
- * 'table' removing the binding of 'key' to 'value'. The peer does not
- * have to be part of the table (if so, we will attempt to locate a
- * peer that is!)
- *
- * @param table table to use for the lookup
- * @param key the key to look up
- * @param timeout how long to wait until this operation should
- * automatically time-out (relative time)
- * @param callback function to call on successful completion
- * @param closure extra argument to callback
- * @return handle to stop the async remove
- */
-static struct DHT_REMOVE_RECORD *
-dht_remove_async_start(const DHT_TableId * table,
- const HashCode512 * key,
- cron_t timeout,
- const DataContainer * value,
- DHT_OP_Complete callback,
- void * closure) {
- int i;
- LocalTableData * ltd;
- DHT_REMOVE_RECORD * ret;
- unsigned int count;
-
- if (timeout > 1 * cronHOURS) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("`%s' called with timeout above 1 hour (bug?)\n"),
- __FUNCTION__);
- timeout = 1 * cronHOURS;
- }
- ENTER();
- ret = MALLOC(sizeof(DHT_REMOVE_RECORD));
- ret->timeout = get_time() + timeout;
- ret->key = *key;
- ret->table = *table;
- ret->callback = callback;
- ret->closure = closure;
- if (value == NULL) {
- ret->value = NULL;
- } else {
- ret->value = MALLOC(ntohl(value->size));
- memcpy(ret->value,
- value,
- ntohl(value->size));
- }
- ret->lock = MUTEX_CREATE(YES);
- ret->rpc = NULL;
- ret->rpcRepliesExpected = 0;
- ret->confirmed_stores = 0;
- ret->kfnc = NULL;
- MUTEX_LOCK(lock);
-
-
- ltd = getLocalTableData(table);
- if (ltd != NULL) {
- PeerIdentity * hosts;
- /* We do participate in the table, it is fair to assume
- that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
- count = findLocalNodes(table,
- key,
- hosts,
- ALPHA);
- /* try adding this peer to hosts */
- k_best_insert(ALPHA,
- &count,
- key,
- (HashCode512*) hosts,
- &coreAPI->myIdentity->hashPubKey);
- if (count == 0) {
- GE_BREAK(ectx, 0);
- /* Assertion failed: I participate in a table but findLocalNodes
returned 0! */
- MUTEX_UNLOCK(lock);
- return NULL;
- }
- /* if this peer is in 'hosts', try local datastore lookup */
- for (i=0;i<count;i++) {
- if (hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i])) {
- if (OK == ltd->store->del(ltd->store->closure,
- key,
- value))
- ret->confirmed_stores++;
- break;
- }
- }
-
- /* send dht_remove_RPC to the other peers */
- for (i=0;i<count;i++)
- if (! hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i]))
- send_dht_remove_rpc(&hosts[i],
- ret);
- } else {
- /* We do not particpate in the table; hence we need to use
- findKNodes to find an initial set of peers in that
- table; findKNodes tries to find k nodes and instantly
- allows us to query each node found. For each peer found,
- we then perform send_dht_remove_rpc.
- */
- ret->kfnc
- = findKNodes_start(table,
- key,
- timeout,
- ALPHA,
- &send_dht_remove_rpc,
- ret);
- }
- MUTEX_UNLOCK(lock);
- return ret;
-}
-
-/**
- * Stop async DHT-remove. Frees associated resources.
- */
-static int dht_remove_async_stop(struct DHT_REMOVE_RECORD * record) {
- int i;
-
- ENTER();
- if (record == NULL)
- return SYSERR;
-
- /* abort findKNodes (if running) - it may cause
- the addition of additional RPCs otherwise! */
- if (record->kfnc != NULL)
- findKNodes_stop(record->kfnc);
-
- for (i=0;i<record->rpcRepliesExpected;i++)
- rpcAPI->RPC_stop(record->rpc[i]);
- MUTEX_DESTROY(record->lock);
- i = record->confirmed_stores;
- FREE(record->value);
- FREE(record);
- if (i > 0)
- return OK;
- else
- return SYSERR;
-}
-
-/**
- * Join a table (start storing data for the table). Join
- * fails if the node is already joint with the particular
- * table.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @param timeout NOT USED. Remove?
- * @return SYSERR on error, OK on success
- */
-static int dht_join(Blockstore * datastore,
- const DHT_TableId * table) {
- int i;
-
- ENTER();
- MUTEX_LOCK(lock);
- for (i=0;i<tablesCount;i++) {
- if (equalsDHT_TableId(&tables[i]->id, table)) {
- MUTEX_UNLOCK(lock);
- return SYSERR;
- }
- }
- GROW(tables,
- tablesCount,
- tablesCount+1);
- tables[tablesCount-1] = MALLOC(sizeof(LocalTableData));
- tables[tablesCount-1]->id = *table;
- tables[tablesCount-1]->store = datastore;
- MUTEX_UNLOCK(lock);
- return OK;
-}
-
-/**
- * Leave a table (stop storing data for the table). Leave
- * fails if the node is not joint with the table. Blocks
- * for at most timeout ms to migrate content elsewhere.
- *
- * @param datastore the storage callbacks to use for the table
- * @param table the ID of the table
- * @return SYSERR on error, OK on success
- */
-static int dht_leave(const DHT_TableId * table) {
- int i;
- int idx;
- LocalTableData * old;
- DHT_REMOVE_RECORD * remRec;
-
- ENTER();
- MUTEX_LOCK(lock);
- idx = -1;
- for (i=0;i<tablesCount;i++) {
- if (equalsDHT_TableId(&tables[i]->id, table)) {
- idx = i;
- break;
- }
- }
- if (idx == -1) {
- MUTEX_UNLOCK(lock);
- return SYSERR;
- }
- old = tables[i];
- if (tablesCount > 1)
- tables[i] = tables[tablesCount-1];
- GROW(tables,
- tablesCount,
- tablesCount-1);
- FREE(old);
- MUTEX_UNLOCK(lock);
- if (! equalsHashCode512(&masterTableId,
- table)) {
- /* issue dht_remove to remove this peer
- from the master table for this table;
- not needed/possible if we quit the DHT
- altogether... */
- DataContainer * value;
-
- value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
- value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
- memcpy(&value[1],
- coreAPI->myIdentity,
- sizeof(PeerIdentity));
- remRec = dht_remove_async_start(&masterTableId,
- table,
- 0,
- value,
- NULL,
- NULL);
- dht_remove_async_stop(remRec);
- }
- return OK;
-}
-
-/**
- * We received a PING from another DHT. The appropriate response
- * is to send a list of the tables that this peer participates in.
- *
- * @param arguments do we need any?
- * @param results::tables the tables we participate in (DHT_TableIds)
- * @param helos::hellos for this peer (optional)
- */
-static void rpc_DHT_ping(const PeerIdentity * sender,
- RPC_Param * arguments,
- RPC_Param * results) {
-#if DEBUG_DHT
- EncName enc;
-
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&sender->hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Received RPC `%s' from peer `%s'.\n",
- "DHT_ping",
- &enc);
-#endif
- ENTER();
- processOptionalFields(sender, arguments);
- /* processes 'tables' */
- addOptionalFields(results);
- /* adds 'tables' (with very high probability since there's nothing else,
- except if we participate in over 50 tables, which would be bad...) */
-}
-
-/**
- * Find nodes that we know of that participate in the given
- * table and that are close to the given key.
- *
- * @param arguments::key the key to route towards
- * @param arguments::table the id of the table
- * @param results::peers list of peers found to participate in the given table
with ID close to key;
- * peers consists of HostIdentities one after the other. See
- * create_find_nodes_rpc_complete_callback for the parser of the reply.
- * @param results::tables list of tables that this peer participates in
(optional)
- */
-static void rpc_DHT_findNode(const PeerIdentity * sender,
- RPC_Param * arguments,
- RPC_Param * results) {
- HashCode512 * key;
- DHT_TableId * table;
- unsigned int dataLength;
- unsigned int count;
- unsigned int k;
- PeerIdentity * peers;
-
- ENTER();
- processOptionalFields(sender, arguments);
- key = NULL;
- table = NULL;
- if ( (OK != RPC_paramValueByName(arguments,
- "key",
- &dataLength,
- (void**) &key)) ||
- (dataLength != sizeof(HashCode512)) ||
- (OK != RPC_paramValueByName(arguments,
- "table",
- &dataLength,
- (void**) &table)) ||
- (dataLength != sizeof(DHT_TableId)) ) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("Received invalid RPC `%s'.\n"),
- "DHT_findNode");
- return;
- }
- k = ALPHA; /* optionally obtain k from arguments??? */
- peers = MALLOC(sizeof(PeerIdentity) * k);
- count = findLocalNodes(table,
- key,
- peers,
- k);
- RPC_paramAdd(results,
- "peer",
- count * sizeof(PeerIdentity),
- peers);
- FREE(peers);
- addOptionalFields(results);
-}
-
-/**
- * Cron-job to abort an rpc_DHT_findValue operation on timeout.
- * Takes the existing set of results and constructs a reply for
- * the RPC callback. If there are no replies, responds with
- * timeout.<p>
- *
- * The result is parsed in dht_findvalue_rpc_reply_callback.
- */
-static void rpc_DHT_findValue_abort(RPC_DHT_FindValue_Context * fw) {
- RPC_Param * results;
-
- ENTER();
- delAbortJob((CronJob) &rpc_DHT_findValue_abort,
- fw);
- MUTEX_LOCK(fw->lock);
- if (fw->done == YES) {
- MUTEX_UNLOCK(fw->lock);
- return;
- }
- dht_get_async_stop(fw->get_record);
- fw->get_record = NULL;
-
- /* build RPC reply, call RPC callback */
- if (fw->callback != NULL) {
- results = RPC_paramNew();
- addOptionalFields(results);
- fw->callback(results,
- OK,
- fw->rpc_context);
- RPC_paramFree(results);
- }
- fw->done = YES;
- MUTEX_UNLOCK(fw->lock);
-}
-
-/**
- * Job that adds a given reply to the list of replies for this
- * find-value operation. If the maximum number of results has
- * been accumulated this will also stop the cron-job and trigger
- * sending the cummulative reply via RPC.
- */
-static int rpc_dht_findValue_callback(const HashCode512 * key,
- const DataContainer * value,
- RPC_DHT_FindValue_Context * fw) {
- ENTER();
- MUTEX_LOCK(fw->lock);
- GROW(fw->results,
- fw->count,
- fw->count+1);
- fw->results[fw->count-1] = MALLOC(ntohl(value->size));
- memcpy(fw->results[fw->count-1],
- value,
- ntohl(value->size));
- MUTEX_UNLOCK(fw->lock);
- return OK;
-}
-
-static void rpc_dht_findValue_complete(RPC_DHT_FindValue_Context * ctx) {
- RPC_Param * param;
- int i;
-
- param = RPC_paramNew();
- for (i=0;i<ctx->count;i++)
- RPC_paramAdd(param,
- "data",
- ntohl(ctx->results[i]->size),
- &ctx->results[i][1]);
- ctx->callback(param,
- 0, /* error code */
- ctx->rpc_context);
- RPC_paramFree(param);
-}
-
-/**
- * Asynchronous RPC function called for 'findValue' RPC.
- *
- * @param arguments::keys the keys to search for
- * @param arguments::table the table to search in
- * @param arguments::timeout how long to wait at most
- * @param arguments::type type of the request (block type)
- * @param callback function to call with results when done
- * @param context additional argument to callback
- * @param results::data the result of the get operation
- * @param results::tables optional argument describing the tables
- * that this peer participates in
- */
-static void rpc_DHT_findValue(const PeerIdentity * sender,
- RPC_Param * arguments,
- Async_RPC_Complete_Callback callback,
- struct CallInstance * rpc_context) {
- HashCode512 * keys;
- DHT_TableId * table;
- unsigned long long * timeout;
- unsigned int * type;
- unsigned int keysLength;
- unsigned int dataLength;
- RPC_DHT_FindValue_Context * fw_context;
- RPC_Param * results;
-
- ENTER();
- processOptionalFields(sender, arguments);
- /* parse arguments */
- if ( (OK != RPC_paramValueByName(arguments,
- "keys",
- &keysLength,
- (void**) &keys)) ||
- (0 != (keysLength % sizeof(HashCode512))) ||
- (OK != RPC_paramValueByName(arguments,
- "table",
- &dataLength,
- (void**) &table)) ||
- (dataLength != sizeof(DHT_TableId)) ||
- (OK != RPC_paramValueByName(arguments,
- "timeout",
- &dataLength,
- (void**) &timeout)) ||
- (dataLength != sizeof(unsigned long long)) ||
- (OK != RPC_paramValueByName(arguments,
- "type",
- &dataLength,
- (void**) &type)) ||
- (dataLength != sizeof(unsigned int)) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Received invalid RPC `%s'.\n"),
- "DHT_findValue");
- RPC_paramFree(arguments);
- results = RPC_paramNew();
- if (callback != NULL)
- callback(results,
- SYSERR,
- rpc_context);
- RPC_paramFree(results);
- return;
- }
- fw_context
- = MALLOC(sizeof(RPC_DHT_FindValue_Context));
- fw_context->lock
- = MUTEX_CREATE(YES);
- fw_context->count
- = 0;
- fw_context->done
- = NO;
- fw_context->results
- = NULL;
- fw_context->callback
- = callback;
- fw_context->rpc_context
- = rpc_context;
- fw_context->get_record
- = dht_get_async_start(table,
- ntohl(*type),
- keysLength / sizeof(HashCode512),
- keys,
- ntohll(*timeout),
- (DataProcessor) &rpc_dht_findValue_callback,
- fw_context,
- (DHT_OP_Complete) &rpc_dht_findValue_complete,
- fw_context);
- addAbortJob((CronJob)&rpc_DHT_findValue_abort,
- fw_context);
- cron_add_job(coreAPI->cron,
- (CronJob)&rpc_DHT_findValue_abort,
- ntohll(*timeout),
- 0,
- fw_context);
- RPC_paramFree(arguments);
-}
-
-/**
- * Cron-job to abort an rpc_DHT_store operation on timeout.
- * Takes the existing set of results and constructs a reply for
- * the RPC callback. If there are no replies, responds with
- * timeout.<p>
- *
- * The result is parsed in dht_put_rpc_reply_callback.
- */
-static void rpc_DHT_store_abort(void * cls) {
- RPC_DHT_store_Context * fw = cls;
- RPC_Param * results;
-
- ENTER();
- delAbortJob(&rpc_DHT_store_abort,
- fw);
- MUTEX_LOCK(fw->lock);
- if (fw->done == YES) {
- MUTEX_UNLOCK(fw->lock);
- return;
- }
- dht_put_async_stop(fw->put_record);
- fw->put_record = NULL;
-
- /* build RPC reply, call RPC callback */
- if (fw->callback != NULL) {
- results = RPC_paramNew();
- addOptionalFields(results);
- fw->callback(results,
- OK,
- fw->rpc_context);
- RPC_paramFree(results);
- }
- fw->done = YES;
- MUTEX_UNLOCK(fw->lock);
-}
-
-/**
- * Job that adds a given reply to the list of replies for this
- * store operation. If the maximum number of peers has stored
- * the value, this will also stop the cron-job and trigger
- * sending the cummulative reply via RPC.
- */
-static void rpc_dht_store_callback(RPC_DHT_store_Context * fw) {
- RPC_Param * param;
-
- cron_del_job(coreAPI->cron,
- &rpc_DHT_store_abort,
- 0,
- fw);
- delAbortJob(&rpc_DHT_store_abort, fw);
- param = RPC_paramNew();
- fw->callback(param,
- 0,
- fw->rpc_context);
- RPC_paramFree(param);
- FREE(fw);
-}
-
-/**
- * DHT store request.
- */
-static void rpc_DHT_store(const PeerIdentity * sender,
- RPC_Param * arguments,
- Async_RPC_Complete_Callback callback,
- struct CallInstance * rpc_context) {
- HashCode512 * key;
- DHT_TableId * table;
- unsigned int dataLength;
- DataContainer * value;
- unsigned long long * timeout;
- RPC_DHT_store_Context * fw_context;
- LocalTableData * ltd;
- RPC_Param * results;
-
- ENTER();
- processOptionalFields(sender, arguments);
- /* parse arguments */
- if ( (OK != RPC_paramValueByName(arguments,
- "key",
- &dataLength,
- (void**) &key)) ||
- (dataLength != sizeof(HashCode512)) ||
- (OK != RPC_paramValueByName(arguments,
- "table",
- &dataLength,
- (void**) &table)) ||
- (dataLength != sizeof(DHT_TableId)) ||
- (OK != RPC_paramValueByName(arguments,
- "timeout",
- &dataLength,
- (void**) &timeout)) ||
- (dataLength != sizeof(unsigned long long)) ||
- ((NULL == (value = RPC_paramDataContainerByName(arguments,
- "value")))) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Received invalid RPC `%s'.\n"),
- "DHT_store");
- RPC_paramFree(arguments);
- results = RPC_paramNew();
- if (callback != NULL)
- callback(results,
- SYSERR,
- rpc_context);
- RPC_paramFree(results);
- return;
- }
-
- fw_context
- = MALLOC(sizeof(RPC_DHT_store_Context));
- fw_context->lock = MUTEX_CREATE(YES);
- MUTEX_LOCK(lock);
- ltd = getLocalTableData(table);
- if (ltd == NULL) {
- GE_LOG(ectx, GE_WARNING | GE_BULK | GE_USER,
- _("RPC for `%s' received for table that we do not participate in!\n"),
- "DHT_store");
- }
- MUTEX_UNLOCK(lock);
- fw_context->done
- = NO;
- fw_context->callback
- = callback;
- fw_context->rpc_context
- = rpc_context;
- fw_context->put_record
- = dht_put_async_start(table,
- key,
- ntohll(*timeout),
- value,
- (DHT_OP_Complete) &rpc_dht_store_callback,
- fw_context);
- addAbortJob(&rpc_DHT_store_abort,
- fw_context);
- cron_add_job(coreAPI->cron,
- &rpc_DHT_store_abort,
- ntohll(*timeout),
- 0,
- fw_context);
- RPC_paramFree(arguments);
- FREE(value);
-}
-
-/**
- * Cron-job to abort an rpc_DHT_remove operation on timeout.
- * Takes the existing set of results and constructs a reply for
- * the RPC callback. If there are no replies, responds with
- * timeout.<p>
- *
- * The result is parsed in dht_remove_rpc_reply_callback.
- */
-static void rpc_DHT_remove_abort(RPC_DHT_remove_Context * fw) {
- RPC_Param * results;
-
- ENTER();
- delAbortJob((CronJob) &rpc_DHT_remove_abort,
- fw);
- MUTEX_LOCK(fw->lock);
- if (fw->done == YES) {
- MUTEX_UNLOCK(fw->lock);
- return;
- }
- dht_remove_async_stop(fw->remove_record);
- fw->remove_record = NULL;
-
- /* build RPC reply, call RPC callback */
- results = RPC_paramNew();
- addOptionalFields(results);
- if (fw->callback != NULL)
- fw->callback(results,
- OK,
- fw->rpc_context);
- RPC_paramFree(results);
- fw->done = YES;
- MUTEX_UNLOCK(fw->lock);
-}
-
-/**
- * Job that adds a given reply to the list of peers that have removed
- * this find-value operation. If the number of peers reaches the
- * number of replicas this will also stop the cron-job and trigger
- * sending the cummulative reply via RPC.
- */
-static void rpc_dht_remove_callback(RPC_DHT_remove_Context * fw) {
- RPC_Param * param;
-
- cron_del_job(coreAPI->cron,
- &rpc_DHT_store_abort,
- 0,
- fw);
- delAbortJob(&rpc_DHT_store_abort, fw);
- param = RPC_paramNew();
-
- fw->callback(param,
- 0,
- fw->rpc_context);
- RPC_paramFree(param);
- FREE(fw);
-}
-
-/**
- * ASYNC RPC call for removing entries from the DHT.
- *
- * @param arguments::key the key to remove
- * @param arguments::table the table to remove data from
- * @param arguments::timeout how long to wait at most
- * @param arguments::value optional argument specifying which
- * value to remove from the given table under the given key
- * @param callback RPC service function to call once we are done
- * @param rpc_context extra argument to callback
- */
-static void rpc_DHT_remove(const PeerIdentity * sender,
- RPC_Param * arguments,
- Async_RPC_Complete_Callback callback,
- struct CallInstance * rpc_context) {
- HashCode512 * key;
- DHT_TableId * table;
- unsigned int dataLength;
- DataContainer * value;
- unsigned long long * timeout;
- RPC_DHT_remove_Context * fw_context;
- LocalTableData * ltd;
- RPC_Param * results;
-
- ENTER();
- processOptionalFields(sender, arguments);
- /* parse arguments */
- if ( (OK != RPC_paramValueByName(arguments,
- "key",
- &dataLength,
- (void**) &key)) ||
- (dataLength != sizeof(HashCode512)) ||
- (OK != RPC_paramValueByName(arguments,
- "table",
- &dataLength,
- (void**) &table)) ||
- (dataLength != sizeof(DHT_TableId)) ||
- (OK != RPC_paramValueByName(arguments,
- "timeout",
- &dataLength,
- (void**) &timeout)) ||
- (dataLength != sizeof(unsigned long long)) ) {
- GE_LOG(ectx,
- GE_WARNING | GE_BULK | GE_USER,
- _("Received invalid RPC `%s'.\n"),
- "DHT_remove");
- RPC_paramFree(arguments);
- results = RPC_paramNew();
- if (callback != NULL)
- callback(results,
- SYSERR,
- rpc_context);
- RPC_paramFree(results);
- return;
- }
-
- value = RPC_paramDataContainerByName(arguments,
- "value");
- fw_context
- = MALLOC(sizeof(RPC_DHT_remove_Context));
- fw_context->lock = MUTEX_CREATE(YES);
- MUTEX_LOCK(lock);
- ltd = getLocalTableData(table);
- if (ltd == NULL) {
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- _("RPC for `%s' received for table that we do not participate in!\n"),
- "DHT_removed");
- }
- MUTEX_UNLOCK(lock);
- fw_context->done
- = NO;
- fw_context->callback
- = callback;
- fw_context->rpc_context
- = rpc_context;
- fw_context->remove_record
- = dht_remove_async_start(table,
- key,
- ntohll(*timeout),
- value,
- (DHT_OP_Complete) &rpc_dht_remove_callback,
- fw_context);
- addAbortJob((CronJob)&rpc_DHT_remove_abort,
- fw_context);
- cron_add_job(coreAPI->cron,
- (CronJob)&rpc_DHT_remove_abort,
- ntohll(*timeout),
- 0,
- fw_context);
- RPC_paramFree(arguments);
- FREE(value);
-}
-
-/**
- * Cron-job to maintain DHT invariants. The responsibility of
- * this job is to maintain the routing table (by finding peers
- * if necessary).
- *
- * During shutdown the cron-job is called at a particular point
- * to free the associated resources. The point is chosen such
- * that the cron-job will not allocate new resources (since all
- * tables and all buckets are empty at that point).
- */
-static void dhtMaintainJob(void * shutdownFlag) {
- static struct RPC_Record ** pingRecords = NULL;
- static cron_t * pingTimes = NULL;
- static unsigned int pingRecordsSize = 0;
- static unsigned int pingTimesSize = 0;
- static struct DHT_PUT_RECORD ** putRecords = 0;
- static cron_t * putTimes = 0;
- static unsigned int putRecordsSize = 0;
- static unsigned int putTimesSize = 0;
- static FindNodesContext ** findRecords = NULL;
- static cron_t * findTimes = NULL;
- static unsigned int findRecordsSize = 0;
- static unsigned int findTimesSize = 0;
- int i;
- RPC_Param * request_param;
- PeerBucket * bucket;
- PeerInfo * pos;
- cron_t now;
- DataContainer * value;
-
- ENTER();
- MUTEX_LOCK(lock);
-#if DEBUG_DHT
- printRoutingTable();
- /* first, free resources from ASYNC calls started last time */
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' stops async requests from last cron round.\n",
- __FUNCTION__);
-#endif
- now = get_time();
- for (i=putRecordsSize-1;i>=0;i--) {
- if ( (shutdownFlag != NULL) ||
- (putTimes[i] + DHT_MAINTAIN_BUCKET_FREQUENCY < now)) {
- dht_put_async_stop(putRecords[i]);
- putRecords[i] = putRecords[putRecordsSize-1];
- putTimes[i] = putTimes[putRecordsSize-1];
- GROW(putRecords,
- putRecordsSize,
- putRecordsSize-1);
- GROW(putRecords,
- putTimesSize,
- putTimesSize-1);
- }
- }
- for (i=findRecordsSize-1;i>=0;i--) {
- if ( (shutdownFlag != NULL) ||
- (findTimes[i] + DHT_MAINTAIN_FIND_FREQUENCY < get_time())) {
- findNodes_stop(findRecords[i],
- NULL,
- NULL);
- findTimes[i] = findTimes[findRecordsSize-1];
- findRecords[i] = findRecords[findRecordsSize-1];
- GROW(findRecords,
- findRecordsSize,
- findRecordsSize-1);
- GROW(findTimes,
- findTimesSize,
- findTimesSize-1);
- }
- }
- for (i=0;i<pingRecordsSize;i++) {
- if ( (shutdownFlag != NULL) ||
- (pingTimes[i] + DHT_PING_FREQUENCY < get_time())) {
- rpcAPI->RPC_stop(pingRecords[i]);
- pingRecords[i] = pingRecords[pingRecordsSize-1];
- pingTimes[i] = pingTimes[pingRecordsSize-1];
- GROW(pingRecords,
- pingRecordsSize,
- pingRecordsSize-1);
- GROW(pingTimes,
- pingTimesSize,
- pingTimesSize-1);
- }
- }
- if (shutdownFlag != NULL) {
- MUTEX_UNLOCK(lock);
- return;
- }
-
- /* now trigger next round of ASYNC calls */
-
- /* for all of our tables, do a PUT on the master table */
- request_param = vectorNew(4);
- value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
- value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
- memcpy(&value[1],
- coreAPI->myIdentity,
- sizeof(PeerIdentity));
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' issues DHT_PUTs to advertise tables this peer participates in.\n",
- __FUNCTION__);
-#endif
-
- for (i=0;i<tablesCount;i++) {
- if (tables[i]->lastMasterAdvertisement + DHT_MAINTAIN_BUCKET_FREQUENCY <
now) {
- tables[i]->lastMasterAdvertisement = now;
- if (equalsHashCode512(&tables[i]->id,
- &masterTableId))
- continue;
- GROW(putRecords,
- putRecordsSize,
- putRecordsSize+1);
- GROW(putTimes,
- putTimesSize,
- putTimesSize+1);
- putRecords[putRecordsSize-1]
- = dht_put_async_start(&masterTableId,
- &tables[i]->id,
- DHT_MAINTAIN_BUCKET_FREQUENCY,
- value,
- NULL,
- NULL);
- putTimes[putTimesSize-1] = now;
- }
- }
- vectorFree(request_param);
- FREE(value);
-
- /*
- for each table that we have joined gather OUR neighbours
- */
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' issues findNodes for each table that we participate in.\n",
- __FUNCTION__);
-#endif
- for (i=0;i<tablesCount;i++) {
- if (tables[i]->lastFindOperation + DHT_MAINTAIN_FIND_FREQUENCY < now) {
- tables[i]->lastFindOperation = now;
- GROW(findRecords,
- findRecordsSize,
- findRecordsSize+1);
- GROW(findTimes,
- findTimesSize,
- findTimesSize+1);
- findRecords[findRecordsSize-1]
- = findNodes_start(&tables[i]->id,
- &coreAPI->myIdentity->hashPubKey,
- DHT_MAINTAIN_FIND_FREQUENCY);
- findTimes[findTimesSize-1] = now;
- }
- }
- /*
- for all peers in RT:
- a) if lastTableRefresh is very old, send ping
- b) if lastActivity is very very old, drop
- */
-#if DEBUG_DHT
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "`%s' issues put to advertise tables that we participate in.\n",
- __FUNCTION__);
-#endif
- request_param = vectorNew(4);
- for (i=bucketCount-1;i>=0;i--) {
- bucket = &buckets[i];
- pos = vectorGetFirst(bucket->peers);
- while (pos != NULL) {
- if (now - pos->lastTableRefresh > DHT_INACTIVITY_DEATH) {
- /* remove from table: dead peer */
- vectorRemoveObject(bucket->peers,
- pos);
- GROW(pos->tables,
- pos->tableCount,
- 0);
- FREE(pos);
- pos = vectorGetFirst(bucket->peers);
- continue;
- }
- if ( (now - pos->lastTableRefresh > DHT_INACTIVITY_DEATH / 2) &&
- (now - pos->lastTimePingSend > DHT_PING_FREQUENCY) ) {
-#if DEBUG_DHT
- EncName enc;
-
- ENTER();
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&pos->id.hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "sending RPC `%s' to peer `%s'.\n",
- "DHT_ping",
- &enc);
-#endif
- pos->lastTimePingSend = now;
- GROW(pingRecords,
- pingRecordsSize,
- pingRecordsSize+1);
- GROW(pingTimes,
- pingTimesSize,
- pingTimesSize+1);
- pingRecords[pingRecordsSize-1]
- = rpcAPI->RPC_start(&pos->id,
- "DHT_ping",
- request_param,
- 0,
- DHT_PING_FREQUENCY,
- &ping_reply_handler,
- NULL);
- pingTimes[pingTimesSize-1]
- = now;
- }
- pos = vectorGetNext(bucket->peers);
- }
- } /* end for all buckets */
- vectorFree(request_param);
-
- /*
- OPTIMIZE-ME:
- for all content in all tables:
- check if this peer should still be responsible for
- it, if not, migrate!
- */
- MUTEX_UNLOCK(lock);
-}
-
-/**
- * Provide the DHT service. The DHT service depends on the RPC
- * service.
- *
- * @param capi the core API
- * @return NULL on errors, DHT_API otherwise
- */
-DHT_ServiceAPI * provide_module_dht(CoreAPIForApplication * capi) {
- static DHT_ServiceAPI api;
- unsigned long long i;
- unsigned long long j;
-
- ENTER();
- coreAPI = capi;
- ectx = capi->ectx;
- rpcAPI = capi->requestService("rpc");
- if (rpcAPI == NULL)
- return NULL;
- if ( (-1 == GC_get_configuration_value_number(capi->cfg,
- "DHT",
- "BUCKETCOUNT",
- 1,
- 512,
- 512,
- &i)) ||
- (-1 == GC_get_configuration_value_number(capi->cfg,
- "DHT",
- "MASTER-TABLE-SIZE",
- 1,
- 65536 * 1024,
- 65536,
- &j)) ) {
- capi->releaseService(rpcAPI);
- return NULL;
- }
- GROW(buckets,
- bucketCount,
- i);
- for (i=0;i<bucketCount;i++) {
- buckets[i].bstart = 512 * i / bucketCount;
- buckets[i].bend = 512 * (i+1) / bucketCount;
- buckets[i].peers = vectorNew(4);
- }
-
-
- rpcAPI->RPC_register("DHT_ping",
- &rpc_DHT_ping);
- rpcAPI->RPC_register("DHT_findNode",
- &rpc_DHT_findNode);
- rpcAPI->RPC_register_async("DHT_findValue",
- &rpc_DHT_findValue);
- rpcAPI->RPC_register_async("DHT_store",
- &rpc_DHT_store);
- rpcAPI->RPC_register_async("DHT_remove",
- &rpc_DHT_remove);
- lock = coreAPI->getConnectionModuleLock();
- api.get_start = &dht_get_async_start;
- api.get_stop = &dht_get_async_stop;
- api.put_start = &dht_put_async_start;
- api.put_stop = &dht_put_async_stop;
-
- memset(&masterTableId, 0, sizeof(HashCode512));
- /* join the master table */
- masterTableDatastore
- = create_datastore_dht_master(ectx,
- j);
- dht_join(masterTableDatastore,
- &masterTableId);
- cron_add_job(coreAPI->cron,
- &dhtMaintainJob,
- 0,
- DHT_MAINTAIN_FREQUENCY,
- NULL);
- return &api;
-}
-
-/**
- * Shutdown DHT service.
- */
-int release_module_dht() {
- unsigned int i;
- PeerInfo * bucket;
-
- ENTER();
- rpcAPI->RPC_unregister("DHT_ping",
- &rpc_DHT_ping);
- rpcAPI->RPC_unregister("DHT_findNode",
- &rpc_DHT_findNode);
- rpcAPI->RPC_unregister_async("DHT_findValue",
- &rpc_DHT_findValue);
- rpcAPI->RPC_unregister_async("DHT_store",
- &rpc_DHT_store);
- rpcAPI->RPC_unregister_async("DHT_remove",
- &rpc_DHT_remove);
- cron_del_job(coreAPI->cron,
- &dhtMaintainJob,
- DHT_MAINTAIN_FREQUENCY,
- NULL);
- /* stop existing / pending DHT operations */
- while (abortTableSize > 0) {
- cron_del_job(coreAPI->cron,
- abortTable[0].job,
- 0,
- abortTable[0].arg);
- abortTable[0].job(abortTable[0].arg);
- }
- /* leave the master table */
- dht_leave(&masterTableId);
- for (i=0;i<bucketCount;i++) {
- bucket = (PeerInfo*) vectorGetFirst(buckets[i].peers);
- while (bucket != NULL) {
- GROW(bucket->tables,
- bucket->tableCount,
- 0);
- bucket = (PeerInfo*) vectorGetNext(buckets[i].peers);
- }
- vectorFree(buckets[i].peers);
- }
- GROW(buckets,
- bucketCount,
- 0);
-
- dhtMaintainJob((void*)1); /* free's cron's internal resources! */
- destroy_datastore_dht_master(masterTableDatastore);
- coreAPI->releaseService(rpcAPI);
- lock = NULL;
- rpcAPI = NULL;
- coreAPI = NULL;
- return OK;
-}
-
-
-/* end of dht.c */
Modified: GNUnet/src/applications/dht/module/table.c
===================================================================
--- GNUnet/src/applications/dht/module/table.c 2006-12-27 06:30:26 UTC (rev
4062)
+++ GNUnet/src/applications/dht/module/table.c 2006-12-27 06:48:58 UTC (rev
4063)
@@ -36,12 +36,11 @@
* TODO: expose and improve reliabily metrics (to be added later)
* + dstore.c + plugin: SQL-based datastore: key, value, expiration
* (bounded FIFO-datastore, when full, kill oldest entry first)
- * [?: better replacement policy to guard against attacks?]
* + routing.c: tracking of get/put operations, retry, reply handling
* code tries best-match routing among entries in table
* + service.c: provide DHT services to rest of GNUnet process
* (i.e. register datastore with shared data, get/put operations)
- * + cs.c: services to out-of-process DHT clients (via dht-lib) [mostly done]
+ * + cs.c: services to out-of-process DHT clients (via dht-lib)
*/
#include "platform.h"
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-12-27 06:30:26 UTC (rev 4062)
+++ GNUnet/todo 2006-12-27 06:48:58 UTC (rev 4063)
@@ -34,6 +34,8 @@
+ dht/gap integration [RC]
+ ecrs/location URIs [RC]
+ fsui/location URI support [RC]
+ + dht/routing: GET retries [RC]
+ + dht/routing: handle routing loops [RC]
+ dstore bloomfilter (optimization)
* HTTP transport (libcurl, libmicrohttpd)
- minor improvements:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r4063 - in GNUnet: . src/applications/dht/module,
grothoff <=