[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r6161 - in GNUnet/src/applications/fs: . gap
From: |
gnunet |
Subject: |
[GNUnet-SVN] r6161 - in GNUnet/src/applications/fs: . gap |
Date: |
Tue, 5 Feb 2008 23:52:41 -0700 (MST) |
Author: grothoff
Date: 2008-02-05 23:52:39 -0700 (Tue, 05 Feb 2008)
New Revision: 6161
Added:
GNUnet/src/applications/fs/gap/
GNUnet/src/applications/fs/gap/Makefile.am
GNUnet/src/applications/fs/gap/README
GNUnet/src/applications/fs/gap/TODO
GNUnet/src/applications/fs/gap/anonymity.c
GNUnet/src/applications/fs/gap/anonymity.h
GNUnet/src/applications/fs/gap/check.conf
GNUnet/src/applications/fs/gap/fs.c
GNUnet/src/applications/fs/gap/fs_dht.c
GNUnet/src/applications/fs/gap/fs_dht.h
GNUnet/src/applications/fs/gap/gap.c
GNUnet/src/applications/fs/gap/gap.h
GNUnet/src/applications/fs/gap/gap_old.c
GNUnet/src/applications/fs/gap/migration.c
GNUnet/src/applications/fs/gap/migration.h
GNUnet/src/applications/fs/gap/ondemand.c
GNUnet/src/applications/fs/gap/ondemand.h
GNUnet/src/applications/fs/gap/pid_table.c
GNUnet/src/applications/fs/gap/pid_table.h
GNUnet/src/applications/fs/gap/plan.c
GNUnet/src/applications/fs/gap/plan.h
GNUnet/src/applications/fs/gap/querymanager.c
GNUnet/src/applications/fs/gap/querymanager.h
GNUnet/src/applications/fs/gap/shared.c
GNUnet/src/applications/fs/gap/shared.h
GNUnet/src/applications/fs/gap/test_linear_topology.c
GNUnet/src/applications/fs/gap/test_loopback.c
GNUnet/src/applications/fs/gap/test_star_topology.c
Log:
new
Added: GNUnet/src/applications/fs/gap/Makefile.am
===================================================================
--- GNUnet/src/applications/fs/gap/Makefile.am (rev 0)
+++ GNUnet/src/applications/fs/gap/Makefile.am 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,63 @@
+INCLUDES = -I$(top_srcdir)/src/include
+
+plugindir = $(libdir)/GNUnet
+
+plugin_LTLIBRARIES = \
+ libgnunetmodule_fs.la
+
+
+libgnunetmodule_fs_la_SOURCES = \
+ anonymity.c anonymity.h \
+ fs.c \
+ fs_dht.c fs_dht.h \
+ gap.c gap.h \
+ migration.c migration.h \
+ ondemand.c ondemand.h \
+ plan.c plan.h \
+ pid_table.c pid_table.h \
+ querymanager.c querymanager.h \
+ shared.c shared.h
+
+libgnunetmodule_fs_la_LDFLAGS = \
+ -export-dynamic -avoid-version -module
+libgnunetmodule_fs_la_LIBADD = \
+ $(top_builddir)/src/applications/fs/libecrs_core.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+
+check_PROGRAMS = \
+ test_loopback test_linear_topology test_star_topology
+
+TESTS = $(check_PROGRAMS)
+
+test_loopback_SOURCES = \
+ test_loopback.c
+test_loopback_LDADD = \
+ $(top_builddir)/src/applications/testing/libgnunettesting_api.la \
+ $(top_builddir)/src/applications/stats/libgnunetstats_api.la \
+ $(top_builddir)/src/applications/fs/ecrs/libgnunetecrs.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+test_linear_topology_SOURCES = \
+ test_linear_topology.c
+test_linear_topology_LDADD = \
+ $(top_builddir)/src/applications/testing/libgnunettesting_api.la \
+ $(top_builddir)/src/applications/stats/libgnunetstats_api.la \
+ $(top_builddir)/src/applications/fs/ecrs/libgnunetecrs.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+test_star_topology_SOURCES = \
+ test_star_topology.c
+test_star_topology_LDADD = \
+ $(top_builddir)/src/applications/identity/libgnunetidentity_api.la \
+ $(top_builddir)/src/applications/testing/libgnunettesting_api.la \
+ $(top_builddir)/src/applications/stats/libgnunetstats_api.la \
+ $(top_builddir)/src/applications/fs/ecrs/libgnunetecrs.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+EXTRA_DIST = \
+ check.conf
Added: GNUnet/src/applications/fs/gap/README
===================================================================
--- GNUnet/src/applications/fs/gap/README (rev 0)
+++ GNUnet/src/applications/fs/gap/README 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,6 @@
+Main data flow:
+
+ -> client -> -> querymanager -> ->
+core fs.c plan.c core
+ -> peer -> -> gap -> ->
+
Added: GNUnet/src/applications/fs/gap/TODO
===================================================================
--- GNUnet/src/applications/fs/gap/TODO (rev 0)
+++ GNUnet/src/applications/fs/gap/TODO 2008-02-06 06:52:39 UTC (rev 6161)
@@ -0,0 +1,36 @@
+CORRECTNESS:
+1) (periodic?) clean up of stale data in plan.c datastructures -- easy --- 45
minutes
+
+PERFORMANCE:
+1) core reservation of response bandwidth --- medium --- 60 minutes
+2) core notification of P2P-level disconnect (to call cleanup functions!)
+
+
+HARD STUFF:
+1) priority determination
+ - theoretically unbounded (important not to have a max!)
+ - bound by inbound priorities for forwarded queries
+ - start with priorities similar to current inbound priorities for our own
requests
+2) TTL determination
+ - if priority non-zero, use maximum permitted TTL (for ours as well as
forwarded;
+ make sure to decrement forwarded priority by at least one; one trust unit
+ should correspond to TTL_DECREMENT loss in max TTL)
+ - if priority is zero:
+ - decrement inbound TTL randomly
+ - use single random TTL decrement for local queries (especially for the
first run if we are initiator)
+3) implement PLAN_request target count calculation
+ Strategy:
+ - if entropy of ratings is high, increase #targets
+ - make it possible (even likely) that we only pick one target (keeping most
of the priority!)
+4) implement PLAN_request peer ranking calculation
+
+
+DETAILS:
+1) make sure that anonymity-level zero content is pushed into
+ the DHT by both fs (on insert/index) and by migration (for refresh)
+
+
+OUT-OF-SCOPE:
+1) modify datastore to return diverse subsets of large response sets,
+ except when processing for loopback! -- medium --- 100 minutes
+2) make sure core polls whenever outbound bandwidth is available
Added: GNUnet/src/applications/fs/gap/anonymity.c
===================================================================
--- GNUnet/src/applications/fs/gap/anonymity.c (rev 0)
+++ GNUnet/src/applications/fs/gap/anonymity.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,111 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2008 Christian Grothoff (and other
contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/anonymity.c
+ * @brief code for checking if cover traffic is sufficient
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "anonymity.h"
+#include "gnunet_protocols.h"
+#include "gnunet_traffic_service.h"
+
+static GNUNET_Traffic_ServiceAPI * traffic;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * Check whether enough cover traffic of the given type was received
+ * recently to satisfy the requested anonymity level.
+ *
+ * Level 0 always succeeds.  Otherwise the level is decremented by one;
+ * if the decremented level is above 1000, "level / 1000" is the minimum
+ * number of peers and "level % 1000" the minimum number of messages,
+ * else the decremented level is the minimum number of messages.
+ *
+ * @param level requested anonymity level
+ * @param content_type message type for which received traffic is queried
+ * @return GNUNET_OK if cover traffic is sufficient, GNUNET_SYSERR if it
+ *         is not, if the traffic service is unavailable or if the
+ *         traffic statistics could not be obtained
+ */
+int
+GNUNET_FS_ANONYMITY_check (unsigned int level,
+ unsigned short content_type)
+{
+ unsigned int count;
+ unsigned int peers;
+ unsigned int sizes;
+ unsigned int timevect;
+
+ /* level zero: no cover traffic is required */
+ if (level == 0)
+ return GNUNET_OK;
+ level--;
+ /* without the traffic service nothing can be verified */
+ if (traffic == NULL)
+ return GNUNET_SYSERR;
+ if (GNUNET_OK != traffic->get (5 * GNUNET_CRON_SECONDS /
GNUNET_TRAFFIC_TIME_UNIT, /* TTL_DECREMENT/TTU */
+ content_type,
+ GNUNET_TRAFFIC_TYPE_RECEIVED, &count, &peers,
+ &sizes, &timevect))
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _("Failed to get traffic stats.\n"));
+ return GNUNET_SYSERR;
+ }
+ /* large levels encode a peer count (thousands) and a message count */
+ if (level > 1000)
+ {
+ if (peers < level / 1000)
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Not enough cover traffic to satisfy anonymity
requirements (%u, %u peers). "
+ "Result dropped.\n", level, peers);
+ return GNUNET_SYSERR;
+ }
+ if (count < level % 1000)
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Not enough cover traffic to satisfy anonymity
requirements (%u, %u messages). "
+ "Result dropped.\n", level, count);
+ return GNUNET_SYSERR;
+ }
+ }
+ else
+ {
+ /* small levels only constrain the number of messages */
+ if (count < level)
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Not enough cover traffic to satisfy anonymity
requirements (%u, %u messages). "
+ "Result dropped.\n", level, count);
+ return GNUNET_SYSERR;
+ }
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Initialize the anonymity module.
+ *
+ * Remembers the core API handle and requests the "traffic" service
+ * from it; the result of that request is stored without being checked
+ * here (GNUNET_FS_ANONYMITY_check tests it for NULL).
+ *
+ * @param capi core API handle used by this module
+ */
+void
+GNUNET_FS_ANONYMITY_init (GNUNET_CoreAPIForPlugins * capi)
+{
+ coreAPI = capi;
+ traffic = capi->request_service("traffic");
+}
+
+/**
+ * Shut down the anonymity module.
+ *
+ * Hands the traffic service back to the core if it was obtained during
+ * initialization.  The service handle itself is passed to
+ * release_service (not the service name), and the stale pointer is
+ * cleared so that later checks see the service as unavailable.
+ */
+void
+GNUNET_FS_ANONYMITY_done (void)
+{
+  if (traffic != NULL)
+    {
+      coreAPI->release_service (traffic);
+      traffic = NULL;
+    }
+}
+
+/* end of anonymity.c */
Property changes on: GNUnet/src/applications/fs/gap/anonymity.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/anonymity.h
===================================================================
--- GNUnet/src/applications/fs/gap/anonymity.h (rev 0)
+++ GNUnet/src/applications/fs/gap/anonymity.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,51 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2008 Christian Grothoff (and other
contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file applications/fs/gap/anonymity.h
+ * @brief code for checking if cover traffic is sufficient
+ * @author Christian Grothoff
+ */
+
+#ifndef ANONYMITY_H
+#define ANONYMITY_H
+
+#include "gnunet_core.h"
+
+/**
+ * Initialize the anonymity module.
+ */
+void
+GNUNET_FS_ANONYMITY_init (GNUNET_CoreAPIForPlugins * capi);
+
+void
+GNUNET_FS_ANONYMITY_done (void);
+
+/**
+ * Consider traffic volume before sending out content or
+ * queries.
+ *
+ * @return GNUNET_OK if cover traffic is sufficient
+ */
+int
+GNUNET_FS_ANONYMITY_check (unsigned int anonymityLevel,
+ unsigned short content_type);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/anonymity.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/check.conf
===================================================================
--- GNUnet/src/applications/fs/gap/check.conf (rev 0)
+++ GNUnet/src/applications/fs/gap/check.conf 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,16 @@
+# General settings
+[GNUNET]
+GNUNET_HOME = "/tmp/gnunet-gap-test-driver"
+LOGLEVEL = "WARNING"
+LOGFILE = ""
+PROCESS-PRIORITY = "NORMAL"
+
+# Network options for the clients
+[NETWORK]
+HOST = "localhost:2087"
+
+
+[TESTING]
+WEAKRANDOM = YES
+
+
Added: GNUnet/src/applications/fs/gap/fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs.c (rev 0)
+++ GNUnet/src/applications/fs/gap/fs.c 2008-02-06 06:52:39 UTC (rev 6161)
@@ -0,0 +1,875 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff
(and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/fs.c
+ * @brief functions for handling CS and P2P file-sharing requests
+ * @author Christian Grothoff
+ *
+ * This file contains all of the entry points to the file-sharing
+ * module.
+ *
+ * TODO:
+ * - integrate with migration submodule
+ * - make sure we do an immediate PUSH for DHT stuff
+ * given to us with anonymityLevel zero.
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_directories.h"
+#include "gnunet_protocols.h"
+#include "gnunet_datastore_service.h"
+#include "gnunet_dht_service.h"
+#include "gnunet_identity_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_traffic_service.h"
+#include "ecrs_core.h"
+#include "anonymity.h"
+#include "fs.h"
+#include "fs_dht.h"
+#include "gap.h"
+#include "migration.h"
+#include "querymanager.h"
+#include "ondemand.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+
+#define DEBUG_FS GNUNET_NO
+
+/**
+ * Lock shared between all C files in this
+ * directory.
+ */
+struct GNUNET_Mutex * GNUNET_FS_lock;
+
+static struct GNUNET_GE_Context *ectx;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+static GNUNET_Identity_ServiceAPI *identity;
+
+static GNUNET_Stats_ServiceAPI *stats;
+
+static GNUNET_Datastore_ServiceAPI * datastore;
+
+static int stat_gap_query_received;
+
+static int stat_gap_content_received;
+
+static int stat_gap_trust_awarded;
+
+/**
+ * Hard CPU limit
+ */
+static unsigned long long hardCPULimit;
+
+/**
+ * Hard network upload limit.
+ */
+static unsigned long long hardUpLimit;
+
+
+
+/* ********************* CS handlers ********************** */
+
+/**
+ * Process a request to insert content from the client.
+ *
+ * The request is a CS_fs_request_insert_MESSAGE header followed by the
+ * block to store.  The block is validated, wrapped into a datastore
+ * value and handed to the datastore; the datastore's return value is
+ * sent back to the client.
+ *
+ * @param sock client that issued the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the request is malformed (the TCP connection
+ *         should be closed), otherwise the result of sending the
+ *         datastore's return value to the client
+ */
+static int
+handle_cs_insert_request (struct GNUNET_ClientHandle *sock,
+                          const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_insert_MESSAGE *ri;
+  const DBlock *block;
+  GNUNET_DatastoreValue *datum;
+  GNUNET_HashCode query;
+  unsigned int block_size;
+  unsigned int type;
+  int ret;
+#if DEBUG_FS
+  GNUNET_EncName enc;
+#endif
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_insert_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  ri = (const CS_fs_request_insert_MESSAGE *) req;
+  block = (const DBlock *) &ri[1];
+  /* number of payload bytes following the fixed-size request header */
+  block_size = ntohs (req->size) - sizeof (CS_fs_request_insert_MESSAGE);
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (block_size,
+                                                block, GNUNET_YES, &query))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  type = GNUNET_EC_file_block_get_type (block_size, block);
+  datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + block_size);
+  datum->size = htonl (sizeof (GNUNET_DatastoreValue) + block_size);
+  /* these fields are copied verbatim from the request (no byte swap) */
+  datum->expirationTime = ri->expiration;
+  datum->prio = ri->prio;
+  datum->anonymityLevel = ri->anonymityLevel;
+  datum->type = htonl (type);
+#if DEBUG_FS
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&query, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST INSERT (query: `%s', type: %u, priority %u)\n",
+                 &enc, type, ntohl (ri->prio));
+#endif
+  memcpy (&datum[1], block, block_size);
+  ret = datastore->putUpdate (&query, datum);
+  GNUNET_free (datum);
+  return coreAPI->sendValueToClient (sock, ret);
+}
+
+/**
+ * Process a request to prepare indexing of a file (via symlink).
+ *
+ * The request is a CS_fs_request_init_index_MESSAGE header followed by
+ * the file name, which is not required to be 0-terminated on the wire.
+ *
+ * @param sock client that issued the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the request is malformed, otherwise the
+ *         result of sending the operation's return value to the client
+ */
+static int
+handle_cs_init_index_request (struct GNUNET_ClientHandle *sock,
+                              const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_init_index_MESSAGE *ri;
+  struct GNUNET_GE_Context *cectx;
+  unsigned short msg_size;
+  int fnLen;
+  int ret;
+  char *fn;
+
+  msg_size = ntohs (req->size);
+  if (msg_size < sizeof (CS_fs_request_init_index_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  /* only compute the name length once the message is known to be large
+     enough, so the subtraction cannot wrap around */
+  fnLen = (int) (msg_size - sizeof (CS_fs_request_init_index_MESSAGE));
+#if WINDOWS
+  if (fnLen > _MAX_PATH)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+#endif
+  ri = (const CS_fs_request_init_index_MESSAGE *) req;
+  fn = GNUNET_malloc (fnLen + 1);
+  /* copy exactly the fnLen bytes contained in the message; a bounded
+     string copy of fnLen + 1 bytes could read one byte past its end */
+  memcpy (fn, &ri[1], fnLen);
+  fn[fnLen] = '\0';
+  cectx = coreAPI->cs_create_client_log_context (sock);
+  ret = GNUNET_FS_ONDEMAND_index_prepare_with_symlink (cectx,
+                                                       &ri->fileId, fn);
+  GNUNET_GE_free_context (cectx);
+  GNUNET_free (fn);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of index initialization request to client\n",
+                 ret == GNUNET_OK ? "success" : "failure");
+#endif
+  return coreAPI->sendValueToClient (sock, ret);
+}
+
+/**
+ * Process a request to index content from the client.
+ *
+ * The request header is followed by the block that is to be indexed;
+ * the outcome of the indexing operation is reported to the client.
+ *
+ * @param sock client that issued the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise the result of sending the outcome to the client
+ */
+static int
+handle_cs_index_request (struct GNUNET_ClientHandle *sock,
+                         const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_index_MESSAGE *request;
+  struct GNUNET_GE_Context *client_ectx;
+  int outcome;
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_index_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  client_ectx = coreAPI->cs_create_client_log_context (sock);
+  request = (const CS_fs_request_index_MESSAGE *) req;
+  outcome =
+    GNUNET_FS_ONDEMAND_add_indexed_content (client_ectx,
+                                            datastore,
+                                            ntohl (request->prio),
+                                            GNUNET_ntohll (request->expiration),
+                                            GNUNET_ntohll (request->fileOffset),
+                                            ntohl (request->anonymityLevel),
+                                            &request->fileId,
+                                            ntohs (request->header.size) -
+                                            sizeof (CS_fs_request_index_MESSAGE),
+                                            (const DBlock *) &request[1]);
+  GNUNET_GE_free_context (client_ectx);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of index request to client\n",
+                 outcome == GNUNET_OK ? "success" : "failure");
+#endif
+  return coreAPI->sendValueToClient (sock, outcome);
+}
+
+/**
+ * Process a query to delete content.
+ *
+ * The request header is followed by the block to delete.  The block is
+ * validated first; only then is a datastore value built from it, looked
+ * up in the datastore and, if found, deleted.
+ *
+ * @param sock client that issued the request
+ * @param req the request message
+ * @return GNUNET_SYSERR if the request is malformed (the TCP connection
+ *         should be closed), otherwise the result of sending the
+ *         outcome to the client
+ */
+static int
+handle_cs_delete_request (struct GNUNET_ClientHandle *sock,
+                          const GNUNET_MessageHeader * req)
+{
+  int ret;
+  const CS_fs_request_delete_MESSAGE *rd;
+  const DBlock *block;
+  GNUNET_DatastoreValue *value;
+  GNUNET_HashCode query;
+  unsigned int block_size;
+  unsigned int type;
+#if DEBUG_FS
+  GNUNET_EncName enc;
+#endif
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_delete_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  rd = (const CS_fs_request_delete_MESSAGE *) req;
+  block = (const DBlock *) &rd[1];
+  block_size = ntohs (req->size) - sizeof (CS_fs_request_delete_MESSAGE);
+  /* validate before allocating, so a bad request costs no allocation */
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (block_size,
+                                                block, GNUNET_NO, &query))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  type = GNUNET_EC_file_block_get_type (block_size, block);
+  value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + block_size);
+  /* host-to-network conversion: the size field is stored in network
+     byte order, as in the insert handler */
+  value->size = htonl (sizeof (GNUNET_DatastoreValue) + block_size);
+  value->type = htonl (type);
+  memcpy (&value[1], block, block_size);
+#if DEBUG_FS
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&query, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST DELETE (query: `%s', type: %u)\n", &enc,
+                 type);
+#endif
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  if (GNUNET_SYSERR ==
+      datastore->get (&query, type,
+                      &GNUNET_FS_HELPER_complete_value_from_database_callback,
+                      value))
+    {                           /* aborted == found! */
+      ret = datastore->del (&query, value);
+    }
+  else
+    {                           /* not found */
+      ret = GNUNET_SYSERR;
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+  GNUNET_free (value);
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Sending confirmation (%s) of delete request to client\n",
+                 ret != GNUNET_SYSERR ? "success" : "failure");
+#endif
+  return coreAPI->sendValueToClient (sock, ret);
+}
+
+/**
+ * Process a client request to unindex content.
+ *
+ * A client log context is created before validation so that a size
+ * mismatch is reported both to the module's own error context and to
+ * the client's.
+ *
+ * @param sock client that issued the request
+ * @param req the request message; must have exactly the size of
+ *        CS_fs_request_unindex_MESSAGE
+ * @return GNUNET_SYSERR if the request is malformed, otherwise the
+ *         result of sending the outcome to the client
+ */
+static int
+handle_cs_unindex_request (struct GNUNET_ClientHandle *sock,
+                           const GNUNET_MessageHeader * req)
+{
+  struct GNUNET_GE_Context *client_ectx;
+  const CS_fs_request_unindex_MESSAGE *request;
+  int outcome;
+
+  client_ectx = coreAPI->cs_create_client_log_context (sock);
+  if (sizeof (CS_fs_request_unindex_MESSAGE) != ntohs (req->size))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      GNUNET_GE_BREAK (client_ectx, 0);
+      GNUNET_GE_free_context (client_ectx);
+      return GNUNET_SYSERR;
+    }
+  request = (const CS_fs_request_unindex_MESSAGE *) req;
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST UNINDEX\n");
+#endif
+  outcome = GNUNET_FS_ONDEMAND_delete_indexed_content (client_ectx,
+                                                       datastore,
+                                                       ntohl (request->blocksize),
+                                                       &request->fileId);
+  GNUNET_GE_free_context (client_ectx);
+  return coreAPI->sendValueToClient (sock, outcome);
+}
+
+/**
+ * Process a client request to test whether certain data is indexed.
+ *
+ * @param sock client that issued the request
+ * @param req the request message; must have exactly the size of
+ *        CS_fs_request_test_index_MESSAGE
+ * @return GNUNET_SYSERR if the request is malformed, otherwise the
+ *         result of sending the test's outcome to the client
+ */
+static int
+handle_cs_test_indexed_request (struct GNUNET_ClientHandle *sock,
+                                const GNUNET_MessageHeader * req)
+{
+  const CS_fs_request_test_index_MESSAGE *request;
+  int outcome;
+
+  if (sizeof (CS_fs_request_test_index_MESSAGE) != ntohs (req->size))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  request = (const CS_fs_request_test_index_MESSAGE *) req;
+#if DEBUG_FS
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "FS received REQUEST TESTINDEXED\n");
+#endif
+  outcome = GNUNET_FS_ONDEMAND_test_indexed_file (datastore,
+                                                  &request->fileId);
+  return coreAPI->sendValueToClient (sock, outcome);
+}
+
+/**
+ * Datastore iterator used for the local "fast path" of a client query.
+ *
+ * Any response that we get is passed back to the client.  If the
+ * response is unique (a data block), the iteration is aborted by
+ * returning GNUNET_SYSERR.
+ *
+ * @param key key under which the value is stored
+ * @param value the value found in the datastore
+ * @param closure the struct GNUNET_ClientHandle of the client
+ * @param uid unique identifier of the value (unused here)
+ * @return GNUNET_SYSERR to abort the iteration after a unique response,
+ *         GNUNET_OK to continue
+ */
+static int
+fast_path_processor (const GNUNET_HashCode * key,
+                     const GNUNET_DatastoreValue * value,
+                     void *closure, unsigned long long uid)
+{
+  struct GNUNET_ClientHandle *sock = closure;
+  const DBlock *dblock;
+  CS_fs_reply_content_MESSAGE *msg;
+  unsigned int size;
+  unsigned int block_type;
+  GNUNET_DatastoreValue *enc;
+  const GNUNET_DatastoreValue *use;
+
+  dblock = (const DBlock *) &value[1];
+  enc = NULL;
+  if ((ntohl (dblock->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND) &&
+      (GNUNET_OK != GNUNET_FS_ONDEMAND_get_indexed_content (value,
+                                                            key, &enc)))
+    return GNUNET_OK;           /* data corrupt, continue to search */
+  /* prefer the value produced by the on-demand lookup, if any */
+  use = (enc != NULL) ? enc : value;
+  size = ntohl (use->size) - sizeof (GNUNET_DatastoreValue);
+  dblock = (const DBlock *) &use[1];
+  /* remember the type now: dblock may point into enc, which is released
+     below before the type is needed for the return value */
+  block_type = ntohl (dblock->type);
+  msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
+  msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
+  msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
+  msg->anonymityLevel = use->anonymityLevel;
+  msg->expirationTime = use->expirationTime;
+  memcpy (&msg[1], dblock, size);
+  GNUNET_free_non_null (enc);
+  coreAPI->cs_send_to_client (sock, &msg->header, GNUNET_YES);
+  GNUNET_free (msg);
+  if (block_type == GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_SYSERR;       /* unique response */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Process a query from the client.
+ *
+ * The local datastore is consulted first; if that lookup is aborted by
+ * the fast-path iterator, nothing further is done.  Otherwise the query
+ * is handed to the query manager.
+ *
+ * @param sock client that issued the query
+ * @param req the request message
+ * @return GNUNET_SYSERR if the TCP connection should be closed,
+ *         otherwise GNUNET_OK
+ */
+static int
+handle_cs_query_start_request (struct GNUNET_ClientHandle *sock,
+                               const GNUNET_MessageHeader * req)
+{
+  static GNUNET_PeerIdentity all_zeros;
+  const CS_fs_request_search_MESSAGE *search;
+  const GNUNET_PeerIdentity *target;
+  unsigned int key_total;
+  unsigned int block_type;
+  unsigned int anonymity;
+
+  if (ntohs (req->size) < sizeof (CS_fs_request_search_MESSAGE))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  search = (const CS_fs_request_search_MESSAGE *) req;
+  block_type = ntohl (search->type);
+  /* try "fast path" avoiding gap/dht if unique reply is locally available */
+  if (GNUNET_SYSERR == datastore->get (&search->query[0],
+                                       block_type,
+                                       &fast_path_processor, sock))
+    return GNUNET_OK;
+  anonymity = ntohl (search->anonymityLevel);
+  /* one key is part of the fixed header, the rest follow it */
+  key_total =
+    1 + (ntohs (req->size) -
+         sizeof (CS_fs_request_search_MESSAGE)) / sizeof (GNUNET_HashCode);
+  /* an all-zero target means "no specific target peer" */
+  if (0 != memcmp (&all_zeros, &search->target, sizeof (GNUNET_PeerIdentity)))
+    target = &search->target;
+  else
+    target = NULL;
+  GNUNET_FS_QUERYMANAGER_start_query (&search->query[0],
+                                      key_total,
+                                      anonymity, block_type, sock, target);
+  return GNUNET_OK;
+}
+
+/**
+ * Test whether this peer is overloaded.
+ *
+ * A limit of zero disables the corresponding check.
+ *
+ * @return 1 if the current CPU load or network (upstream) load has
+ *         reached its configured hard limit, 0 if the load is ok
+ */
+static int
+test_load_too_high ()
+{
+  if ((hardCPULimit > 0) &&
+      (GNUNET_cpu_get_load (ectx, coreAPI->cfg) >= hardCPULimit))
+    return 1;
+  if ((hardUpLimit > 0) &&
+      (GNUNET_network_monitor_get_load (coreAPI->load_monitor,
+                                        GNUNET_ND_UPLOAD) >= hardUpLimit))
+    return 1;
+  return 0;
+}
+
+/**
+ * Handle P2P query for content.
+ *
+ * The message is a P2P_gap_query_MESSAGE (which contains the first
+ * query hash), followed by the remaining query hashes and then by the
+ * bloomfilter data, if any.
+ *
+ * @param sender peer that sent the query
+ * @param msg the query message
+ * @return GNUNET_SYSERR if the message is malformed, GNUNET_OK
+ *         otherwise (also when the query is dropped due to load)
+ */
+static int
+handle_p2p_query (const GNUNET_PeerIdentity * sender,
+                  const GNUNET_MessageHeader * msg)
+{
+  const P2P_gap_query_MESSAGE *req;
+  unsigned int query_count;
+  unsigned short size;
+  unsigned int bloomfilter_size;
+  int ttl;
+  unsigned int prio;
+  unsigned int type;
+  unsigned int netLoad;
+  enum GNUNET_FS_RoutingPolicy policy;
+  GNUNET_PeerIdentity respond_to;
+  double preference;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+#endif
+
+  if (test_load_too_high ())
+    {
+#if DEBUG_GAP
+      if (sender != NULL)
+        {
+          IF_GELOG (ectx,
+                    GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                    GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
+        }
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "Dropping query from %s, this peer is too busy.\n",
+                     sender == NULL ? "localhost" : (char *) &enc);
+#endif
+      return GNUNET_OK;
+    }
+  size = ntohs (msg->size);
+  if (size < sizeof (P2P_gap_query_MESSAGE))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* malformed query */
+    }
+  req = (const P2P_gap_query_MESSAGE *) msg;
+  query_count = ntohl (req->number_of_queries);
+  if ((query_count == 0) ||
+      (query_count > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)) ||
+      (size <
+       sizeof (P2P_gap_query_MESSAGE) + (query_count - 1) *
+       sizeof (GNUNET_HashCode)) ||
+      (0 == memcmp (&req->returnTo,
+                    coreAPI->myIdentity, sizeof (GNUNET_PeerIdentity))))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* malformed query */
+    }
+  if (stats != NULL)
+    stats->change (stat_gap_query_received, 1);
+
+  /* whatever follows the last query hash is bloomfilter data */
+  bloomfilter_size =
+    size - (sizeof (P2P_gap_query_MESSAGE) +
+            (query_count - 1) * sizeof (GNUNET_HashCode));
+  GNUNET_GE_ASSERT (NULL, bloomfilter_size < size);
+  prio = ntohl (req->priority);
+  netLoad =
+    GNUNET_network_monitor_get_load (coreAPI->load_monitor,
+                                     GNUNET_ND_UPLOAD);
+  if ((netLoad == (unsigned int) -1) || (netLoad < GAP_IDLE_LOAD_THRESHOLD))
+    {
+      prio = 0;                 /* minimum priority, no charge! */
+      policy = GNUNET_FS_RoutingPolicy_ALL;
+    }
+  else
+    {
+      prio = -identity->changeHostTrust (sender, -prio);
+      if (netLoad < GAP_IDLE_LOAD_THRESHOLD + prio)
+        policy = GNUNET_FS_RoutingPolicy_ALL;
+      else if (netLoad < 90 + 10 * prio)
+        policy =
+          GNUNET_FS_RoutingPolicy_ANSWER | GNUNET_FS_RoutingPolicy_FORWARD;
+      else if (netLoad < 100)
+        policy = GNUNET_FS_RoutingPolicy_ANSWER;
+      else
+        return GNUNET_OK;       /* drop: only when no policy applies */
+    }
+  if ((policy & GNUNET_FS_RoutingPolicy_INDIRECT) > 0)
+    {
+      respond_to = *coreAPI->myIdentity;
+    }
+  else
+    {
+      /* otherwise we preserve the original sender
+         and kill the priority (since we cannot benefit) */
+      respond_to = *sender;
+      prio = 0;
+    }
+  ttl = GNUNET_FS_HELPER_bound_ttl (ntohl (req->ttl), prio);
+  type = ntohl (req->type);
+  /* decrement ttl (always) */
+  if (ttl < 0)
+    {
+      ttl -= 2 * TTL_DECREMENT +
+        GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, TTL_DECREMENT);
+      if (ttl > 0)
+        /* integer underflow => drop (should be very rare)! */
+        return GNUNET_OK;
+    }
+  else
+    {
+      ttl -= 2 * TTL_DECREMENT +
+        GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, TTL_DECREMENT);
+    }
+
+  preference = (double) prio;
+  if (preference < QUERY_BANDWIDTH_VALUE)
+    preference = QUERY_BANDWIDTH_VALUE;
+  coreAPI->preferTrafficFrom (sender, preference);
+  /* queries[0 .. query_count-1] are the hashes; the bloomfilter data
+     starts right behind them, matching the bloomfilter_size computation
+     above */
+  GNUNET_FS_GAP_execute_query (&respond_to,
+                               prio,
+                               policy,
+                               ttl,
+                               type,
+                               query_count,
+                               &req->queries[0],
+                               ntohl (req->filter_mutator),
+                               bloomfilter_size,
+                               &req->queries[query_count]);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Use content (forward to whoever sent the query).
+ *
+ * The reply is validated, offered to the GAP routing code and to the
+ * local query manager and, if it came from another peer, possibly
+ * stored in the local datastore; the sending peer is then credited.
+ *
+ * @param sender the peer from where the content came,
+ *        NULL for the local peer
+ * @param pmsg the reply message (P2P_gap_reply_MESSAGE followed by the
+ *        block)
+ * @return GNUNET_SYSERR if the message is invalid, GNUNET_OK otherwise
+ */
+static int
+handle_p2p_content (const GNUNET_PeerIdentity * sender,
+                    const GNUNET_MessageHeader * pmsg)
+{
+  const P2P_gap_reply_MESSAGE *reply;
+  const DBlock *block;
+  GNUNET_DatastoreValue *stored;
+  GNUNET_HashCode query;
+  unsigned short total;
+  unsigned int block_size;
+  unsigned int prio;
+  unsigned long long expiration;
+  double preference;
+
+  total = ntohs (pmsg->size);
+  if (total < sizeof (P2P_gap_reply_MESSAGE))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  reply = (const P2P_gap_reply_MESSAGE *) pmsg;
+  block_size = total - sizeof (P2P_gap_reply_MESSAGE);
+  block = (const DBlock *) &reply[1];
+  if (GNUNET_OK !=
+      GNUNET_EC_file_block_check_and_get_query (block_size,
+                                                block, GNUNET_YES, &query))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  if (stats != NULL)
+    stats->change (stat_gap_content_received, 1);
+  expiration = GNUNET_ntohll (reply->expiration);
+  /* forward to other peers */
+  prio = GNUNET_FS_GAP_handle_response (sender,
+                                        &query,
+                                        expiration, block_size, block);
+  /* forward to local clients */
+  prio += GNUNET_FS_QUERYMANAGER_handle_response (sender,
+                                                  &query,
+                                                  expiration,
+                                                  block_size, block);
+  /* FIXME: offer to migration module? */
+  if (sender == NULL)
+    return GNUNET_OK;           /* if we are the sender, sender will be NULL */
+  if ((prio > 0) || (!test_load_too_high ()))
+    {
+      /* consider storing in local datastore */
+      stored = GNUNET_malloc (block_size + sizeof (GNUNET_DatastoreValue));
+      stored->size = htonl (block_size + sizeof (GNUNET_DatastoreValue));
+      stored->type = block->type;
+      stored->prio = htonl (prio);
+      stored->anonymityLevel = htonl (1);
+      stored->expirationTime =
+        GNUNET_htonll (expiration + GNUNET_get_time ());
+      memcpy (&stored[1], block, block_size);
+      datastore->putUpdate (&query, stored);
+      GNUNET_free (stored);
+    }
+  /* credit the peer that delivered the content */
+  identity->changeHostTrust (sender, prio);
+  if (stats != NULL)
+    stats->change (stat_gap_trust_awarded, prio);
+  preference = (double) prio;
+  if (preference < CONTENT_BANDWIDTH_VALUE)
+    preference = CONTENT_BANDWIDTH_VALUE;
+  coreAPI->preferTrafficFrom (sender, preference);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Initialize the FS module.  This method name must match
+ * the library name (libgnunet_XXX => initialize_XXX).
+ *
+ * Reads the LOAD/HARDCPULIMIT and LOAD/HARDUPLIMIT options, requests
+ * the "stats", "identity" and "datastore" services (only "stats" may
+ * be unavailable), initializes the FS sub-modules and registers the
+ * P2P and client message handlers.
+ *
+ * NOTE(review): the return values of the GNUNET_FS_*_init calls are
+ * not checked here.
+ *
+ * @param capi the core API; also stored in the file-level coreAPI
+ * @return GNUNET_OK on success, GNUNET_SYSERR on errors
+ */
+int
+initialize_module_fs (GNUNET_CoreAPIForPlugins * capi)
+{
+  ectx = capi->ectx;
+  coreAPI = capi;
+  /* sanity-check the sizes of the structs this module relies on */
+  GNUNET_GE_ASSERT (ectx, sizeof (CHK) == 128);
+  GNUNET_GE_ASSERT (ectx, sizeof (DBlock) == 4);
+  GNUNET_GE_ASSERT (ectx, sizeof (IBlock) == 132);
+  GNUNET_GE_ASSERT (ectx, sizeof (KBlock) == 524);
+  GNUNET_GE_ASSERT (ectx, sizeof (SBlock) == 724);
+  GNUNET_GE_ASSERT (ectx, sizeof (NBlock) == 716);
+  GNUNET_GE_ASSERT (ectx, sizeof (KNBlock) == 1244);
+  GNUNET_GE_ASSERT (ectx, sizeof (P2P_gap_reply_MESSAGE) == 68);
+  GNUNET_GE_ASSERT (ectx, sizeof (P2P_gap_query_MESSAGE) == 144);
+
+  if ((-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg,
+                                                       "LOAD",
+                                                       "HARDCPULIMIT",
+                                                       0,
+                                                       100000,  /* 1000 CPUs!? */
+                                                       0,       /* 0 == no limit */
+                                                       &hardCPULimit)) ||
+      (-1 == GNUNET_GC_get_configuration_value_number (coreAPI->cfg,
+                                                       "LOAD",
+                                                       "HARDUPLIMIT",
+                                                       0,
+                                                       999999999,
+                                                       0,       /* 0 == no limit */
+                                                       &hardUpLimit)))
+    return GNUNET_SYSERR;
+
+  /* the stats service is optional; everything below copes with NULL */
+  stats = capi->request_service ("stats");
+  if (stats != NULL)
+    {
+      stat_gap_query_received =
+        stats->create (gettext_noop ("# gap requests total received"));
+      stat_gap_content_received =
+        stats->create (gettext_noop ("# gap content total received"));
+      stat_gap_trust_awarded =
+        stats->create (gettext_noop ("# gap total trust awarded"));
+    }
+  identity = capi->request_service ("identity");
+  if (identity == NULL)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      /* only release what we actually obtained */
+      if (stats != NULL)
+        {
+          capi->release_service (stats);
+          stats = NULL;
+        }
+      return GNUNET_SYSERR;
+    }
+  datastore = capi->request_service ("datastore");
+  if (datastore == NULL)
+    {
+      capi->release_service (identity);
+      identity = NULL;
+      if (stats != NULL)
+        {
+          capi->release_service (stats);
+          stats = NULL;
+        }
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  GNUNET_FS_lock = GNUNET_mutex_create (GNUNET_YES);
+  GNUNET_FS_ANONYMITY_init (capi);
+  GNUNET_FS_PLAN_init (capi);
+  GNUNET_FS_ONDEMAND_init (capi);
+  GNUNET_FS_PT_init (ectx, stats);
+  GNUNET_FS_QUERYMANAGER_init (capi);
+  GNUNET_FS_DHT_init (capi);
+  GNUNET_FS_GAP_init (capi);
+  GNUNET_FS_MIGRATION_init (capi);
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 _("`%s' registering client handlers %d %d %d %d %d %d %d and P2P handlers %d %d\n"),
+                 "fs", GNUNET_CS_PROTO_GAP_QUERY_START,
+                 GNUNET_CS_PROTO_GAP_INSERT,
+                 GNUNET_CS_PROTO_GAP_INDEX, GNUNET_CS_PROTO_GAP_DELETE,
+                 GNUNET_CS_PROTO_GAP_UNINDEX, GNUNET_CS_PROTO_GAP_TESTINDEX,
+                 GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                 GNUNET_P2P_PROTO_GAP_QUERY,
+                 GNUNET_P2P_PROTO_GAP_RESULT);
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerHandler (GNUNET_P2P_PROTO_GAP_QUERY,
+                                           &handle_p2p_query));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerHandler (GNUNET_P2P_PROTO_GAP_RESULT,
+                                           &handle_p2p_content));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_QUERY_START,
+                                                 &handle_cs_query_start_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_INSERT,
+                                                 &handle_cs_insert_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_INDEX,
+                                                 &handle_cs_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                                                 &handle_cs_init_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_DELETE,
+                                                 &handle_cs_delete_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_UNINDEX,
+                                                 &handle_cs_unindex_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    capi->registerClientHandler (GNUNET_CS_PROTO_GAP_TESTINDEX,
+                                                 &handle_cs_test_indexed_request));
+  /* advertise the module in the ABOUT section of the configuration */
+  GNUNET_GE_ASSERT (capi->ectx,
+                    0 == GNUNET_GC_set_configuration_value_string (capi->cfg,
+                                                                   capi->ectx,
+                                                                   "ABOUT",
+                                                                   "fs",
+                                                                   gettext_noop
+                                                                   ("enables (anonymous) file-sharing")));
+  return GNUNET_OK;
+}
+
+/**
+ * Shut down the FS module: unregister every handler that
+ * initialize_module_fs registered, shut down the FS sub-modules in the
+ * reverse order of their initialization, release the services and
+ * destroy the module lock.
+ */
+void
+done_module_fs ()
+{
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "fs shutdown\n");
+
+  /* peer-to-peer handlers */
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterHandler (GNUNET_P2P_PROTO_GAP_QUERY,
+                                                &handle_p2p_query));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterHandler (GNUNET_P2P_PROTO_GAP_RESULT,
+                                                &handle_p2p_content));
+
+  /* client handlers */
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_QUERY_START,
+                                                      &handle_cs_query_start_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_INSERT,
+                                                      &handle_cs_insert_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_INDEX,
+                                                      &handle_cs_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_INIT_INDEX,
+                                                      &handle_cs_init_index_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_DELETE,
+                                                      &handle_cs_delete_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_UNINDEX,
+                                                      &handle_cs_unindex_request));
+  GNUNET_GE_ASSERT (ectx,
+                    GNUNET_SYSERR !=
+                    coreAPI->unregisterClientHandler (GNUNET_CS_PROTO_GAP_TESTINDEX,
+                                                      &handle_cs_test_indexed_request));
+
+  /* sub-modules, in reverse order of initialization */
+  GNUNET_FS_MIGRATION_done ();
+  GNUNET_FS_GAP_done ();
+  GNUNET_FS_DHT_done ();
+  GNUNET_FS_QUERYMANAGER_done ();
+  GNUNET_FS_PT_done ();
+  GNUNET_FS_ONDEMAND_done ();
+  GNUNET_FS_PLAN_done ();
+  GNUNET_FS_ANONYMITY_done ();
+
+  /* services; "stats" may never have been available */
+  if (NULL != stats)
+    {
+      coreAPI->release_service (stats);
+      stats = NULL;
+    }
+  coreAPI->release_service (datastore);
+  datastore = NULL;
+  coreAPI->release_service (identity);
+  identity = NULL;
+
+  GNUNET_mutex_destroy (GNUNET_FS_lock);
+  GNUNET_FS_lock = NULL;
+}
+
+/* end of fs.c */
Property changes on: GNUnet/src/applications/fs/gap/fs.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/fs_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dht.c (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dht.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,201 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dht.c
+ * @brief integration of file-sharing with the DHT
+ * infrastructure
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_dht_service.h"
+#include "gnunet_protocols.h"
+#include "ecrs_core.h"
+#include "fs.h"
+#include "fs_dht.h"
+#include "querymanager.h"
+
+/**
+ * Linked list containing the DHT get handles
+ * of our active requests.  One record is created per call to
+ * GNUNET_FS_DHT_execute_query and removed by purge_old_records.
+ */
+struct ActiveRequestRecords
+{
+/** Next record in the list (head is the file-level "records"). */
+ struct ActiveRequestRecords * next;
+/** Handle returned by dht->get_start; passed to dht->get_stop on purge. */
+ struct GNUNET_DHT_GetHandle * handle;
+/** Time after which the record is purged; the callback sets it to 0 to request removal. */
+ GNUNET_CronTime end_time;
+/** Block type that was requested (as passed to dht->get_start). */
+ unsigned int type;
+
+};
+/** DHT service obtained in GNUNET_FS_DHT_init; NULL if unavailable. */
+static GNUNET_DHT_ServiceAPI * dht;
+/** Core API handle, set in GNUNET_FS_DHT_init. */
+static GNUNET_CoreAPIForPlugins * coreAPI;
+/** Mutex held by GNUNET_FS_DHT_execute_query while it modifies "records". */
+static struct GNUNET_Mutex * lock;
+/** Head of the list of active DHT requests. */
+static struct ActiveRequestRecords * records;
+
+/**
+ * Cancel all requests with the DHT that
+ * are older than a certain time limit.
+ *
+ * Every record whose end_time lies strictly before the limit is
+ * unlinked from "records", stopped via dht->get_stop and freed.
+ * GNUNET_FS_DHT_execute_query calls this with "lock" held;
+ * GNUNET_FS_DHT_done calls it without taking the lock.
+ *
+ * @param limit records with end_time < limit are removed
+ */
+static void
+purge_old_records (GNUNET_CronTime limit)
+{
+  struct ActiveRequestRecords **link;
+  struct ActiveRequestRecords *cur;
+
+  /* "link" always points at the pointer that references "cur" */
+  link = &records;
+  while (NULL != (cur = *link))
+    {
+      if (cur->end_time >= limit)
+        {
+          /* still fresh, keep it and move on */
+          link = &cur->next;
+          continue;
+        }
+      *link = cur->next;
+      dht->get_stop (cur->handle);
+      GNUNET_free (cur);
+    }
+}
+
+
+/**
+ * We got a result from the DHT.  Check that it is valid
+ * and pass to our clients.
+ *
+ * A result is accepted only if the block passes
+ * GNUNET_EC_file_block_check_and_get_query and the query derived from
+ * it equals the key.  Invalid results are reported with
+ * GNUNET_GE_BREAK_OP and skipped.
+ *
+ * NOTE(review): "size" is taken straight from value->size and used as
+ * the length of the payload starting at &value[1]; confirm whether
+ * value->size also counts the GNUNET_DataContainer header.
+ *
+ * @param key the current key
+ * @param value the current value
+ * @param cls argument passed for context (closure); the
+ *        struct ActiveRequestRecords of the request
+ * @return GNUNET_OK to continue with iteration, GNUNET_SYSERR to abort
+ */
+static int
+response_callback (const GNUNET_HashCode * key,
+                   const GNUNET_DataContainer * value,
+                   void *cls)
+{
+  struct ActiveRequestRecords *request = cls;
+  const DBlock *block;
+  unsigned int payload_size;
+  GNUNET_HashCode derived;
+
+  payload_size = ntohl (value->size);
+  if (payload_size < 4)
+    {
+      GNUNET_GE_BREAK_OP (NULL, 0);
+      return GNUNET_OK;
+    }
+  block = (const DBlock *) &value[1];
+  if (GNUNET_SYSERR ==
+      GNUNET_EC_file_block_check_and_get_query (payload_size,
+                                                block,
+                                                GNUNET_YES,
+                                                &derived))
+    {
+      /* malformed block */
+      GNUNET_GE_BREAK_OP (NULL, 0);
+      return GNUNET_OK;
+    }
+  if (0 != memcmp (key, &derived, sizeof (GNUNET_HashCode)))
+    {
+      /* block does not answer the key it was returned for */
+      GNUNET_GE_BREAK_OP (NULL, 0);
+      return GNUNET_OK;
+    }
+  GNUNET_FS_QUERYMANAGER_handle_response (NULL,
+                                          &derived,
+                                          0,
+                                          payload_size,
+                                          block);
+  if (request->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_OK;
+  /* for DATA blocks one result is all we want */
+  request->end_time = 0;        /* delete ASAP */
+  return GNUNET_SYSERR;         /* no more! */
+}
+
+/**
+ * Execute a query via the DHT.  Starts a DHT get operation for the
+ * given query and remembers its handle so that it can be cancelled
+ * once MAX_DHT_DELAY has passed.  Does nothing if the DHT service
+ * is not available.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DHT_execute_query (unsigned int type,
+                             const GNUNET_HashCode * query)
+{
+  struct ActiveRequestRecords *record;
+  GNUNET_CronTime now;
+
+  if (dht == NULL)
+    return;
+  now = GNUNET_get_time ();
+  record = GNUNET_malloc (sizeof (struct ActiveRequestRecords));
+  /* Fully initialize the record BEFORE handing it to the DHT:
+     response_callback reads record->type and may run as soon as
+     get_start has been called. */
+  record->next = NULL;
+  record->type = type;
+  record->end_time = now + MAX_DHT_DELAY;
+  record->handle = dht->get_start (type,
+                                   query,
+                                   &response_callback,
+                                   record);
+  GNUNET_mutex_lock (lock);
+  record->next = records;
+  records = record;
+  /* opportunistically drop requests that have timed out */
+  purge_old_records (now);
+  GNUNET_mutex_unlock (lock);
+}
+
+
+/**
+ * Set up the DHT integration: remember the core API, create the
+ * mutex protecting the list of active requests and request the
+ * "dht" service (which may be unavailable, leaving "dht" NULL).
+ *
+ * @param capi the core API
+ * @return 0 (always)
+ */
+int
+GNUNET_FS_DHT_init (GNUNET_CoreAPIForPlugins * capi)
+{
+  coreAPI = capi;
+  lock = GNUNET_mutex_create (GNUNET_YES);
+  dht = coreAPI->request_service ("dht");
+  return 0;
+}
+
+/**
+ * Shut down the DHT integration: cancel all outstanding DHT
+ * requests, release the "dht" service (if we had it) and destroy
+ * the mutex.
+ *
+ * @return 0 (always)
+ */
+int
+GNUNET_FS_DHT_done (void)
+{
+  /* -1 converts to the largest GNUNET_CronTime: purge everything */
+  purge_old_records (-1);
+  if (dht != NULL)
+    {
+      coreAPI->release_service (dht);
+      /* GNUNET_FS_DHT_execute_query tests "dht" against NULL, so do
+         not leave a dangling pointer behind */
+      dht = NULL;
+    }
+  coreAPI = NULL;
+  GNUNET_mutex_destroy (lock);
+  lock = NULL;
+  return 0;
+}
+
Property changes on: GNUnet/src/applications/fs/gap/fs_dht.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/fs_dht.h
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dht.h (rev 0)
+++ GNUnet/src/applications/fs/gap/fs_dht.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,50 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/fs_dht.h
+ * @brief integration of file-sharing with the DHT
+ * infrastructure
+ * @author Christian Grothoff
+ */
+#ifndef FS_DHT_H
+#define FS_DHT_H
+
+#include "gnunet_util.h"
+/**
+ * Initialize the DHT integration of file-sharing.
+ *
+ * @param capi the core API
+ * @return 0 (the implementation in fs_dht.c always returns 0)
+ */
+int
+GNUNET_FS_DHT_init(GNUNET_CoreAPIForPlugins * capi);
+/**
+ * Shut down the DHT integration of file-sharing.
+ *
+ * @return 0 (the implementation in fs_dht.c always returns 0)
+ */
+int
+GNUNET_FS_DHT_done(void);
+
+/**
+ * Execute a query using the DHT.  Starts a DHT lookup for
+ * the query; results are handed to the query manager.
+ * Does nothing if the DHT service is not available.
+ *
+ * @param type type of content requested
+ * @param query hash code of the query
+ */
+void
+GNUNET_FS_DHT_execute_query(unsigned int type,
+ const GNUNET_HashCode * query);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/fs_dht.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/gap.c
===================================================================
--- GNUnet/src/applications/fs/gap/gap.c (rev 0)
+++ GNUnet/src/applications/fs/gap/gap.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,391 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/gap.c
+ * @brief protocol that performs anonymous routing
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_protocols.h"
+#include "gnunet_datastore_service.h"
+#include "gap.h"
+#include "fs.h"
+#include "ondemand.h"
+#include "plan.h"
+#include "pid_table.h"
+
+/**
+ * The GAP routing table.  Array of "table_size" slots, each the
+ * head of a linked list of requests (indexed by get_table_index).
+ */
+static struct RequestList ** table;
+/** Core API handle, set in GNUNET_FS_GAP_init. */
+static GNUNET_CoreAPIForPlugins * coreAPI;
+/** Datastore service, requested in GNUNET_FS_GAP_init. */
+static GNUNET_Datastore_ServiceAPI * datastore;
+/** Cron manager used to inject locally found replies with a delay. */
+static struct GNUNET_CronManager * cron;
+
+/**
+ * Size of the routing table.
+ */
+static unsigned int table_size;
+
+/**
+ * Constant but peer-dependent value that randomizes the construction
+ * of the indices into the routing table. See
+ * get_table_index.
+ */
+static unsigned int random_qsel;
+
+
+/**
+ * Map a query hash to a slot of the routing table.  The first two
+ * 32-bit words of the hash are combined with the peer-specific
+ * random_qsel and reduced modulo the table size.
+ *
+ * @param key the (primary) query
+ * @return index into "table", always smaller than table_size
+ */
+static unsigned int
+get_table_index (const GNUNET_HashCode * key)
+{
+  const unsigned int *words = (const unsigned int *) key;
+  unsigned int slot;
+
+  /* note: the division binds tighter than the XOR */
+  slot = (words[0] ^ (words[1] / (1 + random_qsel))) % table_size;
+  GNUNET_GE_ASSERT (coreAPI->ectx, slot < table_size);
+  return slot;
+}
+
+/**
+ * Cron-job to inject (artificially) delayed messages.
+ * Takes ownership of the message: it is freed after injection.
+ *
+ * @param cls the message to inject (a GNUNET_MessageHeader
+ *        followed by its payload), allocated by the scheduler
+ */
+static void
+send_delayed (void *cls)
+{
+  GNUNET_MessageHeader *msg = cls;
+
+  /* the header size is a 16-bit field written with htons,
+     so it must be decoded with ntohs (not ntohl) */
+  coreAPI->p2p_inject_message (NULL,
+                               (const char *) msg,
+                               ntohs (msg->size),
+                               GNUNET_YES,
+                               NULL);
+  GNUNET_free (msg);
+}
+
+/**
+ * An iterator over a set of Datastore items.  This
+ * function is called whenever GAP is processing a
+ * request.  It
+ * 1) tries on-demand encoding for ONDEMAND entries (and if
+ *    that fails, asks for the entry to be discarded)
+ * 2) skips replies that the request's bloomfilter marks
+ *    as already seen
+ * 3) assembles a response and injects it via
+ *    loopback WITH a (random) delay
+ *
+ * The load check mentioned in earlier versions of this comment
+ * ("abort if the load is getting too high") is not implemented.
+ *
+ * @param key the key of the item
+ * @param value called with the next item
+ * @param closure user-defined extra argument; the
+ *        struct RequestList of the request being processed
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available (unused)
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
+ *         GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+datastore_value_processor (const GNUNET_HashCode * key,
+                           const GNUNET_DatastoreValue * value,
+                           void *closure,
+                           unsigned long long uid)
+{
+  struct RequestList *req = closure;
+  P2P_gap_reply_MESSAGE *msg;
+  GNUNET_DatastoreValue *enc;
+  unsigned int size;
+  unsigned long long et;
+  GNUNET_CronTime now;
+  int ret;
+  GNUNET_HashCode hc;
+  GNUNET_HashCode mhc;
+
+  enc = NULL;
+  if (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+          GNUNET_FS_ONDEMAND_get_indexed_content (value, key, &enc))
+        return GNUNET_NO;
+      /* from here on work with the encoded block; "enc" must be freed */
+      value = enc;
+    }
+  if (req->bloomfilter != NULL)
+    {
+      GNUNET_hash (&value[1],
+                   ntohl (value->size) - sizeof (GNUNET_DatastoreValue),
+                   &hc);
+      GNUNET_FS_HELPER_mingle_hash (&hc, req->bloomfilter_mutator, &mhc);
+      if (GNUNET_YES == GNUNET_bloomfilter_test (req->bloomfilter, &mhc))
+        {
+          /* not useful; do not leak the on-demand encoded copy */
+          GNUNET_free_non_null (enc);
+          return GNUNET_OK;
+        }
+    }
+  /* convert the absolute expiration time into a relative one;
+     content that has already expired gets a remaining lifetime of 0 */
+  et = GNUNET_ntohll (value->expirationTime);
+  now = GNUNET_get_time ();
+  if (et > now)
+    et -= now;
+  else
+    et = 0;
+  et %= MAX_MIGRATION_EXP;
+  size = sizeof (P2P_gap_reply_MESSAGE) + ntohl (value->size) -
+    sizeof (GNUNET_DatastoreValue);
+  msg = GNUNET_malloc (size);
+  msg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
+  msg->header.size = htons (size);
+  msg->reserved = htonl (0);
+  /* the receiver decodes this field with GNUNET_ntohll */
+  msg->expiration = GNUNET_htonll (et);
+  memcpy (&msg[1], &value[1], size - sizeof (P2P_gap_reply_MESSAGE));
+  /* send_delayed takes ownership of msg and frees it after
+     injecting it, so it must NOT be freed here */
+  GNUNET_cron_add_job (cron,
+                       send_delayed,
+                       GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+                                          TTL_DECREMENT),
+                       0,
+                       msg);
+  /* DATA blocks are unique: one reply is enough, stop iterating */
+  ret = (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_DATA)
+    ? GNUNET_SYSERR : GNUNET_OK;
+  GNUNET_free_non_null (enc);
+  return ret;
+}
+
+/**
+ * Execute a GAP query.  Determines where to forward
+ * the query and when (and captures state for the response).
+ * Also check the local datastore.
+ *
+ * Each slot of the routing table holds at most two requests.  If the
+ * slot is full and the new request would expire before both existing
+ * entries, the request is dropped; otherwise the entry that expires
+ * first is replaced.
+ *
+ * NOTE(review): the "table full" test compares against now + ttl
+ * while the stored expiration is now + ttl * GNUNET_CRON_SECONDS;
+ * confirm the unit of ttl.
+ * NOTE(review): the PID reference held in "peer" is released here
+ * only on the early return; confirm whether GNUNET_FS_PLAN_request
+ * takes over that reference on the normal path.
+ *
+ * @param respond_to where to send replies
+ * @param priority how important is the request for us?
+ * @param policy routing policy (currently not evaluated here)
+ * @param ttl how long should the query live?
+ * @param type type of content requested
+ * @param query_count how many queries are in the queries array?
+ * @param queries hash codes of the query
+ * @param filter_mutator how to map replies to the bloom filter
+ * @param filter_size size of the bloom filter
+ * @param bloomfilter_data the bloom filter bits
+ */
+void
+GNUNET_FS_GAP_execute_query (const GNUNET_PeerIdentity * respond_to,
+                             unsigned int priority,
+                             enum GNUNET_FS_RoutingPolicy policy,
+                             int ttl,
+                             unsigned int type,
+                             unsigned int query_count,
+                             const GNUNET_HashCode * queries,
+                             int filter_mutator,
+                             unsigned int filter_size,
+                             const void *bloomfilter_data)
+{
+  struct RequestList *rl;
+  PID_INDEX peer;
+  unsigned int index;
+  GNUNET_CronTime now;
+
+  GNUNET_GE_ASSERT (NULL, query_count > 0);
+  peer = GNUNET_FS_PT_intern (respond_to);
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  index = get_table_index (&queries[0]);
+  now = GNUNET_get_time ();
+
+  /* check if table is full (and/or delete old entries!) */
+  if ((table[index] != NULL) && (table[index]->next != NULL))
+    {
+      /* limit to at most two entries per slot in table */
+      if ((now + ttl < table[index]->expiration) &&
+          (now + ttl < table[index]->next->expiration))
+        {
+          /* do not process; give back the reference taken above */
+          GNUNET_mutex_unlock (GNUNET_FS_lock);
+          GNUNET_FS_PT_change_rc (peer, -1);
+          return;
+        }
+      /* evict whichever of the two entries expires first */
+      if (table[index]->expiration > table[index]->next->expiration)
+        {
+          GNUNET_FS_SHARED_free_request_list (table[index]->next);
+          table[index]->next = NULL;
+        }
+      else
+        {
+          rl = table[index];
+          table[index] = rl->next;
+          GNUNET_FS_SHARED_free_request_list (rl);
+        }
+    }
+
+  /* create new table entry */
+  rl = GNUNET_malloc (sizeof (struct RequestList) +
+                      (query_count - 1) * sizeof (GNUNET_HashCode));
+  memset (rl, 0, sizeof (struct RequestList));
+  memcpy (&rl->queries[0], queries, query_count * sizeof (GNUNET_HashCode));
+  rl->key_count = query_count;
+  if (filter_size > 0)
+    {
+      rl->bloomfilter_size = filter_size;
+      rl->bloomfilter_mutator = filter_mutator;
+      rl->bloomfilter = GNUNET_bloomfilter_init (coreAPI->ectx,
+                                                 bloomfilter_data,
+                                                 filter_size,
+                                                 GAP_BLOOMFILTER_K);
+    }
+  rl->anonymityLevel = 1;
+  rl->type = type;
+  rl->value = priority;
+  rl->expiration = GNUNET_get_time () + ttl * GNUNET_CRON_SECONDS;
+  rl->next = table[index];
+  /* the table entry holds its own reference to the requester */
+  rl->response_target = GNUNET_FS_PT_intern (respond_to);
+  table[index] = rl;
+
+  /* check local data store */
+  datastore->get (&queries[0], type, datastore_value_processor, rl);
+  /* if not found or not unique, forward */
+  GNUNET_FS_PLAN_request (NULL, peer, rl);
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * Every request in the routing-table slot of the primary query for
+ * which this is a valid new response gets the content forwarded to
+ * the peer recorded as its response target.
+ *
+ * @param sender who sent the response (good to know
+ *        for future routing decisions)
+ * @param primary_query hash code used for lookup
+ *        (note that namespace membership may
+ *        require additional verification that has
+ *        not yet been performed; checking the
+ *        signature has already been done)
+ * @param expiration relative time until the content
+ *        will expire
+ * @param size size of the data
+ * @param data the data itself
+ * @return how much was this content worth to us?
+ *         (sum of the values of all requests it answered)
+ */
+unsigned int
+GNUNET_FS_GAP_handle_response (const GNUNET_PeerIdentity * sender,
+                               const GNUNET_HashCode * primary_query,
+                               GNUNET_CronTime expiration,
+                               unsigned int size,
+                               const DBlock * data)
+{
+  GNUNET_HashCode hc;
+  GNUNET_PeerIdentity target;
+  struct RequestList *rl;
+  unsigned int value;
+  P2P_gap_reply_MESSAGE *msg;
+  PID_INDEX rid;
+
+  rid = GNUNET_FS_PT_intern (sender);
+  value = 0;
+  GNUNET_mutex_lock (GNUNET_FS_lock);
+  rl = table[get_table_index (primary_query)];
+  while (rl != NULL)
+    {
+      if (GNUNET_OK == GNUNET_FS_SHARED_test_valid_new_response (rl,
+                                                                 primary_query,
+                                                                 size,
+                                                                 data,
+                                                                 &hc))
+        {
+          GNUNET_GE_ASSERT (NULL, rl->response_target != 0);
+          GNUNET_FS_PT_resolve (rl->response_target, &target);
+          /* queue response; this goes to another peer, so it must
+             carry the P2P message type that handle_p2p_content is
+             registered for (not the client-server one) */
+          msg = GNUNET_malloc (sizeof (P2P_gap_reply_MESSAGE) + size);
+          msg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
+          msg->header.size = htons (sizeof (P2P_gap_reply_MESSAGE) + size);
+          msg->reserved = 0;
+          msg->expiration = GNUNET_htonll (expiration);
+          memcpy (&msg[1], data, size);
+          coreAPI->unicast (&target,
+                            &msg->header,
+                            BASE_REPLY_PRIORITY * (1 + rl->value),
+                            MAX_GAP_DELAY);
+          GNUNET_free (msg);
+          if ((rl->type != GNUNET_ECRS_BLOCKTYPE_DATA) &&
+              (rl->bloomfilter != NULL))
+            GNUNET_FS_SHARED_mark_response_seen (rl, &hc);
+          GNUNET_FS_PLAN_success (rid, NULL, rl->response_target, rl);
+          /* the request's value is paid out only once */
+          value += rl->value;
+          rl->value = 0;
+        }
+      rl = rl->next;
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+  /* drop the reference taken by GNUNET_FS_PT_intern above */
+  GNUNET_FS_PT_change_rc (rid, -1);
+  return value;
+}
+
+/**
+ * Set up the GAP module: request the datastore service, pick the
+ * random routing-index selector, read GAP/TABLESIZE from the
+ * configuration, allocate the (zeroed) routing table and start the
+ * cron manager used for delayed replies.
+ *
+ * NOTE(review): on the error return the datastore service requested
+ * above is not released and the result of request_service is not
+ * checked.
+ *
+ * @param capi the core API
+ * @return GNUNET_SYSERR if the configuration value cannot be read,
+ *         0 otherwise
+ */
+int
+GNUNET_FS_GAP_init (GNUNET_CoreAPIForPlugins * capi)
+{
+  unsigned long long configured_size;
+  size_t table_bytes;
+
+  coreAPI = capi;
+  datastore = capi->request_service ("datastore");
+  random_qsel = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 0xFFFF);
+  if (-1 ==
+      GNUNET_GC_get_configuration_value_number (coreAPI->cfg,
+                                                "GAP",
+                                                "TABLESIZE",
+                                                MIN_INDIRECTION_TABLE_SIZE,
+                                                GNUNET_MAX_GNUNET_malloc_CHECKED
+                                                / sizeof (struct RequestList *),
+                                                MIN_INDIRECTION_TABLE_SIZE,
+                                                &configured_size))
+    return GNUNET_SYSERR;
+  table_size = configured_size;
+  table_bytes = sizeof (struct RequestList *) * table_size;
+  table = GNUNET_malloc (table_bytes);
+  memset (table, 0, table_bytes);
+  cron = GNUNET_cron_create (coreAPI->ectx);
+  GNUNET_cron_start (cron);
+  return 0;
+}
+
+/**
+ * Shut down the GAP module: free all requests still in the routing
+ * table and the table itself, release the datastore service and
+ * stop and destroy the cron manager.
+ *
+ * @return 0 (always)
+ */
+int
+GNUNET_FS_GAP_done (void)
+{
+  unsigned int i;
+  struct RequestList *rl;
+
+  for (i = 0; i < table_size; i++)
+    {
+      while (NULL != (rl = table[i]))
+        {
+          table[i] = rl->next;
+          GNUNET_FS_SHARED_free_request_list (rl);
+        }
+    }
+  GNUNET_free (table);
+  /* do not leave dangling pointers behind (as done for the
+     service handles) */
+  table = NULL;
+  table_size = 0;
+  coreAPI->release_service (datastore);
+  datastore = NULL;
+  GNUNET_cron_stop (cron);
+  GNUNET_cron_destroy (cron);
+  cron = NULL;
+  return 0;
+}
+
+/* end of gap.c */
Property changes on: GNUnet/src/applications/fs/gap/gap.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/gap.h
===================================================================
--- GNUnet/src/applications/fs/gap/gap.h (rev 0)
+++ GNUnet/src/applications/fs/gap/gap.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,94 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/gap.h
+ * @brief protocol that performs anonymous routing
+ * @author Christian Grothoff
+ */
+#ifndef GAP_H
+#define GAP_H
+
+#include "gnunet_util.h"
+#include "ecrs_core.h"
+/**
+ * Routing policy for a query.  The values are distinct bits
+ * (1, 2, 4) and ALL (7) is their bitwise OR, so policies can be
+ * combined.  NOTE(review): presumably ANSWER = answer from local
+ * data, FORWARD = forward with the original sender as reply target,
+ * INDIRECT = forward with ourselves as reply target -- confirm
+ * against the callers.
+ */
+enum GNUNET_FS_RoutingPolicy {
+ GNUNET_FS_RoutingPolicy_ANSWER = 1,
+ GNUNET_FS_RoutingPolicy_FORWARD = 2,
+ GNUNET_FS_RoutingPolicy_INDIRECT = 4,
+ GNUNET_FS_RoutingPolicy_ALL = 7
+};
+/**
+ * Initialize the GAP module.
+ *
+ * @param capi the core API
+ * @return GNUNET_SYSERR if the GAP/TABLESIZE option cannot be
+ *         read, 0 otherwise
+ */
+int GNUNET_FS_GAP_init(GNUNET_CoreAPIForPlugins * capi);
+/**
+ * Shut down the GAP module.
+ *
+ * @return 0 (the implementation in gap.c always returns 0)
+ */
+int GNUNET_FS_GAP_done(void);
+
+/**
+ * Execute a GAP query. Determines where to forward
+ * the query and when (and captures state for the response).
+ * Also check the local datastore.
+ *
+ * @param respond_to where to send replies
+ * @param priority how important is the request for us?
+ * @param policy routing policy to apply to the query
+ * @param ttl how long should the query live?
+ * @param type type of content requested
+ * @param query_count how many queries are in the queries array?
+ * @param queries hash codes of the query
+ * @param filter_mutator how to map replies to the bloom filter
+ * @param filter_size size of the bloom filter
+ * @param bloomfilter_data the bloom filter bits
+ */
+void
+GNUNET_FS_GAP_execute_query(const GNUNET_PeerIdentity * respond_to,
+ unsigned int priority,
+ enum GNUNET_FS_RoutingPolicy policy,
+ int ttl,
+ unsigned int type,
+ unsigned int query_count,
+ const GNUNET_HashCode * queries,
+ int filter_mutator,
+ unsigned int filter_size,
+ const void * bloomfilter_data);
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * @param sender who sent the response (good to know
+ * for future routing decisions)
+ * @param primary_query hash code used for lookup
+ * (note that namespace membership may
+ * require additional verification that has
+ * not yet been performed; checking the
+ * signature has already been done)
+ * @param expiration relative time until the content
+ * will expire
+ * @param size size of the data
+ * @param data the data itself
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_FS_GAP_handle_response(const GNUNET_PeerIdentity * sender,
+ const GNUNET_HashCode * primary_query,
+ GNUNET_CronTime expiration,
+ unsigned int size,
+ const DBlock * data);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/gap.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/gap_old.c
===================================================================
--- GNUnet/src/applications/fs/gap/gap_old.c (rev 0)
+++ GNUnet/src/applications/fs/gap/gap_old.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,2688 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007 Christian Grothoff (and other
contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file gap/gap.c
+ * @brief protocol that performs anonymous routing
+ * @author Christian Grothoff
+ *
+ * The code roughly falls into two main functionality groups:
+ *
+ * - keeping track of queries that have been routed,
+ * sending back replies along the path, deciding
+ * which old queries to drop from the routing table
+ * - deciding when to forward which query to which
+ * set of peers; this includes tracking from where
+ * we receive responses to make an educated guess
+ * (also called 'hot path' routing).
+ *
+ */
+
+#include "gap.h"
+
+#include "gnunet_util.h"
+#include "gnunet_core.h"
+#include "gnunet_protocols.h"
+#include "gnunet_gap_service.h"
+#include "gnunet_identity_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_traffic_service.h"
+#include "gnunet_topology_service.h"
+#include "pid_table.h"
+
+#define DEBUG_GAP GNUNET_NO
+
+#define EXTRA_CHECKS ALLOW_EXTRA_CHECKS
+
+
+/* ***************** policy constants **************** */
+
+/**
+ * Until which load do we consider the peer idle and do not
+ * charge at all? (should be larger than GNUNET_IDLE_LOAD_THRESHOLD used
+ * by the rest of the code)!
+ */
+#define GAP_IDLE_LOAD_THRESHOLD ((100 + GNUNET_IDLE_LOAD_THRESHOLD) / 2)
+
+/**
+ * For how many different hosts can we have a query pending (at most).
+ * If this threshold is crossed, the hosts waiting list is reset.
+ */
+#define MAX_HOSTS_WAITING 16
+
+/**
+ * How many seen values do we keep at most for any given query before
+ * we kill it (or at least start to do a probabilistic drop).
+ */
+#define MAX_SEEN_VALUES 32
+
+/**
+ * By which amount do we decrement the TTL for simple forwarding /
+ * indirection of the query; in milli-seconds. Set somewhat in
+ * accordance to your network latency (above the time it'll take you
+ * to send a packet and get a reply).
+ *
+ * The expansion is parenthesized so that the macro acts as a single
+ * value when used inside a larger expression (for example as the
+ * right-hand operand of a division or modulo).
+ */
+#define TTL_DECREMENT (5 * GNUNET_CRON_SECONDS)
+
+/**
+ * Default size of the bitmap that we use for marking to which
+ * peers a query has already been sent to. 16 byte = 128 bits
+ */
+#define BITMAP_SIZE 16
+
+/**
+ * Of how many outbound queries do we simultaneously keep track?
+ */
+#define QUERY_RECORD_COUNT 512
+
+/**
+ * How much is a query worth 'in general' (even
+ * if there is no trust relationship between
+ * the peers!). Multiplied by the number of queries
+ * in the request. 20 is for '20 bytes / GNUNET_hash',
+ * so this is kind of the base unit.
+ */
+#define BASE_QUERY_PRIORITY 20
+
+/**
+ * minimum indirection table size, defaults to 8192 entries, reduce if
+ * you have very little memory, enlarge if you start to overflow often
+ * and have memory available.<p>
+ *
+ * If the average query lives for say 1 minute (10 hops), and you have
+ * a 56k connection (= 420 kb/minute, or approximately 8000
+ * queries/minute) the maximum reasonable routing table size would
+ * thus be 8192 entries. Every entry takes about 68 bytes.<p>
+ *
+ * The larger the value is that you pick here, the greater your
+ * anonymity can become. It also can improve your download speed.<p>
+ *
+ * Memory consumption:
+ * <ul>
+ * <li>8192 => 560k indirection table => approx. 6 MB gnunetd</li>
+ * <li>65536 => 4456k indirection table => approx. 10 MB gnunetd</li>
+ * </ul>
+ * <p>
+ * THE VALUE YOU PICK MUST BE A POWER OF 2, for example:
+ * 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536
+ */
+#define MIN_INDIRECTION_TABLE_SIZE 1024
+/* #define MIN_INDIRECTION_TABLE_SIZE 4 */
+
+/**
+ * Under certain circumstances, two peers can interlock in their
+ * routing such that both have a slot that is blocked exactly until
+ * the other peer will make that slot available. This is the
+ * probability that one will give in. And yes, it's a hack. It
+ * may not be needed anymore once we add collision-resistance to
+ * the routing GNUNET_hash table.
+ */
+#define TIE_BREAKER_CHANCE 4
+
+/**
+ * For how many _local_ requests do we track the current, non-zero
+ * request priorities for rewarding peers that send replies? If this
+ * number is too low, we will 'forget' to reward peers for good
+ * replies (and our routing will degrade). If it is too high, we'll
+ * scan through a large array for each content message and waste
+ * memory.<p>
+ *
+ * A good value reflects the number of concurrent, local queries that
+ * we expect to see.
+ */
+#define MAX_REWARD_TRACKS 128
+
+/**
+ * ITE modes for addToSlot.
+ */
+#define ITE_REPLACE 0
+#define ITE_GNUNET_array_grow 1
+
+
+/**
+ * In this struct, we store information about a
+ * query that is being sent from the local node to
+ * optimize the sending strategy. The records live in the
+ * fixed-size 'queries' array (QUERY_RECORD_COUNT slots).
+ */
+typedef struct
+{
+
+ /**
+ * How often did we send this query so far? Incremented by
+ * fillInQuery each time the message is copied into an
+ * outgoing buffer.
+ */
+ unsigned int sendCount;
+
+ /**
+ * How many nodes were connected when we initiated sending this
+ * query?
+ */
+ unsigned int activeConnections;
+
+ /**
+ * What is the total distance of the query to the connected nodes?
+ */
+ unsigned long long totalDistance;
+
+ /**
+ * The message that we are sending (heap-allocated copy made by
+ * forwardQuery; NULL for a slot that was never used).
+ */
+ P2P_gap_query_MESSAGE *msg;
+
+ /**
+ * How important would it be to send the message to all peers in
+ * this bucket? Indexed by the bitmap slot (see getIndex); only
+ * allocated while forwardQuery performs its peer selection.
+ */
+ int *rankings;
+
+ /**
+ * When do we stop forwarding (!) this query? Set to 0 by
+ * dequeueQuery to expire the record immediately.
+ */
+ GNUNET_CronTime expires;
+
+ /**
+ * To which peer will we never send this message?
+ */
+ PID_INDEX noTarget;
+
+ /**
+ * Bit-map marking the hostIndices
+ * (GNUNET_CORE_connection_compute_index_of_peer) of nodes that have
+ * received this query already. Note that the bit-map has a maximum
+ * size, if the index is out-of-bounds, it is hashed into the
+ * smaller size of the bitmap. There may thus be nodes with
+ * identical indices, in that case, only one of the nodes will
+ * receive the query.
+ */
+ unsigned char bitmap[BITMAP_SIZE];
+
+ /**
+ * To how many peers has / will this query be transmitted?
+ */
+ unsigned int transmissionCount;
+
+} QueryRecord;
+
+/**
+ * Indirection table entry. Lists what we're looking for,
+ * where to forward it, and how long to keep looking for it.
+ * Keep this struct as small as possible -- an array of these
+ * takes 80% of GNUnet's memory (for 65536 routing table entries,
+ * the array itself uses about 8 MB of memory; the contents
+ * that the entries point to can easily use another 8 MB at this
+ * point [see Mantis #1058])
+ */
+typedef struct
+{
+ /**
+ * What are we waiting for?
+ */
+ GNUNET_HashCode primaryKey;
+
+ /**
+ * For what type of reply are we waiting?
+ */
+ unsigned int type;
+
+ /**
+ * How much is this query worth to us, that is, how much would
+ * this node be willing to "pay" for an answer that matches the
+ * GNUNET_hash stored in this ITE? (This is NOT the inbound priority,
+ * it is the trust-adjusted inbound priority.)
+ */
+ unsigned int priority;
+
+ /**
+ * When can we forget about this entry? (Absolute time; addToSlot
+ * stores "now + relative ttl" here.)
+ */
+ GNUNET_CronTime ttl;
+
+ /**
+ * How many replies have we already seen (current length of the
+ * 'seen' array)?
+ */
+ unsigned int seenIndex;
+
+ /**
+ * GNUNET_YES/GNUNET_NO, only valid if seenIndex == 1
+ */
+ int seenReplyWasUnique;
+
+ /**
+ * Hashcodes of the encrypted (!) replies that we have forwarded so far
+ */
+ GNUNET_HashCode *seen;
+
+ /**
+ * Who are these hosts? (Interned peer ids; the references are
+ * released again by resetDestinations.)
+ */
+ PID_INDEX *destination;
+
+ /**
+ * How many hosts are waiting for an answer to this query (length of
+ * destination array)
+ */
+ unsigned int hostsWaiting;
+
+ /**
+ * Do we currently have a response in the delay loop (delays are
+ * introduced to make traffic analysis harder and thus enable
+ * anonymity)? This marker is set to avoid looking up content again
+ * before the first content exits the delay loop. Since this *not*
+ * looking up content again is not externally visible, it is ok to
+ * do this optimization to reduce disk accesses (see Mantis bug
+ * #407).
+ */
+ int successful_local_lookup_in_delay_loop;
+
+} IndirectionTableEntry;
+
+
+/**
+ * @brief structure to keep track of which peers send responses
+ * to queries from a certain peer at the moment
+ * Linked list of peer ids with number of replies received.
+ */
+typedef struct RL_
+{
+ /* next element of the singly linked list, NULL at the end */
+ struct RL_ *next;
+ /* interned id of the peer that sent the replies */
+ PID_INDEX responder;
+ /* number of replies counted for this peer; halved by ageRTD */
+ unsigned int responseCount;
+} ResponseList;
+
+/**
+ * Structure for tracking from which peer we got valuable replies for
+ * which clients / other peers.
+ */
+typedef struct RTD_
+{
+
+ /**
+ * This is a linked list.
+ */
+ struct RTD_ *next;
+
+ /**
+ * For which client does this entry track replies?
+ */
+ PID_INDEX queryOrigin;
+
+ /**
+ * Linked list of peers that responded, with
+ * number of responses.
+ */
+ ResponseList *responseList;
+
+ /**
+ * Time at which we received the last reply
+ * for this client. Used to discard old entries
+ * eventually (ageRTD drops everything older than
+ * 600 time units as returned by GNUNET_get_time_int32).
+ */
+ GNUNET_Int32Time lastReplyReceived;
+} ReplyTrackData;
+
+/**
+ * Tracking of just reward data (how much trust a peer
+ * can gain for a particular reply).
+ */
+typedef struct
+{
+ /* query for which the reward is offered */
+ GNUNET_HashCode query;
+ /* pending reward; reset to 0 once claimed by claimReward */
+ unsigned int prio;
+} RewardEntry;
+
+
+/* ********************** GLOBALS ******************** */
+
+/**
+ * Avoiding concurrent lookups for the same ITE: lock to grant
+ * access to peers to perform a lookup that matches this ITE entry.
+ */
+static struct GNUNET_Mutex *lookup_exclusion;
+
+/**
+ * GNUnet core.
+ */
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * Identity service.
+ */
+static GNUNET_Identity_ServiceAPI *identity;
+
+/**
+ * Statistics service.
+ */
+static GNUNET_Stats_ServiceAPI *stats;
+
+static int stat_routing_collisions;
+
+static int stat_routing_direct_drops;
+
+static int stat_routing_successes;
+
+static int stat_routing_request_repeat;
+
+static int stat_routing_request_duplicates;
+
+static int stat_routing_external_ttl;
+
+static int stat_routing_internal_ttl;
+
+static int stat_routing_outbound_ttl;
+
+static int stat_routing_totals;
+
+static int stat_routing_slots_used;
+
+static int stat_routing_forwards;
+
+static int stat_routing_reply_drops;
+
+static int stat_routing_reply_dups;
+
+static int stat_routing_no_route_policy;
+
+static int stat_routing_no_answer_policy;
+
+static int stat_routing_local_results;
+
+static int stat_routing_processed;
+
+static int stat_memory_seen;
+
+static int stat_memory_destinations;
+
+static int stat_pending_rewards;
+
+static int stat_response_count;
+
+/**
+ * Topology service.
+ */
+static GNUNET_Topology_ServiceAPI *topology;
+
+/**
+ * Traffic service.
+ */
+static GNUNET_Traffic_ServiceAPI *traffic;
+
+/**
+ * For migration / local stores, local lookup and
+ * content verification.
+ */
+static GNUNET_Blockstore *bs;
+
+/**
+ * Function that can be used to identify unique
+ * replies.
+ */
+static GNUNET_UniqueReplyIdentifierCallback uri;
+
+static GNUNET_ReplyHashingCallback rhf;
+
+/**
+ * The routing table. This table has entries for all
+ * queries that we have recently send out. It helps
+ * GNUnet to route the replies back to the respective
+ * sender.
+ */
+static IndirectionTableEntry *ROUTING_indTable_;
+
+/**
+ * Size of the indirection table specified in gnunetd.conf
+ */
+static unsigned long long indirectionTableSize;
+
+/**
+ * Constant but peer-dependent value that randomizes the construction
+ * of the indices into the routing table. See
+ * computeRoutingIndex.
+ */
+static unsigned int random_qsel;
+
+/**
+ * Array of the queries we are currently sending out.
+ */
+static QueryRecord queries[QUERY_RECORD_COUNT];
+
+/**
+ * Mutex for all gap structures.
+ */
+static struct GNUNET_Mutex *lock;
+
+/**
+ * Linked list tracking reply statistics. Synchronize access using
+ * the lock!
+ */
+static ReplyTrackData *rtdList;
+
+static RewardEntry *rewards;
+
+static unsigned int rewardSize;
+
+static unsigned int rewardPos;
+
+static long long external_total_ttls;
+
+static unsigned int external_query_count;
+
+static long long internal_total_ttls;
+
+static unsigned int internal_query_count;
+
+static long long outbound_total_ttls;
+
+static unsigned int outbound_query_count;
+
+
+#if DO_HISTOGRAM
+static int histogram[65536];
+static int hist_total;
+#endif
+
+static struct GNUNET_GE_Context *ectx;
+
+static struct GNUNET_GC_Configuration *cfg;
+
+/* ****************** helper functions ***************** */
+
+/**
+ * Adjust the TTL (priority limitation heuristic): a positive TTL
+ * is capped at (prio + 3) * TTL_DECREMENT; non-positive TTLs are
+ * passed through unchanged.
+ *
+ * @param ttl the TTL to bound
+ * @param prio priority that determines the bound
+ * @return the (possibly reduced) TTL
+ */
+static int
+adjustTTL (int ttl, unsigned int prio)
+{
+  if (ttl <= 0)
+    return ttl;                 /* nothing to bound */
+  if (ttl > (int) (prio + 3) * TTL_DECREMENT)
+    return (int) (prio + 3) * TTL_DECREMENT;    /* bound! */
+  return ttl;
+}
+
+/**
+ * A query has been received. The question is, if it should be
+ * forwarded and if with which priority. Routing decisions(to whom)
+ * are to be taken elsewhere. <p>
+ *
+ * @param sender the host sending us the query
+ * @param priority the priority the query had when it came in,
+ * may be an arbitrary number if the
+ * sender is malicious! Cap by trustlevel first!
+ * Set to the resulting priority.
+ * @return binary encoding: QUERY_XXXX constants
+ */
+static QUERY_POLICY
+evaluateQuery (const GNUNET_PeerIdentity * sender, unsigned int *priority)
+{
+ unsigned int
+
+}
+
+/**
+ * Map the id to an index into the bitmap array: the first
+ * unsigned int of the peer identity, reduced modulo the number
+ * of bits in the bitmap (8 * BITMAP_SIZE).
+ *
+ * @param peer identity of the peer
+ * @return bit position in [0, 8 * BITMAP_SIZE)
+ */
+static unsigned int
+getIndex (const GNUNET_PeerIdentity * peer)
+{
+  const unsigned int *words = (const unsigned int *) peer;
+
+  return words[0] % (8 * BITMAP_SIZE);
+}
+
+/**
+ * Mark the given bit in the bitmap of a query record.
+ *
+ * @param qr record whose bitmap is modified
+ * @param bit bit position (byte bit >> 3, bit-in-byte bit & 7)
+ */
+static void
+setBit (QueryRecord * qr, int bit)
+{
+  const unsigned char mask = (1 << (bit & 7));
+
+  qr->bitmap[bit >> 3] = qr->bitmap[bit >> 3] | mask;
+}
+
+/**
+ * Test whether the given bit is set in the bitmap of a query record.
+ *
+ * @param qr record whose bitmap is inspected
+ * @param bit bit position (byte bit >> 3, bit-in-byte bit & 7)
+ * @return 1 if the bit is set, 0 otherwise
+ */
+static int
+getBit (const QueryRecord * qr, int bit)
+{
+  const unsigned char mask = (1 << (bit & 7));
+
+  if ((qr->bitmap[bit >> 3] & mask) != 0)
+    return 1;
+  return 0;
+}
+
+
+/* ************* tracking replies, routing queries ********** */
+
+/**
+ * Cron job that ages the RTD data and that frees
+ * memory for entries that reach 0.
+ *
+ * For every tracked origin: if the last reply is more than 600
+ * time units (GNUNET_get_time_int32) old, all responder entries are
+ * discarded; otherwise every responder's count is halved and
+ * responders that reach 0 are removed. Origins without any
+ * remaining responder are removed from the list as well.
+ *
+ * The 'stat_response_count' statistic is kept in sync with the sum
+ * of all response counts: it is decreased by exactly the amount
+ * that aging or discarding removes (updateResponseData adds 1 per
+ * counted reply).
+ *
+ * @param unused closure argument, not used
+ */
+static void
+ageRTD (void *unused)
+{
+  ReplyTrackData *pos;
+  ReplyTrackData *prev;
+  ResponseList *rpos;
+  ResponseList *rprev;
+  unsigned int lost;
+
+  GNUNET_mutex_lock (lock);
+  prev = NULL;
+  pos = rtdList;
+  while (pos != NULL)
+    {
+      /* after 10 minutes, always discard everything */
+      if (pos->lastReplyReceived < GNUNET_get_time_int32 (NULL) - 600)
+        {
+          while (pos->responseList != NULL)
+            {
+              rpos = pos->responseList;
+              pos->responseList = rpos->next;
+              /* the discarded replies no longer count */
+              if (stats != NULL)
+                stats->change (stat_response_count,
+                               -(int) rpos->responseCount);
+              change_pid_rc (rpos->responder, -1);
+              GNUNET_free (rpos);
+            }
+        }
+      /* otherwise, age reply counts */
+      rprev = NULL;
+      rpos = pos->responseList;
+      while (rpos != NULL)
+        {
+          /* halving the count removes 'lost' replies from the total */
+          lost = rpos->responseCount - rpos->responseCount / 2;
+          if (stats != NULL)
+            stats->change (stat_response_count, -(int) lost);
+          rpos->responseCount -= lost;
+          if (rpos->responseCount == 0)
+            {
+              if (rprev == NULL)
+                pos->responseList = rpos->next;
+              else
+                rprev->next = rpos->next;
+              change_pid_rc (rpos->responder, -1);
+              GNUNET_free (rpos);
+              /* continue with the successor of the removed element */
+              if (rprev == NULL)
+                rpos = pos->responseList;
+              else
+                rpos = rprev->next;
+              continue;
+            }
+          rprev = rpos;
+          rpos = rprev->next;
+        }
+      /* if we have no counts for a peer anymore,
+         free pos entry */
+      if (pos->responseList == NULL)
+        {
+          if (prev == NULL)
+            rtdList = pos->next;
+          else
+            prev->next = pos->next;
+          change_pid_rc (pos->queryOrigin, -1);
+          GNUNET_free (pos);
+          if (prev == NULL)
+            pos = rtdList;
+          else
+            pos = prev->next;
+          continue;
+        }
+      prev = pos;
+      pos = pos->next;
+    }
+  GNUNET_mutex_unlock (lock);
+}
+
+/**
+ * We received a reply from 'responder' to a query received from
+ * 'origin'. Update reply track data: find (or append) the tracking
+ * entry for 'origin', refresh its timestamp and count one more
+ * reply for 'responder' (appending a new responder entry with
+ * count 1 if needed).
+ *
+ * @param origin peer (or client) the query came from
+ * @param responder peer that sent the reply; 0 means local and
+ *        is not tracked
+ */
+static void
+updateResponseData (PID_INDEX origin, PID_INDEX responder)
+{
+  ReplyTrackData *track;
+  ReplyTrackData *trackTail = NULL;
+  ResponseList *entry;
+  ResponseList *entryTail = NULL;
+
+  if (responder == 0)
+    return;                     /* we don't track local responses */
+  GNUNET_mutex_lock (lock);
+  /* locate the tracking entry for this origin, remembering the tail */
+  for (track = rtdList; track != NULL; track = track->next)
+    {
+      if (track->queryOrigin == origin)
+        break;                  /* found */
+      trackTail = track;
+    }
+  if (track == NULL)
+    {
+      track = GNUNET_malloc (sizeof (ReplyTrackData));
+      track->next = NULL;
+      track->responseList = NULL;
+      track->queryOrigin = origin;
+      change_pid_rc (origin, 1);
+      if (trackTail != NULL)
+        trackTail->next = track;
+      else
+        rtdList = track;
+    }
+  GNUNET_get_time_int32 (&track->lastReplyReceived);
+  /* locate the responder within the entry, remembering the tail */
+  for (entry = track->responseList; entry != NULL; entry = entry->next)
+    {
+      if (entry->responder == responder)
+        break;
+      entryTail = entry;
+    }
+  if (entry != NULL)
+    {
+      entry->responseCount++;
+      if (stats != NULL)
+        stats->change (stat_response_count, 1);
+    }
+  else
+    {
+      entry = GNUNET_malloc (sizeof (ResponseList));
+      entry->responseCount = 1;
+      if (stats != NULL)
+        stats->change (stat_response_count, 1);
+      entry->responder = responder;
+      change_pid_rc (responder, 1);
+      entry->next = NULL;
+      if (entryTail != NULL)
+        entryTail->next = entry;
+      else
+        track->responseList = entry;
+    }
+  GNUNET_mutex_unlock (lock);
+}
+
+/**
+ * Callback method for filling buffers. This method is invoked by the
+ * core if a message is about to be sent and there is space left for a
+ * QUERY. We then walk the pending queries round-robin (the walk
+ * position is kept across calls) and copy in every query that is
+ * still alive, was not yet marked as sent to this receiver, is
+ * neither blocked for nor originating from the receiver, and fits.
+ *
+ * @param receiver the receiver of the message
+ * @param position is the reference to the
+ *        first unused position in the buffer where GNUnet is building
+ *        the message
+ * @param padding is the number of bytes left in that buffer.
+ * @return the number of bytes written to that buffer
+ */
+static unsigned int
+fillInQuery (const GNUNET_PeerIdentity * receiver,
+             void *position, unsigned int padding)
+{
+  static unsigned int cursor = 0;
+  unsigned int first;
+  unsigned int written;
+  unsigned int msgSize;
+  unsigned int slot;
+  GNUNET_CronTime now;
+  QueryRecord *rec;
+  PID_INDEX receiverId;
+  char *out = position;
+
+  now = GNUNET_get_time ();
+  receiverId = intern_pid (receiver);
+  slot = getIndex (receiver);
+  GNUNET_mutex_lock (lock);
+  first = cursor;
+  written = 0;
+  for (;;)
+    {
+      /* stop once not even a minimal query fits anymore */
+      if (padding - written <= sizeof (P2P_gap_query_MESSAGE))
+        break;
+      rec = &queries[cursor];
+      if ((rec->expires > now) &&
+          (0 == getBit (rec, slot)) && (receiverId != rec->noTarget))
+        {
+          if ((0 != memcmp (&receiver->hashPubKey,
+                            &rec->msg->returnTo.hashPubKey,
+                            sizeof (GNUNET_HashCode)))
+              && (padding - written >= ntohs (rec->msg->header.size)))
+            {
+              msgSize = ntohs (rec->msg->header.size);
+              setBit (rec, slot);
+              memcpy (&out[written], rec->msg, msgSize);
+              rec->sendCount++;
+              written += msgSize;
+            }
+        }
+      cursor++;
+      if (cursor >= QUERY_RECORD_COUNT)
+        cursor = 0;
+      if (cursor == first)
+        break;                  /* visited every record once */
+    }
+  GNUNET_mutex_unlock (lock);
+  change_pid_rc (receiverId, -1);
+  return written;
+}
+
+/**
+ * Select a subset of the peers for forwarding. Called
+ * on each connected node by the core.
+ *
+ * Computes a ranking for 'peer' and stores it in the rankings slot
+ * given by getIndex (peer). The ranking is the sum of a term based
+ * on how many replies the peer delivered for queries from the same
+ * origin (qr->noTarget), a random term that grows as the hash
+ * distance between the query and the peer shrinks, and a random
+ * term that every peer gets. The blocked peer always ranks 0.
+ *
+ * NOTE(review): the saturation value 0x7FFFFFF used for
+ * responseCount >= 0xFFFF is smaller than 0x7FFF * responseCount for
+ * large counts below the threshold -- confirm whether a different
+ * constant was intended before changing it (the rankings are stored
+ * as 'int').
+ *
+ * @param peer the connected peer to rank
+ * @param cls the QueryRecord being forwarded
+ */
+static void
+hotpathSelectionCode (const GNUNET_PeerIdentity * peer, void *cls)
+{
+  QueryRecord *qr = cls;
+  ReplyTrackData *pos;
+  ResponseList *rp;
+  unsigned int ranking = 0;
+  int distance;
+  PID_INDEX id;
+  unsigned int idx;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+  GNUNET_EncName enc2;
+#endif
+
+  id = intern_pid (peer);
+  /* compute some basic ranking based on historical
+     queries from the same origin */
+  pos = rtdList;
+  while (pos != NULL)
+    {
+      if (pos->queryOrigin == qr->noTarget)
+        break;
+      pos = pos->next;
+    }
+  rp = NULL;
+  if (pos != NULL)
+    {
+      rp = pos->responseList;
+      while (rp != NULL)
+        {
+          if (rp->responder == id)
+            break;
+          rp = rp->next;
+        }
+      if (rp != NULL)
+        {
+          if (rp->responseCount < 0xFFFF)
+            ranking = 0x7FFF * rp->responseCount;
+          else
+            ranking = 0x7FFFFFF;
+        }
+    }
+  /* change to value in [0:63] */
+  distance =
+    GNUNET_hash_distance_u32 (&qr->msg->queries[0], &peer->hashPubKey) >> 10;
+  if (distance <= 0)
+    distance = 1;
+  /* 0 to 20 "response equivalents" for proximity */
+  ranking +=
+    GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+                       1 + 0xFFFF * 10 / (1 + distance));
+  /* 2 "response equivalents" random chance for everyone */
+  ranking += GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 0xFFFF);
+  if (id == qr->noTarget)
+    ranking = 0;                /* no chance for blocked peers */
+  idx = getIndex (peer);
+#if DEBUG_GAP
+  GNUNET_hash_to_enc (&qr->msg->queries[0], &enc);
+  ((char *) &enc)[6] = '\0';
+  GNUNET_hash_to_enc (&peer->hashPubKey, &enc2);
+  ((char *) &enc2)[6] = '\0';
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Q %s peer %2u (%s) ranks (responses: %2u, distance %4d): %u%s\n",
+                 &enc,
+                 idx,
+                 &enc2,
+                 rp == NULL ? 0 : rp->responseCount,
+                 distance, ranking, id == qr->noTarget ? " (no target)" : "");
+#endif
+  qr->rankings[idx] = ranking;
+  change_pid_rc (id, -1);
+}
+
+
+/**
+ * A "GNUNET_NodeIteratorCallback" method that forwards the query to
+ * the selected nodes. A peer receives the query only if it is not
+ * the peer named in the message's returnTo field, the load is not
+ * too high, it is not the record's blocked peer and its bit is set
+ * in the record's bitmap.
+ *
+ * @param peer the connected peer under consideration
+ * @param cls the QueryRecord being forwarded
+ */
+static void
+sendToSelected (const GNUNET_PeerIdentity * peer, void *cls)
+{
+  const QueryRecord *qr = cls;
+  PID_INDEX id;
+#if DEBUG_GAP
+  GNUNET_EncName encq;
+  GNUNET_EncName encp;
+#endif
+
+  /* never send back to source; also give up if the
+     load is above the hard limit */
+  if ((0 == memcmp (&peer->hashPubKey,
+                    &qr->msg->returnTo.hashPubKey,
+                    sizeof (GNUNET_HashCode))) || loadTooHigh ())
+    return;
+
+  id = intern_pid (peer);
+  /* skip the blocked peer and every peer that was not selected */
+  if ((id != qr->noTarget) && (getBit (qr, getIndex (peer)) == 1))
+    {
+#if DEBUG_GAP
+      IF_GELOG (ectx,
+                GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                GNUNET_hash_to_enc (&peer->hashPubKey, &encp);
+                GNUNET_hash_to_enc (&qr->msg->queries[0], &encq));
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "Sending query `%s' to `%s'\n", &encq, &encp);
+#endif
+      if (stats != NULL)
+        {
+          stats->change (stat_routing_forwards, 1);
+          /* maintain the running average of outbound TTLs */
+          GNUNET_mutex_lock (lock);
+          outbound_query_count++;
+          outbound_total_ttls += (int) ntohl (qr->msg->ttl);
+          stats->set (stat_routing_outbound_ttl,
+                      outbound_total_ttls / outbound_query_count);
+          GNUNET_mutex_unlock (lock);
+        }
+      coreAPI->unicast (peer,
+                        &qr->msg->header,
+                        BASE_QUERY_PRIORITY * ntohl (qr->msg->priority) * 2,
+                        TTL_DECREMENT);
+    }
+  change_pid_rc (id, -1);
+}
+
+/**
+ * Take a query and forward it to the appropriate number of nodes
+ * (depending on load, queue, etc).
+ *
+ * The query is stored in a slot of the 'queries' array: the slot
+ * that already holds exactly this query if there is one, otherwise
+ * the slot with the oldest expiration that expires before the new
+ * query; if no slot qualifies a temporary record is used. Up to
+ * four bitmap slots are then selected at random, weighted by the
+ * rankings assigned by hotpathSelectionCode, and the query is
+ * handed to sendToSelected for transmission.
+ *
+ * @param msg the query to forward
+ * @param target peer that should receive the query in any case,
+ *        may be NULL
+ * @param excludePeer peer that must not receive the query; if NULL
+ *        the local identity is used
+ */
+static void
+forwardQuery (const P2P_gap_query_MESSAGE * msg,
+              const GNUNET_PeerIdentity * target,
+              const GNUNET_PeerIdentity * excludePeer)
+{
+  GNUNET_CronTime now;
+  QueryRecord *qr;
+  QueryRecord dummy;
+  GNUNET_CronTime oldestTime;
+  GNUNET_CronTime expirationTime;
+  int oldestIndex;
+  int i;
+  int j;
+  int noclear = GNUNET_NO;
+  unsigned long long rankingSum;
+  unsigned long long sel;
+  unsigned long long pos;
+
+  if (target != NULL)
+    {
+      /* connect to target host -- if known */
+      coreAPI->unicast (target, NULL, ntohl (msg->priority), 0);
+    }
+  now = GNUNET_get_time ();
+  GNUNET_mutex_lock (lock);
+
+  oldestIndex = -1;
+  expirationTime = now + ntohl (msg->ttl);
+  oldestTime = expirationTime;
+  for (i = 0; i < QUERY_RECORD_COUNT; i++)
+    {
+      if (queries[i].expires < oldestTime)
+        {
+          oldestTime = queries[i].expires;
+          oldestIndex = i;
+        }
+      if (queries[i].msg == NULL)
+        continue;
+      if ((queries[i].msg->header.size == msg->header.size) &&
+          (0 == memcmp (&queries[i].msg->queries[0],
+                        &msg->queries[0],
+                        ntohs (msg->header.size)
+                        - sizeof (P2P_gap_query_MESSAGE)
+                        + sizeof (GNUNET_HashCode))))
+        {
+          /* We have exactly this query pending already.
+             Replace existing query! */
+          oldestIndex = i;
+          if ((queries[i].expires > now - 4 * TTL_DECREMENT) && /* not long expired */
+              (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 4) != 0))
+            {
+              /* do not clear the bitmap describing which peers we have
+                 forwarded the query to already; but do this only with high
+                 probability since we may want to try again if the query is
+                 retransmitted lots (this can happen if this is the only
+                 query; we may forward it to all connected peers and get no
+                 reply. If the initiator keeps retrying, we want to
+                 eventually forward it again.
+
+                 Note that the initial probability here (0.6.0/0.6.1) was
+                 very low (1:64), which is far too low considering that the
+                 clients do an exponential back-off. The rule is a pure
+                 optimization, and as such the probability that we
+                 eventually forward must be significant. 25% seems to work
+                 better... (extra-note: in small testbeds, the problem
+                 is bigger than in a larger network where the case that
+                 a query stays in the QM indefinitely might be much more
+                 rare; so don't just trust a micro-scale benchmark when
+                 trying to figure out an 'optimal' threshold). */
+              noclear = GNUNET_YES;
+            }
+          break;                /* this is it, do not scan for other
+                                   'oldest' entries */
+        }
+    }
+  if (oldestIndex == -1)
+    {
+      /* no slot can be recycled: work on a temporary record */
+      memset (&dummy, 0, sizeof (QueryRecord));
+      qr = &dummy;
+    }
+  else
+    {
+      qr = &queries[oldestIndex];
+      GNUNET_free_non_null (qr->msg);
+      qr->msg = NULL;
+    }
+  qr->expires = expirationTime;
+  qr->transmissionCount = 0;
+  qr->msg = GNUNET_malloc (ntohs (msg->header.size));
+  memcpy (qr->msg, msg, ntohs (msg->header.size));
+  if (noclear == GNUNET_NO)
+    memset (&qr->bitmap[0], 0, BITMAP_SIZE);
+
+  /* release the reference held by the previous occupant of the slot */
+  if (qr->noTarget != 0)
+    change_pid_rc (qr->noTarget, -1);
+  if (excludePeer != NULL)
+    qr->noTarget = intern_pid (excludePeer);
+  else
+    qr->noTarget = intern_pid (coreAPI->myIdentity);
+  qr->totalDistance = 0;
+  qr->rankings = GNUNET_malloc (sizeof (int) * 8 * BITMAP_SIZE);
+  /* NOTE(review): the member name after 'coreAPI->' is missing in
+     this copy of the patch (here and in the second call below);
+     both lines are reproduced as found. */
+  qr->activeConnections
+    = coreAPI-> (&hotpathSelectionCode, qr);
+  /* actual selection, proportional to rankings
+     assigned by hotpathSelectionCode ... */
+  rankingSum = 0;
+  for (i = 0; i < 8 * BITMAP_SIZE; i++)
+    rankingSum += qr->rankings[i];
+  if (qr->activeConnections > 0)
+    {
+      /* select 4 peers for forwarding */
+      for (i = 0; i < 4; i++)
+        {
+          if (rankingSum == 0)
+            break;
+          sel = GNUNET_random_u64 (GNUNET_RANDOM_QUALITY_WEAK, rankingSum);
+          pos = 0;
+          for (j = 0; j < 8 * BITMAP_SIZE; j++)
+            {
+              pos += qr->rankings[j];
+              if (pos > sel)
+                {
+                  setBit (qr, j);
+                  GNUNET_GE_ASSERT (ectx, rankingSum >= qr->rankings[j]);
+                  /* a selected slot cannot be drawn again */
+                  rankingSum -= qr->rankings[j];
+                  qr->rankings[j] = 0;
+                  break;
+                }
+            }
+        }
+    }
+  GNUNET_free (qr->rankings);
+  qr->rankings = NULL;
+  if (target != NULL)
+    {
+      /* The bitmap is indexed by getIndex (peer) everywhere else
+         (fillInQuery, sendToSelected), so the explicit target has
+         to be mapped the same way; using the PID table index here
+         could exceed the 8 * BITMAP_SIZE bits of the bitmap. */
+      setBit (qr, getIndex (target));
+    }
+  /* now forward to a couple of selected nodes */
+  coreAPI-> (&sendToSelected, qr);
+  if (qr == &dummy)
+    {
+      change_pid_rc (dummy.noTarget, -1);
+      GNUNET_free (dummy.msg);
+    }
+  GNUNET_mutex_unlock (lock);
+}
+
+/**
+ * Stop transmitting a certain query (we don't route it anymore or
+ * we have learned the answer). Only the first record whose first
+ * query hash matches is expired.
+ *
+ * @param query hash of the query to stop
+ * @return GNUNET_OK if a matching record was found,
+ *         GNUNET_SYSERR otherwise
+ */
+static int
+dequeueQuery (const GNUNET_HashCode * query)
+{
+  int idx;
+  int result = GNUNET_SYSERR;
+  QueryRecord *rec;
+
+  GNUNET_mutex_lock (lock);
+  for (idx = 0; idx < QUERY_RECORD_COUNT; idx++)
+    {
+      rec = &queries[idx];
+      if (rec->msg == NULL)
+        continue;               /* unused slot */
+      if (0 != memcmp (query, &rec->msg->queries[0],
+                       sizeof (GNUNET_HashCode)))
+        continue;               /* different query */
+      rec->expires = 0;         /* expire NOW! */
+      result = GNUNET_OK;
+      break;
+    }
+  GNUNET_mutex_unlock (lock);
+  return result;
+}
+
+/* ********** tracking queries, forwarding replies ********** */
+
+/**
+ * Compute the index into the routing (indirection) table for a
+ * query hash: the first word of the hash XORed with the second
+ * word divided by (1 + random_qsel), reduced modulo the table size.
+ *
+ * @param query hash of the query
+ * @return index smaller than indirectionTableSize
+ */
+static unsigned int
+computeRoutingIndex (const GNUNET_HashCode * query)
+{
+  const unsigned int *words = (const unsigned int *) query;
+  unsigned int res;
+
+  /* division binds tighter than XOR */
+  res = (words[0] ^ (words[1] / (1 + random_qsel))) % indirectionTableSize;
+  GNUNET_GE_ASSERT (ectx, res < indirectionTableSize);
+#if DO_HISTOGRAM
+  histogram[res % 65536]++;
+  hist_total++;
+  if (hist_total % 10000 == 0)
+    {
+      int bucket;
+
+      /* dump all non-empty buckets */
+      for (bucket = 0; bucket < 65536; bucket++)
+        {
+          if (histogram[bucket] != 0)
+            printf ("%d: %d\n", bucket, histogram[bucket]);
+        }
+    }
+#endif
+  return res;
+}
+
+/**
+ * Use content (forward to whoever sent the query).
+ * @param hostId the peer from where the content came,
+ * NULL for the local peer
+ */
+static int useContent (const GNUNET_PeerIdentity * hostId,
+ const GNUNET_MessageHeader * pmsg);
+
+/**
+ * Cron callback: pass the queued message to useContent as if it
+ * came from the local peer (NULL sender), then free the message.
+ *
+ * @param data the heap-allocated GNUNET_MessageHeader to process
+ */
+static void
+useContentLater (void *data)
+{
+  GNUNET_MessageHeader *queued;
+
+  queued = data;
+  useContent (NULL, queued);
+  GNUNET_free (queued);
+}
+
+/**
+ * Queue a reply with cron to simulate
+ * another peer returning the response with
+ * some latency (and then route as usual).
+ *
+ * @param sender the next hop (not referenced by the current
+ *        implementation)
+ * @param primaryKey the query that the content is a reply to; used
+ *        to locate the routing table entry
+ * @param data is a GNUNET_DataContainer which
+ *        wraps the content in the format that
+ *        can be passed to the FS module (GapWrapper),
+ *        which in turn wraps the DBlock (including
+ *        the type ID).
+ * @return GNUNET_YES if the reply was queued,
+ *         GNUNET_NO if it was dropped (no matching routing table
+ *         entry, or a local reply is already in the delay loop),
+ *         GNUNET_SYSERR if the resulting message would be too large
+ */
+static int
+queueReply (const GNUNET_PeerIdentity * sender,
+            const GNUNET_HashCode * primaryKey,
+            const GNUNET_DataContainer * data)
+{
+  P2P_gap_reply_MESSAGE *pmsg;
+  IndirectionTableEntry *ite;
+  unsigned int size;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+
+  IF_GELOG (ectx,
+            GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (primaryKey, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Gap queues reply to query `%s' for later use.\n", &enc);
+#endif
+
+#if EXTRA_CHECKS
+  /* verify data is valid */
+  uri (data, GNUNET_ECRS_BLOCKTYPE_ANY, GNUNET_YES, primaryKey);
+#endif
+
+  ite = &ROUTING_indTable_[computeRoutingIndex (primaryKey)];
+  if (0 != memcmp (&ite->primaryKey, primaryKey, sizeof (GNUNET_HashCode)))
+    {
+#if DEBUG_GAP
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "GAP: Dropping reply, routing table has no query associated with it (anymore)\n");
+#endif
+      return GNUNET_NO;         /* we don't care for the reply (anymore) */
+    }
+  if (GNUNET_YES == ite->successful_local_lookup_in_delay_loop)
+    {
+#if DEBUG_GAP
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "GAP: Dropping reply, found reply locally during delay\n");
+#endif
+      /* wow, really bad concurrent DB lookup and processing for
+         the same query. Well, at least we should not also
+         queue the delayed reply twice... */
+      return GNUNET_NO;
+    }
+  /* reply header plus the payload that follows the container header */
+  size =
+    sizeof (P2P_gap_reply_MESSAGE) + ntohl (data->size) -
+    sizeof (GNUNET_DataContainer);
+  if (size >= GNUNET_MAX_BUFFER_SIZE)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  ite->successful_local_lookup_in_delay_loop = GNUNET_YES;
+  pmsg = GNUNET_malloc (size);
+  pmsg->header.size = htons (size);
+  pmsg->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
+  pmsg->primaryKey = *primaryKey;
+  memcpy (&pmsg[1], &data[1], size - sizeof (P2P_gap_reply_MESSAGE));
+  /* delay reply, delay longer if we are busy (makes it harder
+     to predict / analyze, too). */
+  GNUNET_cron_add_job (coreAPI->cron,
+                       &useContentLater,
+                       GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK,
+                                          TTL_DECREMENT), 0, pmsg);
+  return GNUNET_YES;
+}
+
+/**
+ * Remember a reward for a query in the circular 'rewards' buffer,
+ * overwriting whatever entry is at the current write position.
+ * A priority of 0 is not recorded.
+ *
+ * @param query the query the reward is offered for
+ * @param prio the reward
+ */
+static void
+addReward (const GNUNET_HashCode * query, unsigned int prio)
+{
+  RewardEntry *slot;
+
+  if (prio != 0)
+    {
+      GNUNET_mutex_lock (lock);
+      slot = &rewards[rewardPos];
+      slot->query = *query;
+      /* account for the difference to the overwritten entry */
+      if (stats != NULL)
+        stats->change (stat_pending_rewards, prio - slot->prio);
+      slot->prio = prio;
+      rewardPos++;
+      if (rewardPos == rewardSize)
+        rewardPos = 0;
+      GNUNET_mutex_unlock (lock);
+    }
+}
+
+/**
+ * Collect all pending rewards recorded for a query and reset the
+ * matching entries to 0.
+ *
+ * @param query the query that was answered
+ * @return sum of the rewards of all matching entries
+ */
+static unsigned int
+claimReward (const GNUNET_HashCode * query)
+{
+  unsigned int idx;
+  unsigned int total = 0;
+  RewardEntry *entry;
+
+  GNUNET_mutex_lock (lock);
+  for (idx = 0; idx < rewardSize; idx++)
+    {
+      entry = &rewards[idx];
+      if (0 != memcmp (query, &entry->query, sizeof (GNUNET_HashCode)))
+        continue;
+      total += entry->prio;
+      if (stats != NULL)
+        stats->change (stat_pending_rewards, -entry->prio);
+      entry->prio = 0;
+    }
+  GNUNET_mutex_unlock (lock);
+  return total;
+}
+
+/**
+ * Discard the list of reply hashes already seen for a routing slot
+ * and update the memory statistic by the number of entries dropped.
+ *
+ * @param ite the routing table slot whose seen list is flushed
+ */
+static void
+resetSeen (IndirectionTableEntry * ite)
+{
+  if (NULL != stats)
+    {
+      /* account for the entries before the count is zeroed below */
+      stats->change (stat_memory_seen, -ite->seenIndex);
+    }
+  GNUNET_array_grow (ite->seen, ite->seenIndex, 0);
+}
+
+/**
+ * Drop all peers waiting for a reply on a routing slot.  The PID
+ * reference counts of the listed peers are decremented first, then
+ * the memory statistic is updated and the array is shrunk to zero.
+ *
+ * @param ite the routing table slot whose destination list is flushed
+ */
+static void
+resetDestinations (IndirectionTableEntry * ite)
+{
+  decrement_pid_rcs (ite->destination, ite->hostsWaiting);
+  if (NULL != stats)
+    {
+      /* account for the entries before the count is zeroed below */
+      stats->change (stat_memory_destinations, -ite->hostsWaiting);
+    }
+  GNUNET_array_grow (ite->destination, ite->hostsWaiting, 0);
+}
+
+/**
+ * Add an entry to the routing table.  The lock on the ite
+ * must be held.
+ *
+ * In ITE_REPLACE mode the seen list is always flushed.  If the slot
+ * already holds the same query, its ttl is overwritten and the
+ * priority accumulated; if the slot holds a different query, that
+ * query is dequeued and the slot (key, destinations, ttl, priority)
+ * is re-initialized for the new one.  In any other mode the slot
+ * must already hold the same query (asserted); its ttl is only
+ * extended, never shortened, and the priority is accumulated.
+ *
+ * Note that ttl and priority of the slot may already have been
+ * modified when GNUNET_SYSERR is returned.
+ *
+ * @param mode replace (ITE_REPLACE) or extend an existing entry?
+ * @param ite slot in the routing table that is manipulated
+ * @param query the query to look for
+ * @param ttl how long to keep the new entry, relative ttl
+ * @param priority how important is the new entry
+ * @param sender for which node is the entry (must not be 0)
+ * @return GNUNET_OK if sender was added, GNUNET_SYSERR if existed already
+ *         in the queue
+ */
+static int
+addToSlot (int mode,
+           IndirectionTableEntry * ite,
+           const GNUNET_HashCode * query,
+           int ttl, unsigned int priority, PID_INDEX sender)
+{
+  unsigned int i;
+  GNUNET_CronTime now;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+
+  IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (query, &enc));
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "GAP: Queueing query '%s' in slot %p\n", &enc, ite);
+#endif
+  /* do NOT add to RT for local clients! */
+  GNUNET_GE_ASSERT (ectx, sender != 0);
+  now = GNUNET_get_time ();
+  /* a ttl of zero marks a slot that is not in use */
+  if ((stats != NULL) && (ite->ttl == 0))
+    stats->change (stat_routing_slots_used, 1);
+
+  if (mode == ITE_REPLACE)
+    {
+      resetSeen (ite);
+      ite->seenReplyWasUnique = GNUNET_NO;
+      if (0 == memcmp (query, &ite->primaryKey, sizeof (GNUNET_HashCode)))
+        {
+          /* same query: refresh ttl, accumulate priority */
+          ite->ttl = now + ttl;
+          ite->priority += priority;
+          for (i = 0; i < ite->hostsWaiting; i++)
+            if (ite->destination[i] == sender)
+              return GNUNET_SYSERR;     /* already waiting */
+          /* bound the number of waiting peers by starting over */
+          if (ite->hostsWaiting >= MAX_HOSTS_WAITING)
+            resetDestinations (ite);
+        }
+      else
+        {
+          ite->successful_local_lookup_in_delay_loop = GNUNET_NO;
+          /* different request, flush pending queues */
+          dequeueQuery (&ite->primaryKey);
+          ite->primaryKey = *query;
+          resetDestinations (ite);
+          ite->ttl = now + ttl;
+          ite->priority = priority;
+        }
+    }
+  else
+    {                           /* GNUNET_array_grow mode */
+      GNUNET_GE_ASSERT (ectx,
+                        0 == memcmp (query, &ite->primaryKey,
+                                     sizeof (GNUNET_HashCode)));
+      /* extend lifetime */
+      if (ite->ttl < now + ttl)
+        ite->ttl = now + ttl;
+      ite->priority += priority;
+      for (i = 0; i < ite->hostsWaiting; i++)
+        if (sender == ite->destination[i])
+          return GNUNET_SYSERR; /* already there! */
+    }
+  /* append the sender to the list of waiting peers */
+  if (stats != NULL)
+    stats->change (stat_memory_destinations, 1);
+  GNUNET_array_grow (ite->destination, ite->hostsWaiting,
+                     ite->hostsWaiting + 1);
+  ite->destination[ite->hostsWaiting - 1] = sender;
+  change_pid_rc (sender, 1);
+  /* again: new listener, flush seen list */
+  resetSeen (ite);
+  ite->seenReplyWasUnique = GNUNET_NO;
+  return GNUNET_OK;
+}
+
+/**
+ * Decide how to handle a query received from another peer, based on
+ * the state of the routing table slot that the query hashes to.
+ * Depending on the decision, the sender is added to the slot via
+ * addToSlot (either replacing the slot's content or extending the
+ * existing entry).<p>
+ *
+ * The result is reported through the two out-parameters; the return
+ * value only identifies which branch of the heuristic was taken.<p>
+ *
+ * This method contains a heuristic that attempts to do its best to
+ * route queries without getting too many cycles, send a query and
+ * then drop it from the routing table without sending a response,
+ * etc.  Before touching this code, definitely consult Christian
+ * (address@hidden) who has put more bugs in these five lines
+ * of code than anyone on this planet would think is possible.
+ *
+ * NOTE(review): the function reads and modifies the slot without
+ * taking a lock itself; the caller visible in this file (execQuery)
+ * holds lookup_exclusion around the call.
+ *
+ * @param query the GNUNET_hash to look for
+ * @param ttl how long would the new query last (relative, may be
+ *        negative)
+ * @param priority the priority of the query
+ * @param sender which peer transmitted the query?
+ * @param isRouted set to GNUNET_YES or GNUNET_NO; the caller in
+ *        this file performs the local lookup only if GNUNET_YES
+ * @param doForward set to GNUNET_YES if we should
+ *        forward the query, GNUNET_NO if not
+ * @return a case ID for debugging (0-18, 20 or 21)
+ */
+static int
+needsForwarding (const GNUNET_HashCode * query,
+                 int ttl,
+                 unsigned int priority,
+                 PID_INDEX sender, int *isRouted, int *doForward)
+{
+  IndirectionTableEntry *ite;
+  GNUNET_CronTime now;
+  GNUNET_CronTime new_ttl;
+  int equal_to_pending;
+
+  now = GNUNET_get_time ();
+  ite = &ROUTING_indTable_[computeRoutingIndex (query)];
+  /* is the slot currently occupied by exactly this query? */
+  equal_to_pending =
+    0 == memcmp (query, &ite->primaryKey, sizeof (GNUNET_HashCode));
+  if ((stats != NULL) && (equal_to_pending))
+    stats->change (stat_routing_request_duplicates, 1);
+
+  /* absolute expiration time the new query would have */
+  new_ttl = now + ttl;
+  /* case 21: slot expired more than 10 TTL_DECREMENTs ago and the
+     new query's ttl is above -5 TTL_DECREMENTs: take over the slot */
+  if ((ite->ttl < now) &&
+      (ite->ttl < now - (GNUNET_CronTime) (TTL_DECREMENT * 10L)) &&
+      (ttl > -TTL_DECREMENT * 5))
+    {
+      addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+      *isRouted = GNUNET_YES;
+      *doForward = GNUNET_YES;
+      return 21;
+    }
+  if ((ttl < 0) && (equal_to_pending))
+    {
+      /* if ttl is "expired" and we have
+         the exact query pending, route
+         replies but do NOT forward _again_! */
+      addToSlot (ITE_GNUNET_array_grow, ite, query, ttl, priority, sender);
+      *isRouted = GNUNET_NO;
+      /* don't go again, we are not even going to reset the seen
+         list, so why bother looking locally again, if we would find
+         something, the seen list would block sending the reply anyway
+         since we're not resetting that (ttl too small!)! */
+      *doForward = GNUNET_NO;
+      return 0;
+    }
+
+  /* cases 1 and 2: the slot's entry is expired and the new query
+     outlives it by more than 10 TTL_DECREMENTs and by more than
+     TTL_DECREMENT times the estimated network size */
+  if ((ite->ttl < new_ttl) &&
+      (ite->ttl +
+       (GNUNET_CronTime) (TTL_DECREMENT * topology->estimateNetworkSize ()) <
+       new_ttl)
+      && (ite->ttl + (GNUNET_CronTime) (TTL_DECREMENT * 10L) < new_ttl)
+      && (ite->ttl < now))
+    {
+      /* expired AND is significantly (!)
+         longer expired than new query */
+      /* previous entry relatively expired, start using the slot --
+         and kill the old seen list! */
+      resetSeen (ite);
+      ite->seenReplyWasUnique = GNUNET_NO;
+      if ((equal_to_pending) &&
+          (GNUNET_YES == ite->successful_local_lookup_in_delay_loop))
+        {
+          /* same query and a locally found reply is already queued
+             for delayed delivery: only register the sender */
+          *isRouted = GNUNET_NO;
+          *doForward = GNUNET_NO;
+          addToSlot (ITE_GNUNET_array_grow, ite, query, ttl, priority,
+                     sender);
+          return 1;
+        }
+      else
+        {
+          *isRouted = GNUNET_YES;
+          *doForward = GNUNET_YES;
+          if ((stats != NULL) && (equal_to_pending))
+            {
+              stats->change (stat_routing_request_repeat, 1);
+            }
+          addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+          return 2;
+        }
+    }
+  if (equal_to_pending)
+    {
+      /* cases 3-7: same query pending, no reply seen so far */
+      if (ite->seenIndex == 0)
+        {
+          if ((ite->ttl < new_ttl) &&
+              (ite->ttl + (GNUNET_CronTime) TTL_DECREMENT < new_ttl))
+            {
+              /* ttl of new is SIGNIFICANTLY longer? */
+              /* query again */
+              if (GNUNET_YES == ite->successful_local_lookup_in_delay_loop)
+                {
+                  /* don't go again, we are already
+                     processing a local lookup! */
+                  *isRouted = GNUNET_NO;
+                  *doForward = GNUNET_NO;
+                  addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+                  return 3;
+                }
+              else
+                {
+                  *isRouted = GNUNET_YES;
+                  *doForward = GNUNET_YES;
+                  if (stats != NULL)
+                    stats->change (stat_routing_request_repeat, 1);
+                  addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+                  return 4;
+                }
+            }
+          else
+            {
+              /* new TTL is lower than the old one, thus
+                 just wait for the reply that may come back */
+              if (GNUNET_OK ==
+                  addToSlot (ITE_GNUNET_array_grow, ite, query, ttl, priority,
+                             sender))
+                {
+                  if (GNUNET_YES ==
+                      ite->successful_local_lookup_in_delay_loop)
+                    {
+                      *isRouted = GNUNET_NO;
+                      /* don't go again, we are already processing a
+                         local lookup! */
+                      *doForward = GNUNET_NO;
+                      return 5;
+                    }
+                  else
+                    {
+                      *isRouted = GNUNET_YES;
+                      *doForward = GNUNET_NO;
+                      return 6;
+                    }
+                }
+              else
+                {
+                  /* same query with _higher_ TTL has already been
+                     processed FOR THE SAME recipient! Do NOT do
+                     the lookup *again*. */
+                  *isRouted = GNUNET_NO;
+                  *doForward = GNUNET_NO;
+                  return 7;
+                }
+            }
+        }
+      /* ok, we've seen at least one reply before, replace
+         more agressively */
+
+      /* pending == new! */
+      if (ite->seenReplyWasUnique)
+        {
+          /* cases 8-12: the reply seen was flagged as unique */
+          if (ite->ttl < new_ttl)
+            {                   /* ttl of new is longer? */
+              /* go again */
+              resetSeen (ite);
+              ite->seenReplyWasUnique = GNUNET_NO;
+              if (GNUNET_YES == ite->successful_local_lookup_in_delay_loop)
+                {
+                  *isRouted = GNUNET_NO;
+                  /* don't go again, we are already processing a local
+                     lookup! */
+                  *doForward = GNUNET_NO;
+                  addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+                  return 8;
+                }
+              else
+                {
+                  *isRouted = GNUNET_YES;
+                  /* only forward if new TTL is significantly higher */
+                  if (ite->ttl + TTL_DECREMENT < new_ttl)
+                    {
+                      *doForward = GNUNET_YES;
+                      if (stats != NULL)
+                        stats->change (stat_routing_request_repeat, 1);
+                    }
+                  else
+                    *doForward = GNUNET_NO;
+                  addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+                  return 9;
+                }
+            }
+          else
+            {
+              /* new TTL is lower than the old one, thus
+                 just wait for the reply that may come back */
+              if (GNUNET_OK ==
+                  addToSlot (ITE_GNUNET_array_grow, ite, query, ttl, priority,
+                             sender))
+                {
+                  if (GNUNET_YES ==
+                      ite->successful_local_lookup_in_delay_loop)
+                    {
+                      *isRouted = GNUNET_NO;
+                      *doForward = GNUNET_NO;
+                      return 10;
+                    }
+                  else
+                    {
+                      *isRouted = GNUNET_YES;
+                      *doForward = GNUNET_NO;
+                      return 11;
+                    }
+                }
+              else
+                {
+                  /* sender was already waiting on this slot */
+                  *isRouted = GNUNET_NO;
+                  *doForward = GNUNET_NO;
+                  return 12;
+                }
+            }
+        }
+      else
+        {                       /* KSK or SKS, multiple results possible! */
+          /* It's a pending KSK or SKS that can have multiple
+             replies.  Do not re-send, just forward the
+             answers that we get from now on to this additional
+             receiver */
+          /* NOTE(review): despite its name, isttlHigher is GNUNET_NO
+             when the NEW ttl is higher than the slot's ttl; it must
+             be computed before addToSlot, which may extend ite->ttl */
+          int isttlHigher;
+          if (ite->ttl < new_ttl)
+            isttlHigher = GNUNET_NO;
+          else
+            isttlHigher = GNUNET_YES;
+          if (GNUNET_OK ==
+              addToSlot (ITE_GNUNET_array_grow, ite, query, ttl, priority,
+                         sender))
+            {
+              *isRouted = GNUNET_YES;
+              *doForward = GNUNET_NO;
+              return 13;
+            }
+          else
+            {
+              *isRouted = isttlHigher;
+              /* receiver is the same as the one that already got the
+                 answer, do not bother to do this again, IF
+                 the TTL is not higher! */
+              *doForward = GNUNET_NO;
+              return 14;
+            }
+        }
+    }
+  /* a different query that is expired a bit longer is using
+     the slot; but if it is a query that has received
+     a unique response already, we can eagerly throw it out
+     anyway, since the request has been satisfied
+     completely */
+  if ((ite->ttl + TTL_DECREMENT < new_ttl) &&
+      (ite->ttl < now) && (ite->seenReplyWasUnique))
+    {
+      /* we have seen the unique answer, get rid of it early */
+      addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+      *isRouted = GNUNET_YES;
+      *doForward = GNUNET_YES;
+      return 15;
+    }
+  /* Another still valid query is using the slot.  Now we need a _really_
+     good reason to discard it... */
+  if (ttl < 0)
+    {
+      *isRouted = GNUNET_NO;
+      *doForward = GNUNET_NO;
+      if (stats != NULL)
+        stats->change (stat_routing_collisions, 1);
+      /* if new ttl is "expired", don't bother with priorities */
+      return 16;
+    }
+
+  /* Finally try to find a _strong_ reason looking at priority/ttl
+     relationships to replace the existing query.  A low ttl with high
+     priority should be preferred, so we do a cross-multiplication
+     (!).  Also, we want a _strong_ reason, so we add a "magic" factor
+     of 10 for the additional work that the replacement would make
+     (the network needs a certain amount of resilience to changes in
+     the routing table, otherwise it might happen that query A
+     replaces query B which replaces query A which could happen so
+     quickly that no response to either query ever makes it through...
+   */
+  /* NOTE(review): (ttl * ite->priority) is evaluated in the type of
+     its operands before the widening cast is applied, so that product
+     may wrap; (ite->ttl - now) may also wrap if the slot has already
+     expired -- confirm this is acceptable for the heuristic */
+  if ((long long) ((ite->ttl - now) * priority) >
+      (long long) 10 * (ttl * ite->priority))
+    {
+      addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+      *isRouted = GNUNET_YES;
+      *doForward = GNUNET_YES;
+      return 17;
+    }
+  /* occasionally replace the existing entry at random */
+  if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, TIE_BREAKER_CHANCE) == 0)
+    {
+      addToSlot (ITE_REPLACE, ite, query, ttl, priority, sender);
+      *isRouted = GNUNET_YES;
+      *doForward = GNUNET_YES;
+      return 20;
+    }
+  /* sadly, the slot is busy with something else; we can
+     not even add ourselves to the reply set */
+  *isRouted = GNUNET_NO;
+  *doForward = GNUNET_NO;
+  if (stats != NULL)
+    stats->change (stat_routing_collisions, 1);
+
+  return 18;
+}
+
+/**
+ * Transmit a reply to every peer waiting on a routing slot.
+ * Each message is sent with a priority derived from the slot's
+ * priority and with a maximum delay equal to the slot's remaining
+ * lifetime (or TTL_DECREMENT if the slot has already expired).
+ *
+ * @param ite the matching slot in the indirection table
+ * @param msg the message to route
+ */
+static void
+sendReply (IndirectionTableEntry * ite, const GNUNET_MessageHeader * msg)
+{
+  GNUNET_PeerIdentity recv;
+  GNUNET_CronTime now;
+  unsigned int maxDelay;
+  unsigned int j;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+#endif
+
+  if (NULL != stats)
+    stats->change (stat_routing_successes, 1);
+  now = GNUNET_get_time ();
+  /* expired queries get TTL_DECREMENT as their delay budget */
+  maxDelay = (now < ite->ttl) ? ite->ttl - now : TTL_DECREMENT;
+  /* send to peers */
+  j = 0;
+  while (j < ite->hostsWaiting)
+    {
+      resolve_pid (ite->destination[j], &recv);
+#if DEBUG_GAP
+      IF_GELOG (ectx,
+                GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                GNUNET_hash_to_enc (&recv.hashPubKey, &enc));
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                     "GAP sending reply to `%s'\n", &enc);
+#endif
+      /* weigh priority */
+      coreAPI->unicast (&recv, msg,
+                        BASE_REPLY_PRIORITY * (ite->priority + 5), maxDelay);
+      j++;
+    }
+}
+
+/**
+ * Closure for queryLocalResultCallback: collects copies of the
+ * results found by a local lookup.
+ */
+struct qLRC
+{
+  /* array of heap-allocated copies of the results collected so far */
+  GNUNET_DataContainer **values;
+  /* number of entries in values */
+  unsigned int valueCount;
+  /* the (first) key of the query the lookup was started for */
+  GNUNET_HashCode query;
+};
+
+/**
+ * Callback for processing local results.
+ * Inserts a copy of each distinct result into the qLRC closure.
+ * Once more than MAX_SEEN_VALUES results have been collected,
+ * further results are dropped with increasing probability.
+ *
+ * @param primaryKey is the key needed to decrypt
+ *        the block
+ * @param value is a GNUNET_DataContainer which
+ *        wraps the content in the format that
+ *        can be passed to the FS module (GapWrapper),
+ *        which in turn wraps the DBlock (including
+ *        the type ID).
+ * @param closure the struct qLRC to add the result to
+ * @return always GNUNET_OK
+ */
+static int
+queryLocalResultCallback (const GNUNET_HashCode * primaryKey,
+                          const GNUNET_DataContainer * value, void *closure)
+{
+  struct qLRC *cls = closure;
+  unsigned int vsize;
+  unsigned int i;
+
+#if EXTRA_CHECKS
+  /* verify data is valid */
+  uri (value, GNUNET_ECRS_BLOCKTYPE_ANY, GNUNET_YES, primaryKey);
+#endif
+  /* check seen */
+  if ((cls->valueCount > MAX_SEEN_VALUES) &&
+      (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, cls->valueCount) > 8))
+    return GNUNET_OK;           /* statistical drop, too many replies
+                                   to keep in memory */
+  vsize = ntohl (value->size);
+  for (i = 0; i < cls->valueCount; i++)
+    {
+      /* compare the sizes first: a stored value may be shorter than
+         the new one, in which case comparing vsize bytes would read
+         past the end of the stored copy */
+      if ((value->size == cls->values[i]->size) &&
+          (0 == memcmp (value, cls->values[i], vsize)))
+        return GNUNET_OK;       /* drop, duplicate entry in DB! */
+    }
+  GNUNET_array_grow (cls->values, cls->valueCount, cls->valueCount + 1);
+  cls->values[cls->valueCount - 1] = GNUNET_malloc (vsize);
+  memcpy (cls->values[cls->valueCount - 1], value, vsize);
+  return GNUNET_OK;
+}
+
+/**
+ * Execute a single query.  Tests if the query can be routed.  If yes,
+ * the query is added to the routing table and the content is looked
+ * for locally.  If the content is available locally, a deferred
+ * response is simulated with a cron job and the local content is
+ * marked as valuable.  The return value tells whether the query was
+ * forwarded to other peers.
+ *
+ * @param sender next hop in routing of the reply, NULL for us
+ * @param target peer to ask primarily (maybe NULL)
+ * @param prio the effective priority of the query
+ * @param policy bitmask of QUERY_ANSWER, QUERY_FORWARD and
+ *        QUERY_INDIRECT describing what we are willing to do
+ *        for this query
+ * @param ttl the relative ttl of the query
+ * @param query the query itself
+ * @return the final value of doForward: GNUNET_YES if the query was
+ *         forwarded, GNUNET_NO if not (for example because we already
+ *         found the one and only response),
+ *         GNUNET_SYSERR if the load is too high or GAP is not
+ *         fully initialized
+ */
+static int
+execQuery (const GNUNET_PeerIdentity * sender,
+           const GNUNET_PeerIdentity * target,
+           unsigned int prio,
+           QUERY_POLICY policy, int ttl, const P2P_gap_query_MESSAGE * query)
+{
+  IndirectionTableEntry *ite;
+  int isRouted;
+  struct qLRC cls;
+  int i;
+  int max;
+  unsigned int *perm;
+  int doForward;
+  PID_INDEX senderID;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+#endif
+
+  /* Load above hard limit? */
+  if (loadTooHigh ())
+    return GNUNET_SYSERR;
+  if (rhf == NULL)
+    return GNUNET_SYSERR;       /* not fully initialized */
+
+  /* the reference obtained here is released at the end of the
+     function; there must be no early return past this point */
+  senderID = intern_pid (sender);
+  GNUNET_GE_ASSERT (ectx, (senderID != 0) || (sender == NULL));
+  ite = &ROUTING_indTable_[computeRoutingIndex (&query->queries[0])];
+  GNUNET_mutex_lock (lookup_exclusion);
+  i = -1;                       /* case ID for the debug log below */
+  if (sender != NULL)
+    {
+      /* consult the routing table only if we may answer, and either
+         may indirect or the fast check on the datastore succeeds */
+      if (((policy & QUERY_ANSWER) > 0) &&
+          (((policy & QUERY_INDIRECT) > 0) ||
+           (bs->fast_get (&query->queries[0]))))
+        {
+          i = needsForwarding (&query->queries[0],
+                               ttl, prio, senderID, &isRouted, &doForward);
+        }
+      else
+        {
+          isRouted = GNUNET_NO;
+          doForward = GNUNET_NO;
+          if (stats != NULL)
+            {
+              if ((policy & QUERY_ANSWER) > 0)
+                stats->change (stat_routing_no_route_policy, 1);
+              else
+                stats->change (stat_routing_no_answer_policy, 1);
+            }
+        }
+    }
+  else
+    {
+      /* no sender: record the reward instead of a routing entry */
+      addReward (&query->queries[0], prio);
+      isRouted = GNUNET_YES;
+      doForward = GNUNET_YES;
+    }
+  if ((policy & QUERY_FORWARD) == 0)
+    doForward = GNUNET_NO;
+
+#if DEBUG_GAP
+  IF_GELOG (ectx,
+            GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            GNUNET_hash_to_enc (&query->queries[0], &enc));
+  /* truncate the encoded key to six characters for the log */
+  ((char *) &enc)[6] = '\0';
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_INFO | GNUNET_GE_IMMEDIATE | GNUNET_GE_USER,
+                 "GAP is executing request for `%s':%s%s (%d)\n",
+                 &enc,
+                 doForward ? " forwarding" : "", isRouted ? " routing" : "",
+                 i);
+#endif
+  if ((stats != NULL) && (isRouted || doForward))
+    stats->change (stat_routing_processed, 1);
+  cls.values = NULL;
+  cls.valueCount = 0;
+  cls.query = query->queries[0];
+  /* if we can't route, lookup useless! */
+  if ((isRouted == GNUNET_YES) &&
+      ((policy & QUERY_ANSWER) > 0))
+    {
+      /* the key count is derived from the message size */
+      bs->get (bs->closure,
+               ntohl (query->type),
+               prio,
+               1 + (ntohs (query->header.size)
+                    -
+                    sizeof (P2P_gap_query_MESSAGE)) /
+               sizeof (GNUNET_HashCode), &query->queries[0],
+               &queryLocalResultCallback, &cls);
+    }
+
+  if (cls.valueCount > 0)
+    {
+      /* process the local results in random order */
+      perm = GNUNET_permute (GNUNET_RANDOM_QUALITY_WEAK, cls.valueCount);
+      max =
+        GNUNET_network_monitor_get_load (coreAPI->load_monitor,
+                                         GNUNET_ND_DOWNLOAD);
+      if (max > 100)
+        max = 100;
+      if (max == -1)
+        max = 50;               /* we don't know the load, assume
+                                   middle-of-the-road */
+      max = max / 10;           /* 1 reply per 10% free capacity */
+      max = 1 + (10 - max);
+      if (max > cls.valueCount)
+        max = cls.valueCount;   /* can't send more back then
+                                   what we have */
+
+      for (i = 0; i < cls.valueCount; i++)
+        {
+          /* only the first result is passed to 'put'; if that
+             fails, the result is discarded */
+          if ((i == 0) &&
+              (GNUNET_SYSERR == bs->put (bs->closure,
+                                         &query->queries[0],
+                                         cls.values[perm[i]], ite->priority)))
+            {
+              GNUNET_GE_BREAK (NULL, 0);
+              GNUNET_free (cls.values[perm[i]]);
+              continue;
+            }
+          /* queue at most 'max' replies for a remote sender */
+          if ((i < max) &&
+              (sender != NULL) &&
+              (GNUNET_YES == queueReply (sender,
+                                         &query->queries[0],
+                                         cls.values[perm[i]]))
+              && (stats != NULL))
+            stats->change (stat_routing_local_results, 1);
+          /* even for local results, always do 'put'
+             (at least to give back results to local client &
+             to update priority; but only do this for
+             the first result */
+          /* no need to verify local results! */
+          if (uri (cls.values[perm[i]], ite->type, GNUNET_NO,
+                   &query->queries[0]))
+            doForward = GNUNET_NO;      /* we have the one and only answer,
+                                           do not bother to forward... */
+          GNUNET_free (cls.values[perm[i]]);
+        }
+      GNUNET_free (perm);
+    }
+  /* the entries were freed above; release the array itself */
+  GNUNET_array_grow (cls.values, cls.valueCount, 0);
+  GNUNET_mutex_unlock (lookup_exclusion);
+  if (doForward)
+    forwardQuery (query, target, sender);
+  /* release the reference taken by intern_pid above */
+  change_pid_rc (senderID, -1);
+  return doForward;
+}
+
+/**
+ * Content has arrived.  We must decide if we want to a) forward it to
+ * our clients b) indirect it to other nodes.  The routing module
+ * should know what to do.  This method checks the routing table if we
+ * have a matching route and if yes queues the reply.  It also makes
+ * sure that we do not send the same reply back on the same route more
+ * than once.
+ *
+ * @param host who sent the content? NULL
+ *        for locally found content.
+ * @param pmsg the p2p reply that was received
+ * @return GNUNET_SYSERR if the message is malformed or the content
+ *         is rejected by the datastore, 0 if the same content was
+ *         already seen for this routing slot, GNUNET_OK otherwise
+ */
+static int
+useContent (const GNUNET_PeerIdentity * host,
+            const GNUNET_MessageHeader * pmsg)
+{
+  const P2P_gap_reply_MESSAGE *msg;
+  unsigned int i;
+  GNUNET_HashCode contentHC;
+  IndirectionTableEntry *ite;
+  unsigned int size;
+  int ret;
+  unsigned int prio;
+  GNUNET_DataContainer *value;
+  double preference;
+  PID_INDEX hostId;
+#if DEBUG_GAP
+  GNUNET_EncName enc;
+  GNUNET_EncName enc2;
+#endif
+
+  if (ntohs (pmsg->size) < sizeof (P2P_gap_reply_MESSAGE))
+    {
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      return GNUNET_SYSERR;     /* invalid! */
+    }
+  msg = (const P2P_gap_reply_MESSAGE *) pmsg;
+#if DEBUG_GAP
+  IF_GELOG (ectx,
+            GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+            if (host != NULL) GNUNET_hash_to_enc (&host->hashPubKey, &enc));
+  GNUNET_hash_to_enc (&msg->primaryKey, &enc2);
+  /* truncate the encoded key to six characters for the log */
+  ((char *) &enc2)[6] = '\0';
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "GAP received content `%s' from `%s'\n",
+                 &enc2, (host != NULL) ? (const char *) &enc : "myself");
+#endif
+
+  ite = &ROUTING_indTable_[computeRoutingIndex (&msg->primaryKey)];
+  /* NOTE(review): this flag is cleared without holding
+     lookup_exclusion and before checking that the slot actually
+     belongs to this key -- confirm that this is intended */
+  ite->successful_local_lookup_in_delay_loop = GNUNET_NO;
+  /* payload size; cannot underflow thanks to the check above */
+  size = ntohs (msg->header.size) - sizeof (P2P_gap_reply_MESSAGE);
+  prio = 0;
+
+  if (rhf == NULL)
+    {
+      if (stats != NULL)
+        stats->change (stat_routing_reply_drops, 1);
+      return GNUNET_OK;         /* not fully initialized! */
+    }
+  /* wrap the payload in a GNUNET_DataContainer and hash it */
+  value = GNUNET_malloc (size + sizeof (GNUNET_DataContainer));
+  value->size = htonl (size + sizeof (GNUNET_DataContainer));
+  memcpy (&value[1], &msg[1], size);
+  rhf (value, &contentHC);
+
+  /* FIRST: check if valid */
+  ret = bs->put (bs->closure, &msg->primaryKey, value, 0);
+  if (ret == GNUNET_SYSERR)
+    {
+      GNUNET_EncName enc;
+
+      IF_GELOG (ectx,
+                GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+                if (host != NULL) GNUNET_hash_to_enc (&host->hashPubKey,
+                                                      &enc));
+      GNUNET_GE_LOG (ectx, GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+                     _("GAP received invalid content from `%s'\n"),
+                     (host != NULL) ? (const char *) &enc : _("myself"));
+      GNUNET_GE_BREAK_OP (ectx, 0);
+      GNUNET_free (value);
+      return GNUNET_SYSERR;     /* invalid */
+    }
+
+  /* SECOND: check if seen */
+  GNUNET_mutex_lock (lookup_exclusion);
+  for (i = 0; i < ite->seenIndex; i++)
+    {
+      if (0 == memcmp (&contentHC, &ite->seen[i], sizeof (GNUNET_HashCode)))
+        {
+          GNUNET_mutex_unlock (lookup_exclusion);
+          GNUNET_free (value);
+          if (stats != NULL)
+            stats->change (stat_routing_reply_dups, 1);
+          return 0;             /* seen before, useless */
+        }
+    }
+  GNUNET_mutex_unlock (lookup_exclusion);
+
+
+  /* THIRD: compute content priority/value and
+     send remote reply (ITE processing) */
+  /* the reference obtained here is released at the end of the
+     function */
+  hostId = intern_pid (host);
+  GNUNET_mutex_lock (lookup_exclusion);
+  if (0 ==
+      memcmp (&ite->primaryKey, &msg->primaryKey, sizeof (GNUNET_HashCode)))
+    {
+      /* the slot's accumulated priority is consumed by this reply */
+      prio = ite->priority;
+      ite->priority = 0;
+      /* remove the sender from the waiting list
+         (if the sender was waiting for a response) */
+      if (host != NULL)
+        {
+          for (i = 0; i < ite->hostsWaiting; i++)
+            {
+              if (hostId == ite->destination[i])
+                {
+                  /* overwrite the entry with the last one, then
+                     shrink the array by one */
+                  change_pid_rc (ite->destination[i], -1);
+                  ite->destination[i] =
+                    ite->destination[ite->hostsWaiting - 1];
+                  if (stats != NULL)
+                    stats->change (stat_memory_destinations, -1);
+                  GNUNET_array_grow (ite->destination,
+                                     ite->hostsWaiting,
+                                     ite->hostsWaiting - 1);
+                }
+            }
+        }
+      /* remember the hash of this reply in the seen list */
+      if (stats != NULL)
+        stats->change (stat_memory_seen, 1);
+      GNUNET_array_grow (ite->seen, ite->seenIndex, ite->seenIndex + 1);
+      ite->seen[ite->seenIndex - 1] = contentHC;
+      if (ite->seenIndex == 1)
+        {
+          /* already verified */
+          ite->seenReplyWasUnique = uri (value, ite->type, GNUNET_NO,
+                                         &ite->primaryKey);
+        }
+      else
+        {
+          ite->seenReplyWasUnique = GNUNET_NO;
+        }
+      sendReply (ite, &msg->header);
+      if (ite->seenIndex > MAX_SEEN_VALUES * 2)
+        {
+          /* kill routing entry -- we have seen so many different
+             replies already that we cannot afford to continue
+             to keep track of all of the responses seen (#1014) */
+          resetDestinations (ite);
+          resetSeen (ite);
+          ite->priority = 0;
+          ite->type = 0;
+          ite->ttl = 0;
+          if (stats != NULL)
+            stats->change (stat_routing_slots_used, -1);
+        }
+    }
+  else
+    {
+      /* the slot is used by a different query */
+      if (stats != NULL)
+        stats->change (stat_routing_reply_drops, 1);
+    }
+  GNUNET_mutex_unlock (lookup_exclusion);
+  prio += claimReward (&msg->primaryKey);
+
+  /* FOURTH: update content priority in local datastore */
+  if (prio > 0)
+    {
+      bs->put (bs->closure, &msg->primaryKey, value, prio);
+    }
+
+  /* FIFTH: if unique reply, stop querying */
+  /* NOTE(review): this step and the next read the slot after
+     lookup_exclusion was released, and they use ite->primaryKey even
+     when it did not match msg->primaryKey above -- confirm */
+  /* already verified */
+  if (uri (value, ite->type, GNUNET_NO,
+           &ite->primaryKey))
+    {
+      /* unique reply, stop forwarding! */
+      dequeueQuery (&ite->primaryKey);
+    }
+  GNUNET_free (value);
+
+  /* SIXTH: adjust traffic preferences */
+  if (host != NULL)
+    {                           /* skipped for locally found content */
+      preference = (double) prio;
+      identity->changeHostTrust (host, prio);
+      for (i = 0; i < ite->hostsWaiting; i++)
+        updateResponseData (ite->destination[i], hostId);
+      /* never prefer by less than CONTENT_BANDWIDTH_VALUE */
+      if (preference < CONTENT_BANDWIDTH_VALUE)
+        preference = CONTENT_BANDWIDTH_VALUE;
+      coreAPI->preferTrafficFrom (host, preference);
+    }
+  /* release the reference taken by intern_pid above */
+  change_pid_rc (hostId, -1);
+  return GNUNET_OK;
+}
+
+/* ***************** GAP API implementation ***************** */
+
+/**
+ * Start GAP by registering the datastore and the two callbacks.
+ * May only be called while no datastore is registered.
+ *
+ * @param datastore the storage callbacks to use for storing data
+ * @param uid callback stored in 'uri'
+ * @param rh callback stored in 'rhf'
+ * @return GNUNET_SYSERR on error (a datastore is already set),
+ *         GNUNET_OK on success
+ */
+static int
+init (GNUNET_Blockstore * datastore, GNUNET_UniqueReplyIdentifierCallback uid,
+      GNUNET_ReplyHashingCallback rh)
+{
+  if (NULL == bs)
+    {
+      bs = datastore;
+      uri = uid;
+      rhf = rh;
+      return GNUNET_OK;
+    }
+  /* already initialized */
+  GNUNET_GE_BREAK (ectx, 0);
+  return GNUNET_SYSERR;
+}
+
+/**
+ * Perform a GET operation using 'key' as the key.  Note that no
+ * callback is given for the results since GAP just calls PUT on the
+ * datastore on anything that is received, and the caller will be
+ * listening for these puts.
+ *
+ * @param target peer to ask primarily (maybe NULL)
+ * @param type the type of the block that we're looking for
+ * @param anonymityLevel how much cover traffic is required? 1 for none
+ *        (0 does not require GAP, 1 requires GAP but no cover traffic)
+ * @param keyCount number of entries in keys; must be at least 1 and
+ *        small enough for the query to fit into a message
+ * @param keys the keys to query for
+ * @param timeout when this operation should automatically time-out
+ *        (absolute time; the current time is subtracted from it)
+ * @param prio the priority to use for the query
+ * @return GNUNET_OK/GNUNET_YES if we will start to query,
+ *         GNUNET_SYSERR if all of our buffers are full or other
+ *         error, GNUNET_NO if we already
+ *         returned the one and only reply (local hit)
+ */
+static int
+get_start (const GNUNET_PeerIdentity * target,
+           unsigned int type,
+           unsigned int anonymityLevel,
+           unsigned int keyCount,
+           const GNUNET_HashCode * keys, GNUNET_CronTime timeout,
+           unsigned int prio)
+{
+  P2P_gap_query_MESSAGE *msg;
+  unsigned int size;
+  int ret;
+  GNUNET_CronTime now;
+
+  /* Validate keyCount before doing arithmetic with it: zero would
+     underflow (keyCount - 1), and a very large value would wrap the
+     32-bit size below and slip past the buffer size check. */
+  if ((keyCount < 1) ||
+      (keyCount > GNUNET_MAX_BUFFER_SIZE / sizeof (GNUNET_HashCode)))
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;     /* no keys or too many keys! */
+    }
+  size =
+    sizeof (P2P_gap_query_MESSAGE) + (keyCount -
+                                      1) * sizeof (GNUNET_HashCode);
+  if (size >= GNUNET_MAX_BUFFER_SIZE)
+    {
+      GNUNET_GE_BREAK (ectx, 0);
+      return GNUNET_SYSERR;     /* too many keys! */
+    }
+
+  /* anonymity level considerations:
+     check cover traffic availability! */
+  if (anonymityLevel > 0)
+    {
+      unsigned int count;
+      unsigned int peers;
+      unsigned int sizes;
+      unsigned int timevect;
+
+      anonymityLevel--;
+      if (traffic == NULL)
+        {
+          GNUNET_GE_LOG (ectx,
+                         GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+                         _("Cover traffic requested but traffic service not loaded.  Rejecting request.\n"));
+          return GNUNET_SYSERR;
+        }
+      if (GNUNET_OK !=
+          traffic->get ((TTL_DECREMENT + timeout) / GNUNET_TRAFFIC_TIME_UNIT,
+                        GNUNET_P2P_PROTO_GAP_QUERY,
+                        GNUNET_TRAFFIC_TYPE_RECEIVED, &count, &peers, &sizes,
+                        &timevect))
+        {
+          GNUNET_GE_LOG (ectx,
+                         GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+                         _("Failed to get traffic stats.\n"));
+          return GNUNET_SYSERR;
+        }
+      if (anonymityLevel > 1000)
+        {
+          /* above 1000 the level encodes two requirements:
+             level / 1000 peers and level % 1000 messages */
+          if (peers < anonymityLevel / 1000)
+            {
+              GNUNET_GE_LOG (ectx,
+                             GNUNET_GE_WARNING | GNUNET_GE_BULK |
+                             GNUNET_GE_USER,
+                             _("Cannot satisfy desired level of anonymity, ignoring request.\n"));
+              return GNUNET_SYSERR;
+            }
+          if (count < anonymityLevel % 1000)
+            {
+              GNUNET_GE_LOG (ectx,
+                             GNUNET_GE_WARNING | GNUNET_GE_BULK |
+                             GNUNET_GE_USER,
+                             _("Cannot satisfy desired level of anonymity, ignoring request.\n"));
+              return GNUNET_SYSERR;
+            }
+        }
+      else
+        {
+          if (count < anonymityLevel)
+            {
+              GNUNET_GE_LOG (ectx,
+                             GNUNET_GE_WARNING | GNUNET_GE_BULK |
+                             GNUNET_GE_USER,
+                             _("Cannot satisfy desired level of anonymity, ignoring request.\n"));
+              return GNUNET_SYSERR;
+            }
+        }
+    }
+
+  now = GNUNET_get_time ();
+  if (stats != NULL)
+    {
+      /* maintain the running average ttl of locally issued queries */
+      GNUNET_mutex_lock (lock);
+      internal_total_ttls += timeout - now;
+      internal_query_count++;
+      stats->set (stat_routing_internal_ttl,
+                  internal_total_ttls / internal_query_count);
+      GNUNET_mutex_unlock (lock);
+    }
+
+  msg = GNUNET_malloc (size);
+  msg->header.size = htons (size);
+  msg->header.type = htons (GNUNET_P2P_PROTO_GAP_QUERY);
+  msg->type = htonl (type);
+  msg->priority = htonl (prio);
+  /* convert the remaining time (not the absolute timeout) to int */
+  msg->ttl = htonl (adjustTTL ((int) (timeout - now), prio));
+  memcpy (&msg->queries[0], keys, sizeof (GNUNET_HashCode) * keyCount);
+  msg->returnTo = *coreAPI->myIdentity;
+  ret = execQuery (NULL,
+                   target,
+                   prio,
+                   QUERY_ANSWER | QUERY_FORWARD | QUERY_INDIRECT,
+                   timeout - GNUNET_get_time (), msg);
+  GNUNET_free (msg);
+  return ret;
+}
+
+/**
+ * Stop sending out queries for a given key.  GAP will automatically
+ * stop sending queries at some point, but this method can be used to
+ * stop it earlier.  Only the first key is used to identify the query.
+ *
+ * @param type the type of the block (not used here)
+ * @param keyCount number of entries in keys
+ * @param keys the keys that were queried for
+ * @return GNUNET_SYSERR if no key was given, otherwise the
+ *         result of dequeueQuery for the first key
+ */
+static int
+get_stop (unsigned int type, unsigned int keyCount,
+          const GNUNET_HashCode * keys)
+{
+  if (keyCount >= 1)
+    {
+      return dequeueQuery (&keys[0]);
+    }
+  else
+    {
+      return GNUNET_SYSERR;
+    }
+}
+
+/**
+ * Try to migrate the given content: serialize it as a GAP reply
+ * message into the given buffer.
+ *
+ * @param data the content to migrate
+ * @param primaryKey the key of the content
+ * @param position where to write the message
+ * @param padding the maximum size that the message may be
+ * @return the number of bytes written to that buffer; 0 if the
+ *         content is malformed, does not fit into padding or is
+ *         too large for a message
+ */
+static unsigned int
+tryMigrate (const GNUNET_DataContainer * data,
+            const GNUNET_HashCode * primaryKey,
+            char *position, unsigned int padding)
+{
+  P2P_gap_reply_MESSAGE *reply;
+  unsigned int payload;
+  unsigned int size;
+
+  payload = ntohl (data->size);
+  /* a container smaller than its own header is malformed; without
+     this check the memcpy length below would underflow */
+  if (payload < sizeof (GNUNET_DataContainer))
+    return 0;
+  payload -= sizeof (GNUNET_DataContainer);
+  /* reject oversized content before adding the header size so
+     that the sum cannot wrap around */
+  if (payload >= GNUNET_MAX_BUFFER_SIZE)
+    return 0;
+  size = sizeof (P2P_gap_reply_MESSAGE) + payload;
+  if ((size > padding) || (size >= GNUNET_MAX_BUFFER_SIZE))
+    return 0;
+  reply = (P2P_gap_reply_MESSAGE *) position;
+  reply->header.type = htons (GNUNET_P2P_PROTO_GAP_RESULT);
+  reply->header.size = htons (size);
+  reply->primaryKey = *primaryKey;
+  memcpy (&reply[1], &data[1], payload);
+  return size;
+}
+
+/**
+ * Handle query for content. Depending on how we like the sender,
+ * lookup, forward or even indirect.
+ *
+ * @param sender peer the query was received from; the debug logging
+ * treats NULL as "localhost"
+ * @param msg the query; must be exactly a P2P_gap_query_MESSAGE
+ * followed by zero or more additional GNUNET_HashCode values
+ * @return GNUNET_SYSERR if the message size is malformed, 0 if
+ * 'bs' is NULL, GNUNET_OK in all other cases (including
+ * every case where the query is silently dropped)
+ */
+static int
+handleQuery (const GNUNET_PeerIdentity * sender,
+ const GNUNET_MessageHeader * msg)
+{
+ QUERY_POLICY policy;
+ P2P_gap_query_MESSAGE *qmsg;
+ unsigned int queries;
+ int ttl;
+ unsigned int prio;
+ double preference;
+#if DEBUG_GAP
+ GNUNET_EncName enc;
+#endif
+
+ if (bs == NULL)
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return 0;
+ }
+ /* Load above hard limit? */
+ if (loadTooHigh ())
+ {
+#if DEBUG_GAP
+ if (sender != NULL)
+ {
+ IF_GELOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
+ }
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Dropping query from %s, this peer is too busy.\n",
+ sender == NULL ? "localhost" : (char *) &enc);
+#endif
+ return GNUNET_OK;
+ }
+ /* number of hash codes carried by the message: one is counted as
+ part of P2P_gap_query_MESSAGE, the rest trail the struct; the
+ size checks below reject anything that is not an exact fit */
+ queries = 1 + (ntohs (msg->size) - sizeof (P2P_gap_query_MESSAGE))
+ / sizeof (GNUNET_HashCode);
+ if ((queries <= 0) ||
+ (ntohs (msg->size) < sizeof (P2P_gap_query_MESSAGE)) ||
+ (ntohs (msg->size) != sizeof (P2P_gap_query_MESSAGE) +
+ (queries - 1) * sizeof (GNUNET_HashCode)))
+ {
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ return GNUNET_SYSERR; /* malformed query */
+ }
+
+ /* work on a private, writable copy; it is freed on every exit
+ path below */
+ qmsg = GNUNET_malloc (ntohs (msg->size));
+ memcpy (qmsg, msg, ntohs (msg->size));
+ if (0 == memcmp (&qmsg->returnTo.hashPubKey,
+ &coreAPI->myIdentity->hashPubKey,
+ sizeof (GNUNET_HashCode)))
+ {
+ /* A to B, B sends to C without source rewriting,
+ C sends back to A again without source rewriting;
+ (or B directly back to A; also should not happen)
+ in this case, A must just drop; however, this
+ should not happen (peers should check). */
+ GNUNET_GE_BREAK_OP (ectx, 0);
+ GNUNET_free (qmsg);
+ return GNUNET_OK;
+ }
+ if (stats != NULL)
+ stats->change (stat_routing_totals, 1);
+
+ if (stats != NULL)
+ {
+ /* publish the running average of the TTLs of received queries;
+ the accumulators are updated while holding 'lock' */
+ GNUNET_mutex_lock(lock);
+ external_total_ttls += (int) ntohl(qmsg->ttl);
+ external_query_count++;
+ stats->set (stat_routing_external_ttl,
+ external_total_ttls / external_query_count);
+ GNUNET_mutex_unlock(lock);
+ }
+
+ /* decrement ttl (always) */
+ ttl = ntohl (qmsg->ttl);
+ if (ttl < 0)
+ {
+ ttl =
+ ttl - 2 * TTL_DECREMENT -
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, TTL_DECREMENT);
+ if (ttl > 0)
+ { /* integer underflow => drop (should be very
rare)! */
+ GNUNET_free (qmsg);
+ if (stats != NULL)
+ stats->change (stat_routing_direct_drops, 1);
+ return GNUNET_OK; /* just abort */
+ }
+ }
+ else
+ {
+ ttl =
+ ttl - 2 * TTL_DECREMENT -
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, TTL_DECREMENT);
+ }
+ prio = ntohl (qmsg->priority);
+ /* evaluateQuery receives a pointer to prio and may adjust it */
+ policy = evaluateQuery (sender, &prio);
+#if DEBUG_GAP
+ IF_GELOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&qmsg->queries[0], &enc));
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Received GAP query `%s'.\n", &enc);
+#endif
+ if ((policy & QUERY_DROPMASK) == 0)
+ {
+ /* policy says no answer/forward/indirect => direct drop;
+ this happens if the peer is too busy (netload-up >= 100%). */
+ GNUNET_free (qmsg);
+#if DEBUG_GAP
+ if (sender != NULL)
+ {
+ IF_GELOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (&sender->hashPubKey, &enc));
+ }
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Dropping query from %s, policy decided that this peer is
too busy.\n",
+ sender == NULL ? "localhost" : (const char *) &enc);
+#endif
+ if (stats != NULL)
+ stats->change (stat_routing_direct_drops, 1);
+ return GNUNET_OK; /* straight drop. */
+ }
+ /* remember the priority before it is possibly zeroed below; it
+ is used as the traffic preference for the sender */
+ preference = (double) prio;
+ if ((policy & QUERY_INDIRECT) > 0)
+ {
+ qmsg->returnTo = *coreAPI->myIdentity;
+ }
+ else
+ {
+ /* otherwise we preserve the original sender
+ and kill the priority (since we cannot benefit) */
+ prio = 0;
+ }
+
+ if (preference < QUERY_BANDWIDTH_VALUE)
+ preference = QUERY_BANDWIDTH_VALUE;
+ coreAPI->preferTrafficFrom (sender, preference);
+ /* adjust priority */
+ qmsg->priority = htonl (prio);
+ qmsg->ttl = htonl (adjustTTL (ttl, prio));
+
+ /* the message keeps the adjusted ttl as computed above, but the
+ value handed to execQuery is clamped to be non-negative */
+ ttl = ntohl (qmsg->ttl);
+ if (ttl < 0)
+ ttl = 0;
+ execQuery (sender, NULL, prio, policy, ttl, qmsg);
+ GNUNET_free (qmsg);
+ return GNUNET_OK;
+}
+
+/**
+ * Compute the average priority of the active routing table entries.
+ * An entry counts as active if at least one host is waiting on it
+ * and it has not yet recorded any seen replies.
+ *
+ * @return the (truncated) mean priority of the active entries,
+ *         0 if there are none
+ */
+static unsigned int
+getAvgPriority ()
+{
+  const IndirectionTableEntry *slot;
+  unsigned long long sum = 0;
+  unsigned int count = 0;
+  int idx = indirectionTableSize;
+
+  while (idx-- > 0)
+    {
+      slot = &ROUTING_indTable_[idx];
+      if ((slot->hostsWaiting == 0) || (slot->seenIndex != 0))
+        continue;
+      sum += slot->priority;
+      count++;
+    }
+  return (count == 0) ? 0 : (unsigned int) (sum / count);
+}
+
+
+/**
+ * Initialize the gap module and hand out its service API.
+ *
+ * Reads the routing table size from the "GAP"/"TABLESIZE"
+ * configuration option, creates the statistics handles (if the
+ * "stats" service is available), requests the "identity",
+ * "topology" and (optionally) "traffic" services, allocates the
+ * indirection table and registers the periodic 'ageRTD' cron job
+ * and the 'fillInQuery' send callback.
+ *
+ * @param capi the core API
+ * @return the service API (statically allocated), NULL if the
+ *         configuration could not be read
+ */
+GNUNET_GAP_ServiceAPI *
+provide_module_gap (GNUNET_CoreAPIForPlugins * capi)
+{
+  static GNUNET_GAP_ServiceAPI api;
+  unsigned int i;
+
+  ectx = capi->ectx;
+  cfg = capi->cfg;
+  coreAPI = capi;
+  /* NOTE(review): this condition previously started with a dangling
+     `||' and no `if'; any operands that preceded the TABLESIZE check
+     appear to have been lost -- confirm that nothing else needs to
+     be validated here. */
+  if (-1 ==
+      GNUNET_GC_get_configuration_value_number (cfg, "GAP", "TABLESIZE",
+                                                MIN_INDIRECTION_TABLE_SIZE,
+                                                GNUNET_MAX_GNUNET_malloc_CHECKED
+                                                /
+                                                sizeof
+                                                (IndirectionTableEntry),
+                                                MIN_INDIRECTION_TABLE_SIZE,
+                                                &indirectionTableSize))
+    return NULL;
+
+  stats = capi->request_service ("stats");
+  if (stats != NULL)
+    {
+      stat_routing_totals =
+        stats->create (gettext_noop ("# gap requests total received"));
+      stat_routing_direct_drops =
+        stats->create (gettext_noop ("# gap requests policy: immediate drop"));
+      stat_routing_no_route_policy =
+        stats->create (gettext_noop ("# gap requests policy: not routed"));
+      stat_routing_no_answer_policy =
+        stats->create (gettext_noop ("# gap requests policy: not answered"));
+      stat_routing_processed =
+        stats->create (gettext_noop
+                       ("# gap requests processed: attempted add to RT"));
+      stat_routing_local_results =
+        stats->create (gettext_noop ("# gap requests processed: local result"));
+      stat_routing_successes =
+        stats->create (gettext_noop ("# gap routing successes (total)"));
+      stat_routing_collisions =
+        stats->create (gettext_noop ("# gap requests dropped: collision in RT"));
+      stat_routing_forwards =
+        stats->create (gettext_noop
+                       ("# gap requests forwarded (counting each peer)"));
+      stat_routing_request_duplicates =
+        stats->create (gettext_noop ("# gap duplicate requests (pending)"));
+      stat_routing_request_repeat =
+        stats->create (gettext_noop
+                       ("# gap duplicate requests that were re-tried"));
+      stat_routing_external_ttl =
+        stats->create (gettext_noop ("# gap average inbound ttl"));
+      stat_routing_internal_ttl =
+        stats->create (gettext_noop ("# gap average client ttl"));
+      stat_routing_outbound_ttl =
+        stats->create (gettext_noop ("# gap average outbound ttl"));
+
+      stat_routing_reply_dups =
+        stats->create (gettext_noop ("# gap reply duplicates"));
+      stat_routing_reply_drops =
+        stats->create (gettext_noop ("# gap spurious replies (dropped)"));
+      stat_routing_slots_used =
+        stats->create (gettext_noop ("# gap routing slots currently in use"));
+      stat_memory_seen =
+        stats->create (gettext_noop
+                       ("# gap memory used for tracking seen content"));
+      stat_memory_destinations =
+        stats->create (gettext_noop
+                       ("# gap memory used for tracking routing destinations"));
+      stat_pending_rewards =
+        stats->create (gettext_noop ("# gap rewards pending"));
+      stat_response_count =
+        stats->create (gettext_noop ("# gap response weights"));
+    }
+  init_pid_table (ectx, stats);
+  GNUNET_array_grow (rewards, rewardSize, MAX_REWARD_TRACKS);
+
+
+  identity = coreAPI->request_service ("identity");
+  GNUNET_GE_ASSERT (ectx, identity != NULL);
+  topology = coreAPI->request_service ("topology");
+  GNUNET_GE_ASSERT (ectx, topology != NULL);
+  /* the traffic service is optional; without it we only warn */
+  traffic = coreAPI->request_service ("traffic");
+  if (traffic == NULL)
+    {
+      GNUNET_GE_LOG (ectx,
+                     GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+                     _
+                     ("Traffic service failed to load; gap cannot ensure cover-traffic availability.\n"));
+    }
+  random_qsel = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 0xFFFF);
+  lookup_exclusion = GNUNET_mutex_create (GNUNET_NO);
+  ROUTING_indTable_
+    = GNUNET_malloc (sizeof (IndirectionTableEntry) * indirectionTableSize);
+  memset (ROUTING_indTable_,
+          0, sizeof (IndirectionTableEntry) * indirectionTableSize);
+  for (i = 0; i < indirectionTableSize; i++)
+    {
+      ROUTING_indTable_[i].successful_local_lookup_in_delay_loop = GNUNET_NO;
+    }
+
+  for (i = 0; i < QUERY_RECORD_COUNT; i++)
+    {
+      queries[i].expires = 0;   /* all expired */
+      queries[i].msg = NULL;
+    }
+  lock = coreAPI->connection_get_lock ();
+  GNUNET_cron_add_job (capi->cron, &ageRTD, 2 * GNUNET_CRON_MINUTES,
+                       2 * GNUNET_CRON_MINUTES, NULL);
+
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 _("`%s' registering handlers %d %d\n"),
+                 "gap", GNUNET_P2P_PROTO_GAP_QUERY,
+                 GNUNET_P2P_PROTO_GAP_RESULT);
+  coreAPI->
+    connection_register_send_callback (sizeof
+                                       (P2P_gap_query_MESSAGE), &fillInQuery);
+
+  api.init = &init;
+  api.get_start = &get_start;
+  api.get_stop = &get_stop;
+  api.tryMigrate = &tryMigrate;
+  api.getAvgPriority = &getAvgPriority;
+  return &api;
+}
+
+/**
+ * Shut down the gap module: unregister the send callback and the
+ * cron job, clear the routing table, free the reply tracking lists
+ * and pending query records, release all requested services and
+ * reset the module's global state.
+ */
+void
+release_module_gap ()
+{
+  unsigned int slot;
+  ResponseList *resp;
+  ReplyTrackData *rtd;
+  IndirectionTableEntry *entry;
+
+  coreAPI->
+    connection_unregister_send_callback (sizeof
+                                         (P2P_gap_query_MESSAGE),
+                                         &fillInQuery);
+
+  GNUNET_cron_del_job (coreAPI->cron, &ageRTD, 2 * GNUNET_CRON_MINUTES, NULL);
+
+  /* drop all per-entry state of the routing table */
+  for (slot = 0; slot < indirectionTableSize; slot++)
+    {
+      entry = &ROUTING_indTable_[slot];
+      resetSeen (entry);
+      entry->seenReplyWasUnique = GNUNET_NO;
+      resetDestinations (entry);
+    }
+
+  GNUNET_mutex_destroy (lookup_exclusion);
+  lookup_exclusion = NULL;
+
+  /* unlink each reply tracker from the global list before freeing
+     its responses and the tracker itself */
+  while (NULL != (rtd = rtdList))
+    {
+      rtdList = rtd->next;
+      while (NULL != (resp = rtd->responseList))
+        {
+          rtd->responseList = resp->next;
+          GNUNET_free (resp);
+        }
+      GNUNET_free (rtd);
+    }
+  for (slot = 0; slot < QUERY_RECORD_COUNT; slot++)
+    GNUNET_free_non_null (queries[slot].msg);
+
+  coreAPI->release_service (identity);
+  identity = NULL;
+  coreAPI->release_service (topology);
+  topology = NULL;
+  /* the traffic service is optional */
+  if (NULL != traffic)
+    {
+      coreAPI->release_service (traffic);
+      traffic = NULL;
+    }
+  GNUNET_free (ROUTING_indTable_);
+  GNUNET_array_grow (rewards, rewardSize, 0);
+  done_pid_table ();
+  if (NULL != stats)
+    {
+      stats->set (stat_pending_rewards, 0);
+      coreAPI->release_service (stats);
+      stats = NULL;
+    }
+  lock = NULL;
+  coreAPI = NULL;
+  bs = NULL;
+  uri = NULL;
+  ectx = NULL;
+  cfg = NULL;
+}
+
+/* end of gap.c */
Property changes on: GNUnet/src/applications/fs/gap/gap_old.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/migration.c
===================================================================
--- GNUnet/src/applications/fs/gap/migration.c (rev 0)
+++ GNUnet/src/applications/fs/gap/migration.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,393 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2007 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/migration.c
+ * @brief This module is responsible for pushing content out
+ * into the network.
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "migration.h"
+#include "fs.h"
+#include "gnunet_datastore_service.h"
+#include "gnunet_dht_service.h"
+#include "gnunet_stats_service.h"
+#include "gnunet_protocols.h"
+#include "anonymity.h"
+#include "ondemand.h"
+
+#define DEBUG_MIGRATION GNUNET_NO
+
+/**
+ * To how many peers may we migrate the same piece of content during
+ * one iteration? Higher values mean less IO, but also migration
+ * becomes quickly much less effective (everyone has the same
+ * content!). Also, numbers larger than the number of connections are
+ * simply a waste of memory.
+ */
+#define MAX_RECEIVERS 16
+
+/**
+ * How many migration records do we keep in memory
+ * at the same time? Each record is about 32k, so
+ * 64 records will use about 2 MB of memory.
+ * We might want to allow users to specify larger
+ * values in the configuration file some day.
+ */
+#define MAX_RECORDS 64
+
+/**
+ * How often do we poll the datastore for content (at most).
+ */
+#define MAX_POLL_FREQUENCY (250 * GNUNET_CRON_MILLISECONDS)
+
+/**
+ * Datastore service.
+ */
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+/**
+ * Global core API.
+ */
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * DHT service. Maybe NULL!
+ */
+static GNUNET_DHT_ServiceAPI *dht;
+
+/**
+ * Statistics service. NULL if unavailable (every use is guarded).
+ */
+static GNUNET_Stats_ServiceAPI *stats;
+
+/**
+ * Handle for the "# blocks migrated" statistic.
+ */
+static int stat_migration_count;
+
+/**
+ * Handle for the "# blocks fetched for migration" statistic.
+ */
+static int stat_migration_factor;
+
+/**
+ * Handle for the "# on-demand block migration attempts" statistic.
+ */
+static int stat_on_demand_migration_attempts;
+
+/**
+ * Lock used to access content.
+ */
+static struct GNUNET_Mutex *lock;
+
+/**
+ * One cached piece of content that is a candidate for migration.
+ */
+struct MigrationRecord
+{
+ /* the content itself as obtained from the datastore (or its
+ on-demand encoded replacement); NULL if the slot is empty */
+ GNUNET_DatastoreValue *value;
+ /* key under which 'value' is stored */
+ GNUNET_HashCode key;
+ /* connection indices of the peers this content was already
+ offered to; only the first 'sentCount' entries are valid */
+ unsigned int receiverIndices[MAX_RECEIVERS];
+ /* number of valid entries in 'receiverIndices' */
+ unsigned int sentCount;
+};
+
+/* cache of migration candidates; accessed while holding 'lock' */
+static struct MigrationRecord content[MAX_RECORDS];
+
+/**
+ * Callback method for pushing content into the network.
+ * The method chooses either a "recently" deleted block
+ * or content that has a GNUNET_hash close to the receiver ID
+ * (randomized to guarantee diversity, unpredictability
+ * etc.).<p>
+ *
+ * The whole selection runs while holding 'lock'. Empty slots of
+ * the 'content' cache are refilled from the datastore, but at most
+ * once per MAX_POLL_FREQUENCY (tracked in 'discard_time').
+ *
+ * @param receiver the receiver of the message
+ * @param position is the reference to the
+ * first unused position in the buffer where GNUnet is building
+ * the message
+ * @param padding is the number of bytes left in that buffer.
+ * @return the number of bytes written to
+ * that buffer (must be a positive number).
+ */
+static unsigned int
+activeMigrationCallback (const GNUNET_PeerIdentity * receiver,
+ void *position, unsigned int padding)
+{
+ static GNUNET_CronTime discard_time;
+ unsigned int ret;
+ unsigned int size;
+ GNUNET_CronTime et;
+ GNUNET_CronTime now;
+ unsigned int anonymity;
+ GNUNET_DatastoreValue *enc;
+ GNUNET_DatastoreValue *value;
+ P2P_gap_reply_MESSAGE * msg;
+ unsigned int index;
+ int entry;
+ int discard_entry;
+ int discard_match;
+ int i;
+ int j;
+ int match;
+ unsigned int dist;
+ unsigned int minDist;
+
+ index = coreAPI->connection_compute_index_of_peer (receiver);
+ GNUNET_mutex_lock (lock);
+ now = GNUNET_get_time ();
+ entry = -1;
+ discard_entry = -1;
+ discard_match = -1;
+ minDist = -1; /* max */
+ for (i = 0; i < MAX_RECORDS; i++)
+ {
+ if (content[i].value == NULL)
+ {
+ /* empty slot: refill it unless we polled the datastore too
+ recently */
+ if (discard_time >= now - MAX_POLL_FREQUENCY)
+ continue;
+ discard_time = now;
+ if (GNUNET_OK !=
+ datastore->getRandom (&content[i].key, &content[i].value))
+ {
+ content[i].value = NULL; /* just to be sure... */
+ continue;
+ }
+ else
+ {
+ if (stats != NULL)
+ stats->change (stat_migration_factor, 1);
+ }
+ }
+ /* match == 1 means "not usable for this receiver": either the
+ reply would not fit into 'padding' or the content was already
+ sent to the connection index of this receiver */
+ match = 1;
+ if (ntohl (content[i].value->size) + sizeof (P2P_gap_reply_MESSAGE) -
+ sizeof (GNUNET_DatastoreValue) <= padding)
+ {
+ match = 0;
+ for (j = 0; j < content[i].sentCount; j++)
+
+ {
+ if (content[i].receiverIndices[j] == index)
+ {
+ match = 1;
+ break;
+ }
+ }
+ }
+ if (match == 0)
+ {
+ dist =
+ GNUNET_hash_distance_u32 (&content[i].key, &receiver->hashPubKey);
+ /* NOTE(review): minDist starts at the maximum and the loop is
+ left on the first hit, so the first usable entry always wins
+ and no distance comparison between entries takes place */
+ if (dist <= minDist)
+ {
+ entry = i;
+ minDist = dist;
+ break;
+ }
+ }
+ else
+ {
+ /* remember the unusable entry that was sent most often as the
+ candidate for replacement */
+ if ((content[i].sentCount > discard_match) || (discard_match == -1))
+ {
+ discard_match = content[i].sentCount;
+ discard_entry = i;
+ }
+ }
+ }
+ /* replace the replacement candidate with fresh random content if
+ it went to more than half of MAX_RECEIVERS peers and the poll
+ frequency limit permits another datastore access */
+ if ((discard_entry != -1) &&
+ (discard_match > MAX_RECEIVERS / 2) &&
+ (discard_time < now - MAX_POLL_FREQUENCY))
+ {
+ discard_time = now;
+ GNUNET_free_non_null (content[discard_entry].value);
+ content[discard_entry].value = NULL;
+ content[discard_entry].sentCount = 0;
+ if (GNUNET_OK != datastore->getRandom (&content[discard_entry].key,
+ &content[discard_entry].value))
+ {
+ content[discard_entry].value = NULL; /* just to be sure... */
+ discard_entry = -1;
+ }
+ else
+ {
+ if (stats != NULL)
+ stats->change (stat_migration_factor, 1);
+ }
+ }
+ /* no usable entry found in the scan: fall back to the
+ replacement candidate (if any) */
+ if (entry == -1)
+ entry = discard_entry;
+ if (entry == -1)
+ {
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Migration: no content available for migration.\n");
+#endif
+ GNUNET_mutex_unlock (lock);
+ return 0;
+ }
+ value = content[entry].value;
+ if (value == NULL)
+ {
+ GNUNET_GE_ASSERT (NULL, 0);
+ GNUNET_mutex_unlock (lock);
+ return 0;
+ }
+ size =
+ sizeof (P2P_gap_reply_MESSAGE) + ntohl (value->size) -
+ sizeof (GNUNET_DatastoreValue);
+ if (size > padding)
+ {
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Migration: available content too big (%u > %u) for
migration.\n",
+ size, padding);
+#endif
+ GNUNET_mutex_unlock (lock);
+ return 0;
+ }
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_BULK | GNUNET_GE_USER,
+ "Migration: random lookup in datastore returned type %d.\n",
+ ntohl (value->type));
+#endif
+ /* on-demand blocks only reference the indexed file; replace the
+ cached value with the encoded content (or drop the slot if the
+ encoding fails) */
+ if ((ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND) ||
+ (ntohl (value->type) == GNUNET_ECRS_BLOCKTYPE_ONDEMAND_OLD))
+ {
+ if (GNUNET_FS_ONDEMAND_get_indexed_content (value, &content[entry].key,
&enc) != GNUNET_OK)
+ {
+ GNUNET_free_non_null (value);
+ content[entry].value = NULL;
+ GNUNET_mutex_unlock (lock);
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Migration: failed to locate indexed content for
migration.\n");
+#endif
+ return 0;
+ }
+ if (stats != NULL)
+ stats->change (stat_on_demand_migration_attempts, 1);
+ content[entry].value = enc;
+ GNUNET_free (value);
+ value = enc;
+ }
+ /* the value may have been replaced above, so the size has to be
+ computed and checked again */
+ size =
+ sizeof (P2P_gap_reply_MESSAGE) + ntohl (value->size) -
+ sizeof (GNUNET_DatastoreValue);
+ if (size > padding)
+ {
+ GNUNET_mutex_unlock (lock);
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Migration: available content too big (%u > %u) for
migration.\n",
+ size, padding);
+#endif
+ return 0;
+ }
+ msg = position;
+ /* transmit the remaining lifetime (relative to now) instead of
+ the absolute expiration time, reduced modulo MAX_MIGRATION_EXP */
+ et = GNUNET_ntohll (value->expirationTime);
+ if (et > now)
+ et -= now;
+ else
+ et = 0;
+ et %= MAX_MIGRATION_EXP;
+ anonymity = ntohl (value->anonymityLevel);
+ ret = 0;
+ if ( (anonymity == 0) ||
+ (GNUNET_OK == GNUNET_FS_ANONYMITY_check(anonymity,
+ GNUNET_P2P_PROTO_GAP_RESULT)) )
+ {
+ msg->header.type = htons(GNUNET_P2P_PROTO_GAP_RESULT);
+ msg->header.size = htons(size);
+ msg->reserved = htonl(0);
+ msg->expiration = GNUNET_htonll(et);
+ memcpy(&msg[1],
+ &value[1],
+ size - sizeof(P2P_gap_reply_MESSAGE));
+ ret = size;
+ /* once the receiver list is full the slot is released so that
+ it gets refilled; otherwise remember this receiver */
+ if (content[entry].sentCount == MAX_RECEIVERS)
+ {
+ GNUNET_free (content[entry].value);
+ content[entry].value = NULL;
+ content[entry].sentCount = 0;
+ }
+ else
+ {
+ content[entry].receiverIndices[content[entry].sentCount++] =
+ index;
+ }
+ }
+ else
+ {
+#if DEBUG_MIGRATION
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Migration: not enough cover traffic\n");
+#endif
+ }
+ GNUNET_mutex_unlock (lock);
+ if ((ret > 0) && (stats != NULL))
+ stats->change (stat_migration_count, 1);
+ GNUNET_GE_BREAK (NULL, ret <= padding);
+ return ret;
+}
+
+/**
+ * Initialize the migration module: create the lock, register the
+ * active migration send callback and request the "datastore",
+ * "dht" and "stats" services (creating the statistics handles if
+ * the latter is available).
+ *
+ * @param capi the core API
+ */
+void
+GNUNET_FS_MIGRATION_init (GNUNET_CoreAPIForPlugins * capi)
+{
+  coreAPI = capi;
+  lock = GNUNET_mutex_create (GNUNET_NO);
+  capi->connection_register_send_callback (GNUNET_GAP_ESTIMATED_DATA_SIZE,
+                                           GNUNET_FS_GAP_CONTENT_MIGRATION_PRIORITY,
+                                           &activeMigrationCallback);
+  datastore = capi->request_service ("datastore");
+  dht = capi->request_service ("dht");
+  stats = capi->request_service ("stats");
+  if (NULL == stats)
+    return;                     /* statistics are optional */
+  stat_migration_count = stats->create (gettext_noop ("# blocks migrated"));
+  stat_migration_factor =
+    stats->create (gettext_noop ("# blocks fetched for migration"));
+  stat_on_demand_migration_attempts =
+    stats->create (gettext_noop ("# on-demand block migration attempts"));
+}
+
+/**
+ * Shut down the migration module: unregister the send callback,
+ * release the services requested during initialization, free all
+ * cached migration records and destroy the lock.
+ */
+void
+GNUNET_FS_MIGRATION_done ()
+{
+  int i;
+
+  coreAPI->
+    connection_unregister_send_callback
+    (GNUNET_GAP_ESTIMATED_DATA_SIZE, &activeMigrationCallback);
+  if (stats != NULL)
+    {
+      coreAPI->release_service (stats);
+      stats = NULL;
+    }
+  coreAPI->release_service (datastore);
+  datastore = NULL;
+  /* the DHT service is optional ("Maybe NULL!"), so only release it
+     if it was actually obtained */
+  if (dht != NULL)
+    {
+      coreAPI->release_service (dht);
+      dht = NULL;
+    }
+  coreAPI = NULL;
+  for (i = 0; i < MAX_RECORDS; i++)
+    {
+      GNUNET_free_non_null (content[i].value);
+      content[i].value = NULL;
+    }
+  GNUNET_mutex_destroy (lock);
+  lock = NULL;
+}
+
+/* end of migration.c */
Property changes on: GNUnet/src/applications/fs/gap/migration.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/migration.h
===================================================================
--- GNUnet/src/applications/fs/gap/migration.h (rev 0)
+++ GNUnet/src/applications/fs/gap/migration.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,42 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @author Christian Grothoff
+ * @brief This module is responsible for pushing content out
+ * into the network.
+ * @file applications/fs/gap/migration.h
+ */
+#ifndef MIGRATION_H
+#define MIGRATION_H
+
+#include "gnunet_core.h"
+
+/**
+ * Initialize the migration module.
+ *
+ * @param capi the core API used to register the migration
+ * callback and to request the required services
+ */
+void
+GNUNET_FS_MIGRATION_init (GNUNET_CoreAPIForPlugins * capi);
+
+/**
+ * Shut down the migration module (counterpart of
+ * GNUNET_FS_MIGRATION_init).
+ */
+void
+GNUNET_FS_MIGRATION_done (void);
+
+/* end of migration.h */
+#endif
Property changes on: GNUnet/src/applications/fs/gap/migration.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/ondemand.c
===================================================================
--- GNUnet/src/applications/fs/gap/ondemand.c (rev 0)
+++ GNUnet/src/applications/fs/gap/ondemand.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,621 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/ondemand.c
+ * @brief functions for handling on-demand encoding
+ * @author Christian Grothoff
+ */
+
+
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_directories.h"
+#include "gnunet_protocols.h"
+#include "gnunet_datastore_service.h"
+#include "gnunet_state_service.h"
+#include "ecrs_core.h"
+#include "shared.h"
+#include "ondemand.h"
+
+/**
+ * Format of an on-demand block. All integer fields are written
+ * in network byte order by GNUNET_FS_ONDEMAND_add_indexed_content.
+ */
+typedef struct
+{
+ /**
+ * Generic datastore header; its size is sizeof (OnDemandBlock)
+ * and its type is GNUNET_ECRS_BLOCKTYPE_ONDEMAND.
+ */
+ GNUNET_DatastoreValue header;
+
+ /**
+ * Block type, set to GNUNET_ECRS_BLOCKTYPE_ONDEMAND.
+ */
+ unsigned int type;
+
+ /**
+ * Size of the on-demand encoded part of the file
+ * that this Block represents.
+ */
+ unsigned int blockSize;
+
+ /**
+ * At what offset in the plaintext file is
+ * this content stored?
+ */
+ unsigned long long fileOffset;
+
+ /**
+ * What is the GNUNET_hash of the file that contains
+ * this block? Used to determine the name
+ * of the file in the on-demand datastore.
+ */
+ GNUNET_HashCode fileId;
+
+} OnDemandBlock;
+
+/**
+ * Name of the directory where we store symlinks to indexed
+ * files.
+ */
+static char * index_directory;
+
+/**
+ * State service; stores the markers for indexed files that
+ * have become unavailable.
+ */
+static GNUNET_State_ServiceAPI *state;
+
+/**
+ * Datastore service; used to delete blocks that reference
+ * content which is no longer available.
+ */
+static GNUNET_Datastore_ServiceAPI *datastore;
+
+/**
+ * Global core API (provides the error context and cron).
+ */
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+/**
+ * Build the path of the entry in the index directory that
+ * corresponds to the given hash of an indexed file.
+ *
+ * @param fileId hash of the indexed file
+ * @return freshly allocated "<index_directory><separator><encoded hash>"
+ *         string; the caller must free it
+ */
+static char *
+get_indexed_filename (const GNUNET_HashCode * fileId)
+{
+  GNUNET_EncName encoded;
+  size_t dir_len;
+  char *path;
+
+  GNUNET_hash_to_enc (fileId, &encoded);
+  dir_len = strlen (index_directory);
+  path = GNUNET_malloc (dir_len + sizeof (GNUNET_EncName) + 1);
+  strcpy (path, index_directory);
+  strcpy (&path[dir_len], DIR_SEPARATOR_STR);
+  strcat (path, (char *) &encoded);
+  return path;
+}
+
+/**
+ * The state-DB records which indexed files have disappeared.
+ * Once such a file is indexed again or explicitly unindexed,
+ * its marker has to go; this function removes it.
+ *
+ * @param fileId hash of the file for which the marker
+ *        should be removed
+ */
+static void
+remove_unavailable_mark(const GNUNET_HashCode * fileId)
+{
+  char marker[256];
+  GNUNET_EncName encoded;
+
+  GNUNET_hash_to_enc (fileId, &encoded);
+  GNUNET_snprintf (marker,
+                   sizeof (marker),
+                   "FIRST_UNAVAILABLE-%s", (char *) &encoded);
+  state->unlink (coreAPI->ectx, marker);
+}
+
+/**
+ * We use the state-DB to mark that certain indexed
+ * files have disappeared. If they are marked as
+ * disappeared for a while, we remove all traces of
+ * those files from the database. This function is
+ * called to either initially mark a file as unavailable,
+ * or, if the condition persists, to trigger its
+ * removal from the database.
+ *
+ * The marker is stored under "FIRST_UNAVAILABLE-<encoded hash>",
+ * the same key that remove_unavailable_mark unlinks, and holds
+ * the time of the first failure in network byte order.
+ *
+ * @param fileId hash of the file that could not be accessed
+ */
+static void
+publish_unavailable_mark(const GNUNET_HashCode * fileId)
+{
+  char unavail_key[256];
+  GNUNET_EncName enc;
+  unsigned long long *first_unavail;
+  unsigned long long first_seen;
+  unsigned long long now;
+  unsigned long long now_nbo;
+  unsigned int len;
+  char *ofn;
+  char *fn;
+  int ret;
+
+  now = GNUNET_get_time ();
+  GNUNET_hash_to_enc (fileId, &enc);
+  GNUNET_snprintf (unavail_key, sizeof (unavail_key), "FIRST_UNAVAILABLE-%s",
+                   (char *) &enc);
+  first_unavail = NULL;
+  if (state->read (coreAPI->ectx, unavail_key, (void *) &first_unavail) !=
+      sizeof (GNUNET_CronTime))
+    {
+      /* no (valid) marker yet: record the time of the first failure;
+         presumably a buffer returned by read is owned by the
+         caller -- TODO confirm against the state service */
+      if (first_unavail != NULL)
+        GNUNET_free (first_unavail);
+      now_nbo = GNUNET_htonll (now);
+      state->write (coreAPI->ectx,
+                    unavail_key, sizeof (GNUNET_CronTime),
+                    (void *) &now_nbo);
+      return;
+    }
+  first_seen = GNUNET_ntohll (*first_unavail);
+  GNUNET_free (first_unavail);
+  /* age of the marker is "now - first_seen"; also guard against a
+     clock that went backwards so the subtraction cannot wrap */
+  if ((now < first_seen) ||
+      (now - first_seen < 3 * GNUNET_CRON_DAYS))
+    return;                     /* do nothing for first 3 days */
+  fn = get_indexed_filename(fileId);
+  /* Delete it after 3 days */
+  len = 256;
+  ofn = GNUNET_malloc (len);
+  /* one byte is reserved for the terminator since readlink does
+     not NUL-terminate its result */
+  while (((ret = READLINK (fn, ofn, len - 1)) == -1) &&
+         (errno == ENAMETOOLONG) && (len < 4 * 1024 * 1024))
+    {
+      if (len * 2 < len)
+        {
+          GNUNET_GE_BREAK (coreAPI->ectx, 0);
+          GNUNET_array_grow (ofn, len, 0);
+          GNUNET_free (fn);
+          return;
+        }
+      GNUNET_array_grow (ofn, len, len * 2);
+    }
+  if (ret != -1)
+    {
+      ofn[ret] = '\0';
+      GNUNET_GE_LOG (coreAPI->ectx,
+                     GNUNET_GE_ERROR | GNUNET_GE_BULK |
+                     GNUNET_GE_USER,
+                     _
+                     ("Because the file `%s' has been unavailable for 3 days"
+                      " it got removed from your share.  Please unindex files before"
+                      " deleting them as the index now contains invalid references!\n"),
+                     ofn);
+    }
+  GNUNET_free (ofn);
+  state->unlink (coreAPI->ectx, unavail_key);
+  UNLINK (fn);
+  GNUNET_free (fn);
+}
+
+/**
+ * Creates a symlink to the given file in the shared directory.
+ * The file is hashed first and the request is rejected if the
+ * hash does not equal fileId.
+ *
+ * NOTE(review): the error context is freed on the symlink paths
+ * (success and failure) but not when the hash check fails --
+ * confirm who is supposed to own ectx.
+ *
+ * @param ectx context used for hashing and error reporting
+ * @param fileId the file's GNUNET_hash code
+ * @param fn the file that was indexed
+ * @return GNUNET_SYSERR on error, GNUNET_NO if symlinking failed,
+ *         GNUNET_YES on success
+ */
+int
+GNUNET_FS_ONDEMAND_index_prepare_with_symlink (struct GNUNET_GE_Context * ectx,
+                                               const GNUNET_HashCode * fileId, const char *fn)
+{
+  GNUNET_HashCode actual;
+  GNUNET_EncName encoded;
+  char *link_name;
+  int linked;
+
+  if (GNUNET_SYSERR == GNUNET_hash_file (ectx, fn, &actual))
+    return GNUNET_SYSERR;
+  if (0 != memcmp (&actual, fileId, sizeof (GNUNET_HashCode)))
+    return GNUNET_SYSERR;
+  link_name =
+    GNUNET_malloc (strlen (index_directory) + 2 + sizeof (GNUNET_EncName));
+  strcpy (link_name, index_directory);
+  strcat (link_name, DIR_SEPARATOR_STR);
+  GNUNET_hash_to_enc (fileId, &encoded);
+  strcat (link_name, (char *) &encoded);
+  /* replace whatever entry may already exist under that name */
+  UNLINK (link_name);
+  GNUNET_disk_directory_create_for_file (ectx, link_name);
+  linked = (0 == SYMLINK (fn, link_name));
+  if (!linked)
+    {
+      GNUNET_GE_LOG_STRERROR_FILE (ectx,
+                                   GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+                                   GNUNET_GE_USER | GNUNET_GE_BULK, "symlink",
+                                   fn);
+      GNUNET_GE_LOG_STRERROR_FILE (ectx,
+                                   GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+                                   GNUNET_GE_USER | GNUNET_GE_BULK, "symlink",
+                                   link_name);
+    }
+  GNUNET_GE_free_context (ectx);
+  GNUNET_free (link_name);
+  if (!linked)
+    return GNUNET_NO;
+  remove_unavailable_mark(fileId);
+  return GNUNET_YES;
+}
+
+/**
+ * Writes the given content to the file at the specified offset
+ * and stores an OnDemandBlock into the datastore. The content
+ * is only written if the index entry for the file is not a
+ * symbolic link.
+ *
+ * @param ectx context for error reporting on file operations
+ * @param datastore datastore to put the on-demand block into
+ * @param prio priority of the block
+ * @param expiration expiration time of the block
+ * @param fileOffset offset of the content in the plaintext file
+ * @param anonymityLevel anonymity level of the block
+ * @param fileId hash of the file the content belongs to
+ * @param size size of content, including the DBlock header; must
+ *        be larger than sizeof (DBlock)
+ * @param content the DBlock header followed by the plaintext
+ * @return GNUNET_SYSERR if size is too small or the file could not
+ *         be opened, positioned or written; otherwise the result
+ *         of the datastore's putUpdate (documented here as
+ *         GNUNET_NO if already present, GNUNET_YES on success,
+ *         GNUNET_SYSERR on other error, i.e. datastore full)
+ */
+int
+GNUNET_FS_ONDEMAND_add_indexed_content (struct GNUNET_GE_Context * ectx,
+                                        GNUNET_Datastore_ServiceAPI * datastore,
+                                        unsigned int prio,
+                                        GNUNET_CronTime expiration,
+                                        unsigned long long fileOffset,
+                                        unsigned int anonymityLevel,
+                                        const GNUNET_HashCode * fileId,
+                                        unsigned int size, const DBlock * content)
+{
+  int ret;
+  OnDemandBlock odb;
+  GNUNET_HashCode key;
+  struct stat sbuf;
+  char *fn;
+  int fd;
+
+  if (size <= sizeof (DBlock))
+    {
+      GNUNET_GE_BREAK (coreAPI->ectx, 0);
+      return GNUNET_SYSERR;
+    }
+  fn = get_indexed_filename (fileId);
+  if ((0 != LSTAT (fn, &sbuf))
+#ifdef S_ISLNK
+      || (!S_ISLNK (sbuf.st_mode))
+#endif
+    )
+    {
+      /* not sym-linked, write content to offset! */
+      fd = GNUNET_disk_file_open (ectx, fn, O_LARGEFILE | O_CREAT | O_WRONLY,
+                                  S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);  /* 644 */
+      if (fd == -1)
+        {
+          GNUNET_free (fn);
+          return GNUNET_SYSERR;
+        }
+      /* a failed seek must not be ignored: the data would end up at
+         the wrong position of the file */
+      if (fileOffset != LSEEK (fd, fileOffset, SEEK_SET))
+        {
+          GNUNET_GE_LOG_STRERROR_FILE (ectx,
+                                       GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+                                       GNUNET_GE_USER | GNUNET_GE_BULK,
+                                       "lseek", fn);
+          CLOSE (fd);
+          GNUNET_free (fn);
+          return GNUNET_SYSERR;
+        }
+      ret = WRITE (fd, &content[1], size - sizeof (DBlock));
+      if (ret == size - sizeof (DBlock))
+        {
+          ret = GNUNET_OK;
+        }
+      else
+        {
+          GNUNET_GE_LOG_STRERROR_FILE (ectx,
+                                       GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+                                       GNUNET_GE_USER | GNUNET_GE_BULK,
+                                       "write", fn);
+          ret = GNUNET_SYSERR;
+        }
+      CLOSE (fd);
+      if (ret == GNUNET_SYSERR)
+        {
+          GNUNET_free (fn);
+          return GNUNET_SYSERR;
+        }
+    }
+  GNUNET_free (fn);
+
+  /* all fields of the on-demand block are in network byte order */
+  odb.header.size = htonl (sizeof (OnDemandBlock));
+  odb.header.type = htonl (GNUNET_ECRS_BLOCKTYPE_ONDEMAND);
+  odb.header.prio = htonl (prio);
+  odb.header.anonymityLevel = htonl (anonymityLevel);
+  odb.header.expirationTime = GNUNET_htonll (expiration);
+  odb.type = htonl (GNUNET_ECRS_BLOCKTYPE_ONDEMAND);
+  odb.fileOffset = GNUNET_htonll (fileOffset);
+  odb.blockSize = htonl (size - sizeof (DBlock));
+  odb.fileId = *fileId;
+  /* compute the primary key */
+  GNUNET_EC_file_block_get_query (content, size, &key);
+#if EXTRA_CHECKS
+  {
+    GNUNET_DatastoreValue *dsvalue;
+    if (GNUNET_OK !=
+        GNUNET_EC_file_block_encode (content, size, &key, &dsvalue))
+      {
+        GNUNET_GE_BREAK (ectx, 0);
+        GNUNET_GE_BREAK (coreAPI->ectx, 0);
+      }
+    else
+      {
+        GNUNET_free (dsvalue);
+      }
+  }
+#endif
+  return datastore->putUpdate (&key, &odb.header);
+}
+
+/**
+ * Cron job that removes a block from the datastore. The closure
+ * is a single allocation holding the query hash immediately
+ * followed by the datastore value; it is freed afterwards.
+ *
+ * @param cls the closure (query followed by the value)
+ */
+static void
+async_delete_job (void *cls)
+{
+  GNUNET_HashCode * key = cls;
+  GNUNET_DatastoreValue * value;
+
+  value = (GNUNET_DatastoreValue *) (key + 1);
+  datastore->del (key, value);
+  GNUNET_free (key);
+}
+
+/**
+ * Schedule the removal of a block that still references an
+ * unavailable file. The deletion has to be deferred to a cron
+ * job because we are inside the "get" iterator, and a del
+ * operation during "get" would deadlock!
+ *
+ * @param dbv the datastore value to delete (copied)
+ * @param query the key of the value (copied)
+ */
+static void
+delete_content_asynchronously (const GNUNET_DatastoreValue * dbv, const GNUNET_HashCode * query)
+{
+  unsigned int value_size;
+  char *closure;
+
+  value_size = ntohl (dbv->size);
+  /* layout expected by async_delete_job: query, then the value */
+  closure = GNUNET_malloc (sizeof (GNUNET_HashCode) + value_size);
+  memcpy (closure, query, sizeof (GNUNET_HashCode));
+  memcpy (closure + sizeof (GNUNET_HashCode), dbv, value_size);
+  GNUNET_cron_add_job (coreAPI->cron, &async_delete_job, 0, 0, closure);
+}
+
+/**
+ * A query on the datastore resulted in the on-demand
+ * block dbv. On-demand encode the block and return
+ * the resulting DSV in enc. If the on-demand
+ * encoding fails because the file is no longer there,
+ * this function also removes the OD-Entry
+ *
+ * @param dbv datastore value; must be an OnDemandBlock (checked by
+ *        size and type)
+ * @param query key of the block; passed to the encoder and used when
+ *        scheduling removal of a stale entry
+ * @param enc set to the encoded block on success; anonymity level,
+ *        expiration time and priority are copied over from dbv
+ * @return GNUNET_OK on success, GNUNET_SYSERR if there was an error
+ */
+int
+GNUNET_FS_ONDEMAND_get_indexed_content (const GNUNET_DatastoreValue * dbv,
+ const GNUNET_HashCode * query,
+ GNUNET_DatastoreValue ** enc)
+{
+ char *fn;
+ char *iobuf;
+ int blen;
+ int fileHandle;
+ int ret;
+ const OnDemandBlock *odb;
+ DBlock *db;
+ struct stat linkStat;
+
+
+ /* reject anything that is not exactly one on-demand block */
+ if ( (ntohl (dbv->size) != sizeof (OnDemandBlock)) ||
+ (ntohl (dbv->type) != GNUNET_ECRS_BLOCKTYPE_ONDEMAND) )
+ {
+ GNUNET_GE_BREAK (coreAPI->ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ odb = (const OnDemandBlock *) dbv;
+ fn = get_indexed_filename (&odb->fileId);
+ if ((GNUNET_YES != GNUNET_disk_file_test (coreAPI->ectx,
+ fn)) ||
+ (-1 == (fileHandle = GNUNET_disk_file_open (coreAPI->ectx,
+ fn, O_LARGEFILE | O_RDONLY,
+ 0))))
+ {
+ GNUNET_GE_LOG_STRERROR_FILE (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK, "open",
+ fn);
+ /* Is the symlink (still) there? */
+ /* lstat failure: the entry itself is gone, so drop the datastore
+ record; otherwise only mark the file as unavailable */
+ if (LSTAT (fn, &linkStat) == -1)
+ delete_content_asynchronously (dbv, query);
+ else
+ publish_unavailable_mark(&odb->fileId);
+ GNUNET_free (fn);
+ return GNUNET_SYSERR;
+ }
+
+ /* position the file at the offset recorded in the on-demand block */
+ if (GNUNET_ntohll (odb->fileOffset) != LSEEK (fileHandle,
+ GNUNET_ntohll (odb->
+ fileOffset),
+ SEEK_SET))
+ {
+ GNUNET_GE_LOG_STRERROR_FILE (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK, "lseek",
+ fn);
+ GNUNET_free (fn);
+ CLOSE (fileHandle);
+ delete_content_asynchronously (dbv, query);
+ return GNUNET_SYSERR;
+ }
+ /* buffer layout: DBlock header followed by blockSize payload bytes */
+ db = GNUNET_malloc (sizeof (DBlock) + ntohl (odb->blockSize));
+ db->type = htonl (GNUNET_ECRS_BLOCKTYPE_DATA);
+ iobuf = (char *) &db[1];
+ blen = READ (fileHandle, iobuf, ntohl (odb->blockSize));
+ /* a short read or a read error both invalidate the entry */
+ if (blen != ntohl (odb->blockSize))
+ {
+ GNUNET_GE_LOG_STRERROR_FILE (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK, "read",
+ fn);
+ GNUNET_free (fn);
+ GNUNET_free (db);
+ CLOSE (fileHandle);
+ delete_content_asynchronously (dbv, query);
+ return GNUNET_SYSERR;
+ }
+ CLOSE (fileHandle);
+ ret = GNUNET_EC_file_block_encode (db,
+ ntohl (odb->blockSize) + sizeof (DBlock),
+ query, enc);
+ GNUNET_free (db);
+ GNUNET_free (fn);
+ if (ret == GNUNET_SYSERR)
+ {
+ GNUNET_GE_LOG (coreAPI->ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _
+ ("Indexed content changed (does not match its hash).\n"));
+ delete_content_asynchronously (dbv, query);
+ return GNUNET_SYSERR;
+ }
+ /* carry the metadata of the on-demand entry over to the result */
+ (*enc)->anonymityLevel = dbv->anonymityLevel;
+ (*enc)->expirationTime = dbv->expirationTime;
+ (*enc)->prio = dbv->prio;
+ return GNUNET_OK;
+}
+
+/**
+ * Check whether the file with the given ID is indexed, i.e. whether
+ * its entry in the index directory can be opened for reading.
+ *
+ * @param datastore not used by this function
+ * @param fileId hash identifying the file
+ * @return GNUNET_YES if so, GNUNET_NO if not.
+ */
+int
+GNUNET_FS_ONDEMAND_test_indexed_file (GNUNET_Datastore_ServiceAPI * datastore,
+                                      const GNUNET_HashCode * fileId)
+{
+  char *path = get_indexed_filename (fileId);
+  int handle = GNUNET_disk_file_open (coreAPI->ectx, path, O_RDONLY);
+
+  GNUNET_free (path);
+  if (handle != -1)
+    {
+      CLOSE (handle);
+      return GNUNET_YES;
+    }
+  return GNUNET_NO;
+}
+
+/**
+ * Unindex the file with the given ID.  Removes the file from the
+ * filesystem and all of the corresponding odb blocks from the
+ * datastore.  Note that the IBlocks are NOT removed by this function.
+ *
+ * @param ectx context used for error reporting
+ * @param datastore datastore from which the on-demand blocks are removed
+ * @param blocksize the size of each of the
+ *        indexed blocks (required to break
+ *        up the file properly when computing
+ *        the keys of the odb blocks).
+ * @param fileId hash identifying the indexed file
+ * @return GNUNET_OK on success, GNUNET_SYSERR if the file could not be
+ *         opened, sized or read
+ */
+int
+GNUNET_FS_ONDEMAND_delete_indexed_content (struct GNUNET_GE_Context * ectx,
+                                           GNUNET_Datastore_ServiceAPI * datastore,
+                                           unsigned int blocksize,
+                                           const GNUNET_HashCode * fileId)
+{
+  char *fn;
+  int fd;
+  int ret;
+  OnDemandBlock odb;
+  GNUNET_HashCode key;
+  unsigned long long pos;
+  unsigned long long size;
+  unsigned long long delta;
+  DBlock *block;
+  GNUNET_EncName enc;
+
+  fn = get_indexed_filename (fileId);
+  fd = GNUNET_disk_file_open (ectx, fn, O_RDONLY | O_LARGEFILE, 0);
+  if (fd == -1)
+    {
+      GNUNET_free (fn);
+      return GNUNET_SYSERR;
+    }
+  pos = 0;
+  if (GNUNET_OK != GNUNET_disk_file_size (ectx, fn, &size, GNUNET_YES))
+    {
+      /* the descriptor is already open at this point: release it */
+      CLOSE (fd);
+      GNUNET_free (fn);
+      return GNUNET_SYSERR;
+    }
+  block = GNUNET_malloc (sizeof (DBlock) + blocksize);
+  block->type = htonl (GNUNET_ECRS_BLOCKTYPE_DATA);
+  while (pos < size)
+    {
+      /* the last block may be shorter than blocksize */
+      delta = size - pos;
+      if (delta > blocksize)
+        delta = blocksize;
+      if (delta != READ (fd, &block[1], delta))
+        {
+          GNUNET_GE_LOG_STRERROR_FILE (ectx,
+                                       GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+                                       GNUNET_GE_USER | GNUNET_GE_BULK,
+                                       "read", fn);
+          CLOSE (fd);
+          GNUNET_free (fn);
+          GNUNET_free (block);
+          return GNUNET_SYSERR;
+        }
+      /* rebuild the on-demand block that indexing stored for this chunk */
+      odb.header.size = htonl (sizeof (OnDemandBlock));
+      odb.header.type = htonl (GNUNET_ECRS_BLOCKTYPE_ONDEMAND);
+      odb.header.prio = 0;
+      odb.header.anonymityLevel = 0;
+      odb.header.expirationTime = 0;
+      odb.type = htonl (GNUNET_ECRS_BLOCKTYPE_ONDEMAND);
+      odb.fileOffset = GNUNET_htonll (pos);
+      odb.blockSize = htonl (delta);
+      odb.fileId = *fileId;
+      /* compute the primary key */
+      GNUNET_EC_file_block_get_query (block, delta + sizeof (DBlock), &key);
+      if (GNUNET_SYSERR ==
+          datastore->get (&key,
+                          GNUNET_ECRS_BLOCKTYPE_ONDEMAND,
+                          &GNUNET_FS_HELPER_complete_value_from_database_callback,
+                          &odb.header))      /* aborted == found! */
+        ret = datastore->del (&key, &odb.header);
+      else                      /* not found */
+        ret = GNUNET_SYSERR;
+      if (ret == GNUNET_SYSERR)
+        {
+          IF_GELOG (ectx,
+                    GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+                    GNUNET_hash_to_enc (&key, &enc));
+          GNUNET_GE_LOG (ectx,
+                         GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+                         _
+                         ("Unindexed ODB block `%s' from offset %llu already missing from datastore.\n"),
+                         &enc, pos);
+        }
+      pos += delta;
+    }
+  GNUNET_free (block);
+  CLOSE (fd);
+  UNLINK (fn);
+  GNUNET_free (fn);
+  remove_unavailable_mark(fileId);
+  return GNUNET_OK;
+}
+
+
+
+/**
+ * Set up the on-demand module: remember the core API, determine (and
+ * create) the index directory and obtain the "state" service.
+ *
+ * @param capi core API handle
+ * @return 0 on success, GNUNET_SYSERR if the "state" service is
+ *         unavailable
+ */
+int GNUNET_FS_ONDEMAND_init(GNUNET_CoreAPIForPlugins * capi)
+{
+  char *fallback;
+
+  coreAPI = capi;
+  /* default for FS/INDEX-DIRECTORY is "<GNUNETD_HOME>/data/shared/" */
+  GNUNET_GC_get_configuration_value_filename (capi->cfg,
+                                              "GNUNETD",
+                                              "GNUNETD_HOME",
+                                              GNUNET_DEFAULT_DAEMON_VAR_DIRECTORY,
+                                              &fallback);
+  fallback = GNUNET_realloc (fallback,
+                             strlen (fallback) + strlen ("/data/shared/") + 1);
+  strcat (fallback, "/data/shared/");
+  GNUNET_GC_get_configuration_value_filename (capi->cfg,
+                                              "FS",
+                                              "INDEX-DIRECTORY",
+                                              fallback, &index_directory);
+  GNUNET_free (fallback);
+  /* just in case the directory does not exist yet */
+  GNUNET_disk_directory_create (coreAPI->ectx, index_directory);
+
+  state = capi->request_service ("state");
+  if (state != NULL)
+    return 0;
+  GNUNET_GE_BREAK (coreAPI->ectx, 0);
+  return GNUNET_SYSERR;
+}
+
+/**
+ * Shut down the on-demand module: give back the "state" service and
+ * free the index directory name.
+ *
+ * @return always 0
+ */
+int GNUNET_FS_ONDEMAND_done()
+{
+  /* hand the state service back to the core */
+  coreAPI->release_service (state);
+  state = NULL;
+
+  /* drop the directory name allocated during init */
+  GNUNET_free (index_directory);
+  index_directory = NULL;
+
+  return 0;
+}
+
+
+/* end of ondemand.c */
Property changes on: GNUnet/src/applications/fs/gap/ondemand.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/ondemand.h
===================================================================
--- GNUnet/src/applications/fs/gap/ondemand.h (rev 0)
+++ GNUnet/src/applications/fs/gap/ondemand.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,105 @@
+/*
+ This file is part of GNUnet.
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff
(and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/ondemand.h
+ * @brief functions for handling on-demand encoding
+ * @author Christian Grothoff
+ */
+#ifndef ONDEMAND_H
+#define ONDEMAND_H
+
+#include "gnunet_core.h"
+#include "gnunet_datastore_service.h"
+#include "ecrs_core.h"
+
+/**
+ * A query on the datastore resulted in the on-demand
+ * block dbv. On-demand encode the block and return
+ * the resulting DSV in enc. If the on-demand
+ * encoding fails because the file is no longer there,
+ * this function also removes the OD-Entry
+ *
+ * @return GNUNET_OK on success, GNUNET_SYSERR if there was an error
+ */
+int
+GNUNET_FS_ONDEMAND_get_indexed_content (const GNUNET_DatastoreValue * dbv,
+ const GNUNET_HashCode * query,
+ GNUNET_DatastoreValue ** enc);
+
+
+/**
+ * Creates a symlink to the given file in the shared directory
+ *
+ * @param fn the file that was indexed
+ * @param fileId the file's GNUNET_hash code
+ * @return GNUNET_SYSERR on error, GNUNET_NO if symlinking failed,
+ * GNUNET_YES on success
+ */
+int
+GNUNET_FS_ONDEMAND_index_prepare_with_symlink (struct GNUNET_GE_Context *cectx,
+ const GNUNET_HashCode * fileId,
const char *fn);
+
+/**
+ * Writes the given content to the file at the specified offset
+ * and stores an OnDemandBlock into the datastore.
+ *
+ * @return GNUNET_NO if already present, GNUNET_YES on success,
+ * GNUNET_SYSERR on other error (i.e. datastore full)
+ */
+int
+GNUNET_FS_ONDEMAND_add_indexed_content (struct GNUNET_GE_Context *cectx,
+ GNUNET_Datastore_ServiceAPI * datastore,
+ unsigned int prio,
+ GNUNET_CronTime expiration,
+ unsigned long long fileOffset,
+ unsigned int anonymityLevel,
+ const GNUNET_HashCode * fileId,
+ unsigned int size, const DBlock *
content);
+
+/**
+ * Test if the file with the given ID is
+ * indexed.
+ * @return GNUNET_YES if so, GNUNET_NO if not.
+ */
+int
+GNUNET_FS_ONDEMAND_test_indexed_file (GNUNET_Datastore_ServiceAPI * datastore,
+ const GNUNET_HashCode * fileId);
+
+/**
+ * Unindex the file with the given ID. Removes the file from the
+ * filesystem and all of the corresponding obd blocks from the
+ * datastore. Note that the IBlocks are NOT removed by this function.
+ *
+ * @param blocksize the size of each of the
+ * indexed blocks (required to break
+ * up the file properly when computing
+ * the keys of the odb blocks).
+ */
+int
+GNUNET_FS_ONDEMAND_delete_indexed_content (struct GNUNET_GE_Context *cectx,
+ GNUNET_Datastore_ServiceAPI * datastore,
+ unsigned int blocksize, const
GNUNET_HashCode * fileId);
+
+/**
+ * Initialize the on-demand encoding module (determines the index
+ * directory and requests the "state" service).
+ *
+ * @param capi core API handle
+ * @return 0 on success, GNUNET_SYSERR if the "state" service could
+ *         not be obtained
+ */
+int GNUNET_FS_ONDEMAND_init(GNUNET_CoreAPIForPlugins * capi);
+
+/**
+ * Shut down the on-demand encoding module (releases the "state"
+ * service and frees the index directory name).
+ *
+ * @return 0
+ */
+int GNUNET_FS_ONDEMAND_done(void);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/ondemand.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/pid_table.c
===================================================================
--- GNUnet/src/applications/fs/gap/pid_table.c (rev 0)
+++ GNUnet/src/applications/fs/gap/pid_table.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,201 @@
+/*
+ This file is part of GNUnet
+ (C) 2006, 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/pid_table.c
+ * @brief peer-ID table that assigns integer IDs to peer-IDs to save memory
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "pid_table.h"
+
+/**
+ * Statistics service.
+ */
+static GNUNET_Stats_ServiceAPI *stats;
+
+static int stat_pid_entries;
+
+static int stat_pid_rc;
+
+static struct GNUNET_GE_Context *ectx;
+
+/**
+ * One slot of the peer-ID table.  A slot whose reference counter
+ * is zero is treated as free and may be reused by a later intern
+ * operation.
+ */
+typedef struct
+{
+ /**
+ * the identifier itself
+ */
+ GNUNET_HashCode id;
+
+ /**
+ * reference counter
+ */
+ unsigned int rc;
+} PID_Entry;
+
+static unsigned int size;
+
+static PID_Entry *table;
+
+static struct GNUNET_Mutex *lock;
+
+
+/**
+ * Intern a peer identity, i.e. map it to a small integer index and
+ * take one reference on that index.
+ *
+ * @param pid identity to intern, may be NULL
+ * @return 0 if pid is NULL, otherwise the (non-zero) table index
+ *         now holding pid
+ */
+PID_INDEX
+GNUNET_FS_PT_intern (const GNUNET_PeerIdentity * pid)
+{
+  PID_INDEX ret;
+  PID_INDEX zero;
+
+  if (pid == NULL)
+    return 0;
+  GNUNET_mutex_lock (lock);
+  /* "size" is shared state and must only be sampled under the lock;
+     a stale value here would make us pick (and overwrite) a slot
+     that another thread filled after growing the table */
+  zero = size;
+  for (ret = 1; ret < size; ret++)
+    {
+      if (0 == memcmp (&pid->hashPubKey,
+                       &table[ret].id, sizeof (GNUNET_HashCode)))
+        {
+          /* identity already known: just add a reference */
+          table[ret].rc++;
+          if (stats != NULL)
+            {
+              stats->change (stat_pid_rc, 1);
+              if (table[ret].rc == 1)
+                stats->change (stat_pid_entries, 1);
+            }
+          GNUNET_mutex_unlock (lock);
+          return ret;
+        }
+      else if ((zero == size) && (table[ret].rc == 0))
+        {
+          /* remember the first free slot in case we need one */
+          zero = ret;
+        }
+    }
+  ret = zero;
+  if (ret == size)
+    {
+      /* no free slot found: enlarge the table */
+      GNUNET_array_grow (table, size, size + 16);
+    }
+  /* index 0 is reserved to mean "no peer" */
+  if (ret == 0)
+    ret = 1;
+  GNUNET_GE_ASSERT (ectx, ret < size);
+  table[ret].id = pid->hashPubKey;
+  table[ret].rc = 1;
+  GNUNET_mutex_unlock (lock);
+  if (stats != NULL)
+    {
+      stats->change (stat_pid_rc, 1);
+      stats->change (stat_pid_entries, 1);
+    }
+  return ret;
+}
+
+/**
+ * Drop one reference on each of the given table indices.
+ *
+ * @param ids array of indices
+ * @param count number of entries in ids
+ */
+void
+GNUNET_FS_PT_decrement_rcs (const PID_INDEX * ids, unsigned int count)
+{
+  int idx;
+
+  if (count == 0)
+    return;
+  GNUNET_mutex_lock (lock);
+  /* walk the array back to front */
+  idx = count - 1;
+  while (idx >= 0)
+    {
+      PID_INDEX cur = ids[idx];
+
+      GNUNET_GE_ASSERT (ectx, cur < size);
+      GNUNET_GE_ASSERT (ectx, table[cur].rc > 0);
+      table[cur].rc--;
+      if ((stats != NULL) && (table[cur].rc == 0))
+        stats->change (stat_pid_entries, -1);
+      idx--;
+    }
+  GNUNET_mutex_unlock (lock);
+  if (stats != NULL)
+    stats->change (stat_pid_rc, -count);
+}
+
+/**
+ * Adjust the reference counter of a table entry.
+ *
+ * @param id index of the entry; 0 is ignored
+ * @param delta amount to add to the reference counter (may be negative)
+ */
+void
+GNUNET_FS_PT_change_rc (PID_INDEX id, int delta)
+{
+  PID_Entry *slot;
+
+  if (id == 0)
+    return;
+  GNUNET_mutex_lock (lock);
+  GNUNET_GE_ASSERT (ectx, id < size);
+  slot = &table[id];
+  GNUNET_GE_ASSERT (ectx, slot->rc > 0);
+  slot->rc += delta;
+  if (stats != NULL)
+    {
+      stats->change (stat_pid_rc, delta);
+      /* the entry became unused */
+      if (slot->rc == 0)
+        stats->change (stat_pid_entries, -1);
+    }
+  GNUNET_mutex_unlock (lock);
+}
+
+/**
+ * Translate a table index back into the peer identity it stands for.
+ *
+ * @param id index to look up; 0 is reported as an error and yields an
+ *        all-zero identity
+ * @param pid where to store the identity
+ */
+void
+GNUNET_FS_PT_resolve (PID_INDEX id, GNUNET_PeerIdentity * pid)
+{
+  if (id != 0)
+    {
+      GNUNET_mutex_lock (lock);
+      GNUNET_GE_ASSERT (ectx, id < size);
+      GNUNET_GE_ASSERT (ectx, table[id].rc > 0);
+      pid->hashPubKey = table[id].id;
+      GNUNET_mutex_unlock (lock);
+      return;
+    }
+  /* index 0 never refers to a real peer */
+  memset (pid, 0, sizeof (GNUNET_PeerIdentity));
+  GNUNET_GE_BREAK (ectx, 0);
+}
+
+
+/**
+ * Initialize the peer-ID table.
+ *
+ * @param e error context used for assertions
+ * @param s statistics service, may be NULL (then no statistics are kept)
+ */
+void
+GNUNET_FS_PT_init (struct GNUNET_GE_Context *e, GNUNET_Stats_ServiceAPI * s)
+{
+  ectx = e;
+  stats = s;
+  if (s != NULL)
+    {
+      stat_pid_entries =
+        s->create (gettext_noop ("# distinct interned peer IDs in pid table"));
+      stat_pid_rc =
+        s->create (gettext_noop ("# total RC of interned peer IDs in pid table"));
+    }
+  lock = GNUNET_mutex_create (GNUNET_NO);
+}
+
+
+/**
+ * Shut down the peer-ID table.  All references must have been
+ * released before this is called (checked by assertion).
+ */
+void
+GNUNET_FS_PT_done ()
+{
+  unsigned int slot = 0;
+
+  /* nobody may still hold a reference */
+  while (slot < size)
+    {
+      GNUNET_GE_ASSERT (ectx, table[slot].rc == 0);
+      slot++;
+    }
+  GNUNET_array_grow (table, size, 0);
+  stats = NULL;
+  GNUNET_mutex_destroy (lock);
+  lock = NULL;
+  ectx = NULL;
+}
+
+/* end of pid_table.c */
Property changes on: GNUnet/src/applications/fs/gap/pid_table.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/pid_table.h
===================================================================
--- GNUnet/src/applications/fs/gap/pid_table.h (rev 0)
+++ GNUnet/src/applications/fs/gap/pid_table.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,48 @@
+/*
+ This file is part of GNUnet
+ (C) 2006 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file gap/pid_table.h
+ * @brief peer-ID table that assigns integer IDs to peer-IDs to save memory
+ * @author Christian Grothoff
+ */
+
+#ifndef GNUNET_PID_TABLE_H
+#define GNUNET_PID_TABLE_H
+
+#include "gnunet_util.h"
+#include "gnunet_stats_service.h"
+
+/**
+ * Initialize the peer-ID table.
+ *
+ * @param ectx error context used for assertions
+ * @param s statistics service, may be NULL
+ */
+void GNUNET_FS_PT_init (struct GNUNET_GE_Context *ectx,
+ GNUNET_Stats_ServiceAPI * s);
+
+/**
+ * Shut down the peer-ID table; asserts that every entry has a
+ * reference count of zero.
+ */
+void GNUNET_FS_PT_done (void);
+
+/**
+ * Index into the peer-ID table; 0 stands for "no peer".
+ */
+typedef unsigned int PID_INDEX;
+
+/**
+ * Intern a peer identity and take one reference on the result.
+ *
+ * @param pid identity to intern, may be NULL
+ * @return 0 if pid is NULL, otherwise the index assigned to pid
+ */
+PID_INDEX GNUNET_FS_PT_intern (const GNUNET_PeerIdentity * pid);
+
+/**
+ * Add delta to the reference counter of the given entry
+ * (no-op if id is 0).
+ */
+void GNUNET_FS_PT_change_rc (PID_INDEX id, int delta);
+
+/**
+ * Decrement the reference counter of each of the count
+ * entries listed in ids by one.
+ */
+void GNUNET_FS_PT_decrement_rcs (const PID_INDEX * ids, unsigned int count);
+
+/**
+ * Store the peer identity belonging to index id in pid
+ * (id 0 yields an all-zero identity and is reported as an error).
+ */
+void GNUNET_FS_PT_resolve (PID_INDEX id, GNUNET_PeerIdentity * pid);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/pid_table.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/plan.c
===================================================================
--- GNUnet/src/applications/fs/gap/plan.c (rev 0)
+++ GNUnet/src/applications/fs/gap/plan.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,730 @@
+/*
+ This file is part of GNUnet
+ (C) 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/plan.c
+ * @brief code to plan when to send requests where
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - code to clean up plans (remove
+ * plans for peers that we are no longer
+ * connected to) -- using cron?
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "fs_dht.h"
+#include "fs.h"
+#include "shared.h"
+
+/**
+ * Linked list summarizing how good other peers
+ * were at producing responses for a client.
+ */
+struct PeerHistoryList {
+
+ /**
+ * This is a linked list.
+ */
+ struct PeerHistoryList * next;
+
+ /**
+ * Last time we transmitted a request to this peer.
+ */
+ GNUNET_CronTime last_request_time;
+
+ /**
+ * Last time we received a response from this peer.
+ */
+ GNUNET_CronTime last_response_time;
+
+ /**
+ * What peer is this history entry for?
+ */
+ PID_INDEX peer;
+
+ /**
+ * Total number of requests sent to the peer so far.
+ */
+ unsigned int request_count;
+
+ /**
+ * Total number of replies received from this peer so far.
+ */
+ unsigned int response_count;
+
+ /**
+ * TTL value used for last successful request.
+ */
+ int last_good_ttl;
+
+ /**
+ * Priority value used for last successful request.
+ */
+ unsigned int last_good_prio;
+
+ /**
+ * (Relative) TTL used in the last request.
+ */
+ int last_ttl_used;
+
+ /**
+ * Priority used for the last request.
+ */
+ unsigned int last_prio_used;
+
+};
+
+/**
+ * Linked list with information for each client.  An entry is
+ * identified either by its (non-NULL) client handle or by its
+ * (non-zero) peer index.
+ */
+struct ClientInfoList {
+
+ /**
+ * This is a linked list.
+ */
+ struct ClientInfoList * next;
+
+ /**
+ * For which client is this data kept (NULL
+ * if the "client" is another peer).
+ */
+ struct GNUNET_ClientHandle * client;
+
+ /**
+ * List of the history of reactions of other peers
+ * to queries from this client.
+ */
+ struct PeerHistoryList * history;
+
+ /**
+ * If "client" is NULL, this is the peer for
+ * which this is the history.
+ */
+ PID_INDEX peer;
+
+};
+
+/**
+ * Linked list of rankings given to connected peers. This list is
+ * used to determine which peers should be considered for forwarding
+ * of the query.
+ */
+struct PeerRankings
+{
+ /**
+ * This is a linked list.
+ */
+ struct PeerRankings * next;
+
+ /**
+ * Peer that is being ranked.
+ */
+ PID_INDEX peer;
+
+ /**
+ * Recommended priority for this peer.
+ */
+ unsigned int prio;
+
+ /**
+ * Recommended Time-to-live for this peer.
+ */
+ int ttl;
+
+ /**
+ * Score of the peer (higher is better).  Used as the weight
+ * in the biased random selection of targets; reset to zero
+ * once the peer has been selected.
+ */
+ unsigned int score;
+
+};
+
+
+static GNUNET_CoreAPIForPlugins * coreAPI;
+
+/**
+ * Plan for query execution (for each peer, a list of
+ * requests and when we should consider transmitting
+ * them).
+ */
+static struct QueryPlanList * queries;
+
+/**
+ * Information about the performance of peers
+ * for requests from various clients.
+ */
+static struct ClientInfoList * clients;
+
+/**
+ * Find the entry in the client list corresponding
+ * to the given client information.  If no such entry
+ * exists, create one.
+ *
+ * @param client local client handle, NULL if the requester is a peer
+ * @param peer index of the requesting peer, 0 if the requester is a
+ *        local client
+ * @return the (possibly freshly created) entry, never NULL
+ */
+static struct ClientInfoList *
+find_or_create_client_entry(struct GNUNET_ClientHandle * client,
+                            PID_INDEX peer)
+{
+  struct ClientInfoList * cl;
+
+  cl = clients;
+  while (cl != NULL)
+    {
+      if ( ( (cl->client != NULL) &&
+             (cl->client == client) ) ||
+           ( (cl->peer != 0) &&
+             (cl->peer == peer)) )
+        break;
+      cl = cl->next;
+    }
+  if (cl != NULL)
+    return cl;
+  cl = GNUNET_malloc(sizeof(struct ClientInfoList));
+  memset(cl, 0, sizeof(struct ClientInfoList));
+  cl->next = clients;
+  clients = cl;
+  cl->client = client;
+  cl->peer = peer;
+  /* the entry keeps its own reference on the peer index; it is
+     released again when the entry is destroyed in handle_client_exit
+     (no-op for peer == 0) */
+  GNUNET_FS_PT_change_rc(peer, 1);
+  return cl;
+}
+
+/**
+ * Look up the history record that the given client entry keeps for
+ * a responder; a zeroed record is created (and a reference on the
+ * responder's index taken) if none exists yet.
+ *
+ * @param cl client entry whose history is searched
+ * @param responder index of the peer the record is about
+ * @return the matching record, never NULL
+ */
+static struct PeerHistoryList *
+find_or_create_history_entry(struct ClientInfoList * cl,
+                             PID_INDEX responder)
+{
+  struct PeerHistoryList * rec;
+
+  for (rec = cl->history; rec != NULL; rec = rec->next)
+    if (rec->peer == responder)
+      return rec;
+  /* not found: prepend a fresh record */
+  rec = GNUNET_malloc(sizeof(struct PeerHistoryList));
+  memset(rec, 0, sizeof(struct PeerHistoryList));
+  rec->peer = responder;
+  rec->next = cl->history;
+  cl->history = rec;
+  GNUNET_FS_PT_change_rc(responder, 1);
+  return rec;
+}
+
+/**
+ * Add the given request to the list of pending requests for the
+ * specified target.  A random position in the queue will
+ * be used.
+ *
+ * @param target what peer to send the request to
+ * @param request the request to send
+ * @param ttl time-to-live for the request
+ * @param prio priority to use for the request
+ */
+static void
+add_request(PID_INDEX target,
+            struct RequestList * request,
+            int ttl,
+            unsigned int prio)
+{
+  struct QueryPlanList * qpl;
+  struct QueryPlanEntry * entry;
+  struct QueryPlanEntry * pos;
+  unsigned int total;
+
+  /* find query plan for target */
+  qpl = queries;
+  while ( (qpl != NULL) &&
+          (qpl->peer != target) )
+    qpl = qpl->next;
+  if (qpl == NULL)
+    {
+      qpl = GNUNET_malloc(sizeof(struct QueryPlanList));
+      memset(qpl, 0, sizeof(struct QueryPlanList));
+      qpl->peer = target;
+      GNUNET_FS_PT_change_rc(target, 1);
+      qpl->next = queries;
+      queries = qpl;
+    }
+  /* construct entry (next/prev start out as NULL) */
+  entry = GNUNET_malloc(sizeof(struct QueryPlanEntry));
+  memset(entry, 0, sizeof(struct QueryPlanEntry));
+  entry->request = request;
+  entry->prio = prio;
+  entry->ttl = GNUNET_FS_HELPER_bound_ttl(ttl, prio);
+
+  /* insert entry into request plan entries list */
+  entry->plan_entries_next = request->plan_entries;
+  request->plan_entries = entry;
+
+  /* compute (random) insertion position in doubly-linked list */
+  total = 0;
+  pos = qpl->head;
+  while (pos != NULL)
+    {
+      total++;
+      pos = pos->next;
+    }
+  total = GNUNET_random_u32(GNUNET_RANDOM_QUALITY_WEAK, total + 1);
+  pos = qpl->head;
+  while (total-- > 0)
+    pos = pos->next;
+  /* insert into datastructure at pos: pos == NULL means "behind the
+     last element" (which includes the empty list), otherwise the
+     entry is linked in directly in front of pos */
+  if (pos == NULL)
+    {
+      entry->prev = qpl->tail;
+      if (qpl->tail != NULL)
+        qpl->tail->next = entry;
+      else
+        qpl->head = entry;      /* list was empty */
+      qpl->tail = entry;
+    }
+  else
+    {
+      entry->next = pos;
+      entry->prev = pos->prev;
+      if (pos->prev != NULL)
+        pos->prev->next = entry;
+      else
+        qpl->head = entry;      /* new first element */
+      pos->prev = entry;
+    }
+}
+
+/**
+ * Closure for rank_peers callback function.
+ */
+struct RankingPeerContext {
+ /* rankings computed so far; rank_peers prepends one per peer */
+ struct PeerRankings * rankings;
+ /* history of the requesting client, NULL if there is none */
+ struct ClientInfoList * info;
+ /* the request for which the peers are being ranked */
+ struct RequestList * request;
+};
+
+/**
+ * Peer iterator callback: compute a ranking (score, TTL, priority)
+ * for one peer with respect to a request and prepend it to the
+ * ranking list in the closure.  The scoring itself is still a
+ * placeholder that assigns fixed values.
+ *
+ * @param identity the id of the node
+ * @param data the struct RankingPeerContext
+ */
+static void
+rank_peers(const GNUNET_PeerIdentity * identity, void *data)
+{
+  struct RankingPeerContext * ctx = data;
+  struct PeerRankings * entry;
+  struct PeerHistoryList * past = NULL;
+
+  entry = GNUNET_malloc(sizeof(struct PeerRankings));
+  memset(entry, 0, sizeof(struct PeerRankings));
+  entry->peer = GNUNET_FS_PT_intern(identity);
+
+  /* locate what we know about this peer for the requesting client */
+  if (ctx->info != NULL)
+    for (past = ctx->info->history; past != NULL; past = past->next)
+      if (past->peer == entry->peer)
+        break;
+  if (past == NULL)
+    {
+      /* what are good start values? */
+    }
+  else
+    {
+      /* how do we score the history? */
+    }
+
+  /* reserve response-bandwidth from core!
+     (also, don't forget to unreserve for
+     peers that were not selected!) */
+
+  /* check query proximity */
+
+  /* generate score, ttl and priority */
+  entry->score = 1; /* FIXME */
+  entry->ttl = 112; /* FIXME */
+  entry->prio = 42; /* FIXME */
+
+  /* insert into ranking list */
+  entry->next = ctx->rankings;
+  ctx->rankings = entry;
+}
+
+/**
+ * Plan the transmission of the given request. Use the history of the
+ * request and the client to schedule the request for transmission.<p>
+ *
+ * This method is probably the most important function in the
+ * anonymous file-sharing module. It determines for each query where
+ * it should be forwarded (to which peers, to how many peers) and what
+ * its TTL and priority values should be.<p>
+ *
+ * The peers are ranked via rank_peers, then up to target_count of
+ * them are drawn by score-weighted random selection and the request
+ * is queued for each selected peer with add_request.
+ *
+ * @param client maybe NULL, in which case peer is significant
+ * @param peer sender of the request (if not a local client)
+ * @param request to plan
+ */
+void
+GNUNET_FS_PLAN_request(struct GNUNET_ClientHandle * client,
+ PID_INDEX peer,
+ struct RequestList * request)
+{
+ struct ClientInfoList * info;
+ struct PeerRankings * rank;
+ struct RankingPeerContext rpc;
+ unsigned int target_count;
+ unsigned int i;
+ unsigned int total_peers;
+ unsigned long long total_score;
+ unsigned long long selector;
+
+ GNUNET_mutex_lock(GNUNET_FS_lock); /* needed? */
+ /* look for existing history of this requester (may stay NULL) */
+ info = clients;
+ while ( (info != NULL) &&
+ ( (info->client != client) ||
+ (info->peer != peer) ) )
+ info = info->next;
+
+ /* for all connected peers compute ranking */
+ rpc.info = info;
+ rpc.request = request;
+ rpc.rankings = NULL;
+ /* NOTE(review): the member name after "coreAPI->" is missing, so
+ this statement does not compile as written; presumably the core's
+ iterator over connected peers is meant -- confirm against the
+ GNUNET_CoreAPIForPlugins definition */
+ total_peers = coreAPI->(rank_peers,
+ &rpc);
+ /* use request type, priority, system load and
+ entropy of ranking to determine number of peers
+ to queue */
+ target_count = 2; /* FIXME */
+
+ if (target_count > total_peers)
+ target_count = total_peers;
+
+ /* use biased random selection to select
+ peers according to ranking; add requests */
+ total_score = 0;
+ rank = rpc.rankings;
+ while (rank != NULL)
+ {
+ GNUNET_GE_ASSERT(NULL, rank->score > 0);
+ total_score += rank->score;
+ rank = rank->next;
+ }
+ /* select target_count peers */
+ for (i=0;i<target_count;i++)
+ {
+ /* draw a point in [0, total_score) and find the peer whose
+ score interval contains it */
+ selector = GNUNET_random_u64(GNUNET_RANDOM_QUALITY_WEAK, total_score);
+ rank = rpc.rankings;
+ while (rank != NULL)
+ {
+ if (rank->score > selector)
+ {
+ add_request(rank->peer,
+ request,
+ rank->ttl,
+ rank->prio);
+ /* take the peer out of the remaining draws */
+ total_score -= rank->score;
+ rank->score = 0; /* mark as used */
+ break;
+ }
+ selector -= rank->score;
+ rank = rank->next;
+ }
+ }
+ /* free rpc.rankings list */
+ while (rpc.rankings != NULL)
+ {
+ rank = rpc.rankings;
+ rpc.rankings = rank->next;
+ /* release the reference taken by GNUNET_FS_PT_intern in rank_peers */
+ GNUNET_FS_PT_change_rc(rank->peer, -1);
+ GNUNET_free(rank);
+ }
+ GNUNET_mutex_unlock(GNUNET_FS_lock);
+}
+
+/**
+ * Serialize the given request as a GAP query message into the
+ * buffer, provided it fits, and record the TTL, priority and send
+ * time in the request.
+ *
+ * @param req request to serialize (must have at least one key)
+ * @param prio priority to put into the message
+ * @param ttl time-to-live to put into the message
+ * @param buf where to write the message
+ * @param available size of the buffer
+ * @return number of bytes written to the buffer, 0 if the message
+ *         does not fit
+ */
+static unsigned int
+try_add_request(struct RequestList * req,
+                unsigned int prio,
+                int ttl,
+                void * buf,
+                unsigned int available)
+{
+  P2P_gap_query_MESSAGE * query = buf;
+  unsigned int needed;
+
+  GNUNET_GE_ASSERT(NULL, req->key_count > 0);
+  /* the message struct already accounts for the first key */
+  needed = sizeof(P2P_gap_query_MESSAGE)
+    + req->bloomfilter_size
+    + (req->key_count - 1) * sizeof(GNUNET_HashCode);
+  if (needed > available)
+    return 0;
+
+  /* header */
+  query->header.type = htons(GNUNET_P2P_PROTO_GAP_QUERY);
+  query->header.size = htons(needed);
+  /* fixed-size fields */
+  query->type = htonl(req->type);
+  query->priority = htonl(prio);
+  query->ttl = htonl(ttl);
+  query->filter_mutator = htonl(req->bloomfilter_mutator);
+  query->number_of_queries = htonl(req->key_count);
+  query->returnTo = *coreAPI->myIdentity; /* FIXME? */
+  /* keys, followed by the raw bloomfilter (if any) */
+  memcpy(&query->queries[0],
+         &req->queries[0],
+         req->key_count * sizeof(GNUNET_HashCode));
+  if (req->bloomfilter != NULL)
+    GNUNET_bloomfilter_get_raw_data(req->bloomfilter,
+                                    (char*) &query->queries[req->key_count],
+                                    req->bloomfilter_size);
+
+  /* FIXME: update state tracking
+     what queries were sent with
+     what priorities/ ttls / etc */
+  req->last_request_time = GNUNET_get_time();
+  req->last_ttl_used = ttl;
+  req->value = prio;
+
+  return needed;
+}
+
+/**
+ * The core has space for a query, find one!
+ *
+ * Walks the plan for the receiver and serializes as many planned
+ * requests as fit.  Every request that was written is removed from
+ * the plan and accounted for in the history of the requester.
+ *
+ * @param receiver the receiver of the message
+ * @param position is the reference to the
+ *        first unused position in the buffer where GNUnet is building
+ *        the message
+ * @param padding is the number of bytes left in that buffer.
+ * @return the number of bytes written to
+ *        that buffer (must be a positive number).
+ */
+static unsigned int
+query_fill_callback (const GNUNET_PeerIdentity *
+                     receiver, void *position,
+                     unsigned int padding)
+{
+  char * buf = position;
+  struct QueryPlanList * pl;
+  struct QueryPlanEntry * e;
+  struct QueryPlanEntry * n;
+  struct QueryPlanEntry * pos;
+  struct QueryPlanEntry * prev;
+  struct PeerHistoryList * hl;
+  struct ClientInfoList * cl;
+  PID_INDEX peer;
+  unsigned int off;
+  unsigned int ret;
+
+  off = 0;
+  peer = GNUNET_FS_PT_intern(receiver);
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  pl = queries;
+  while ( (pl != NULL) &&
+          (pl->peer != peer) )
+    pl = pl->next;
+  if (pl != NULL)
+    {
+      e = pl->head;
+      while ( (e != NULL) &&
+              (padding - off >= sizeof(P2P_gap_query_MESSAGE)) )
+        {
+          ret = try_add_request(e->request,
+                                e->prio,
+                                e->ttl,
+                                &buf[off],
+                                padding - off);
+          /* remember the successor now, e may be freed below */
+          n = e->next;
+          if (ret != 0)
+            {
+              /* remove e from e's doubly-linked list */
+              if (e->prev != NULL)
+                e->prev->next = e->next;
+              else
+                pl->head = e->next;
+              if (e->next != NULL)
+                e->next->prev = e->prev;
+              else
+                pl->tail = e->prev;
+              /* remove e from singly-linked list of request */
+              prev = NULL;
+              pos = e->request->plan_entries;
+              while (pos != e)
+                {
+                  prev = pos;
+                  pos = pos->plan_entries_next;
+                }
+              if (prev == NULL)
+                e->request->plan_entries = e->plan_entries_next;
+              else
+                prev->plan_entries_next = e->plan_entries_next;
+              /* update the history while e is still valid ... */
+              cl = find_or_create_client_entry(e->request->response_client,
+                                               e->request->response_target);
+              hl = find_or_create_history_entry(cl,
+                                                peer);
+              hl->last_request_time = GNUNET_get_time();
+              hl->request_count++;
+              /* ... and only then release it */
+              GNUNET_free(e);
+            }
+          off += ret;
+          e = n;
+        }
+    }
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+  /* drop the reference taken by GNUNET_FS_PT_intern above */
+  GNUNET_FS_PT_change_rc(peer, -1);
+  return off;
+}
+
+/**
+ * Callback for client disconnects: removes every entry kept for the
+ * given client from the client list, including the per-peer history
+ * attached to it, and releases the peer references those entries hold.
+ *
+ * @param client handle of the client that went away
+ */
+static void
+handle_client_exit (struct GNUNET_ClientHandle *client)
+{
+  struct ClientInfoList ** link;
+  struct ClientInfoList * victim;
+  struct PeerHistoryList * hist;
+
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  link = &clients;
+  while (*link != NULL)
+    {
+      victim = *link;
+      if (victim->client != client)
+        {
+          link = &victim->next;
+          continue;
+        }
+      /* unlink, then tear down the history */
+      *link = victim->next;
+      while (victim->history != NULL)
+        {
+          hist = victim->history;
+          victim->history = hist->next;
+          GNUNET_FS_PT_change_rc(hist->peer, -1);
+          GNUNET_free(hist);
+        }
+      GNUNET_FS_PT_change_rc(victim->peer, -1);
+      GNUNET_free(victim);
+    }
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+}
+
+
+/**
+ * Notify the plan that a request succeeded.  Updates the history
+ * that the requester keeps about the responding peer.
+ *
+ * @param responder peer that produced the response
+ * @param client local client that issued the request, NULL if the
+ *        request came from another peer
+ * @param peer peer that issued the request (if not a local client)
+ * @param success the request that was answered
+ */
+void
+GNUNET_FS_PLAN_success(PID_INDEX responder,
+                       struct GNUNET_ClientHandle * client,
+                       PID_INDEX peer,
+                       const struct RequestList * success)
+{
+  struct ClientInfoList * cl;
+  struct PeerHistoryList * hl;
+
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  cl = find_or_create_client_entry(client, peer);
+  hl = find_or_create_history_entry(cl,
+                                    responder);
+  /* count each response exactly once */
+  hl->response_count++;
+  hl->last_good_ttl = success->last_ttl_used;
+  hl->last_good_prio = success->value;
+  hl->last_response_time = GNUNET_get_time();
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+}
+
+
+/**
+ * Initialize the planning module: remember the core API and register
+ * the client-exit handler and the query send callback.
+ *
+ * @param capi core API handle
+ * @return always 0 (registration failures trip an assertion)
+ */
+int
+GNUNET_FS_PLAN_init(GNUNET_CoreAPIForPlugins * capi)
+{
+  int rc;
+
+  coreAPI = capi;
+  rc = capi->cs_exit_handler_register (&handle_client_exit);
+  GNUNET_GE_ASSERT (capi->ectx, GNUNET_SYSERR != rc);
+  rc = coreAPI->connection_register_send_callback (sizeof(P2P_gap_query_MESSAGE),
+                                                   GNUNET_FS_GAP_QUERY_POLL_PRIORITY,
+                                                   &query_fill_callback);
+  GNUNET_GE_ASSERT (coreAPI->ectx, GNUNET_SYSERR != rc);
+  return 0;
+}
+
+/**
+ * Shut down the planning module: discard all query plans and client
+ * histories, then unregister the callbacks installed during init.
+ *
+ * @return always 0 (unregistration failures trip an assertion)
+ */
+int
+GNUNET_FS_PLAN_done()
+{
+  struct QueryPlanList * plan;
+  struct QueryPlanEntry * cur;
+  struct QueryPlanEntry ** link;
+  int rc;
+
+  while (queries != NULL)
+    {
+      plan = queries;
+      queries = plan->next;
+      while (plan->head != NULL)
+        {
+          cur = plan->head;
+          plan->head = cur->next;
+          /* detach cur from the entry list of its request */
+          link = &cur->request->plan_entries;
+          while (*link != cur)
+            link = &(*link)->plan_entries_next;
+          *link = cur->plan_entries_next;
+          GNUNET_free(cur);
+        }
+      GNUNET_FS_PT_change_rc(plan->peer, -1);
+      GNUNET_free(plan);
+    }
+  /* clean up clients */
+  while (clients != NULL)
+    handle_client_exit(clients->client);
+  rc = coreAPI->cs_exit_handler_unregister (&handle_client_exit);
+  GNUNET_GE_ASSERT (coreAPI->ectx, GNUNET_SYSERR != rc);
+  rc = coreAPI->connection_unregister_send_callback (sizeof(P2P_gap_query_MESSAGE),
+                                                     &query_fill_callback);
+  GNUNET_GE_ASSERT (coreAPI->ectx, GNUNET_SYSERR != rc);
+  return 0;
+}
+
+/* end of plan.c */
Property changes on: GNUnet/src/applications/fs/gap/plan.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/plan.h
===================================================================
--- GNUnet/src/applications/fs/gap/plan.h (rev 0)
+++ GNUnet/src/applications/fs/gap/plan.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,57 @@
+
+/*
+ This file is part of GNUnet
+ (C) 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/plan.h
+ * @brief code to plan when to send requests where
+ * @author Christian Grothoff
+ */
+
+#ifndef PLAN_H
+#define PLAN_H
+
+#include "gnunet_core.h"
+#include "shared.h"
+
+/**
+ * Plan the transmission of the given request.
+ * Use the history of the request and the client
+ * to schedule the request for transmission.
+ */
+void
+GNUNET_FS_PLAN_request(struct GNUNET_ClientHandle * client,
+ PID_INDEX peer,
+ struct RequestList * request);
+
+/**
+ * Notify the plan that a request succeeded.
+ */
+void
+GNUNET_FS_PLAN_success(PID_INDEX responder,
+ struct GNUNET_ClientHandle * client,
+ PID_INDEX peer,
+ const struct RequestList * success);
+
+/**
+ * Initialize the plan module.  The implementation in plan.c
+ * registers a client-exit handler and a send callback with
+ * the core; a failed registration trips an assertion.
+ *
+ * @param capi core API handle
+ * @return 0
+ */
+int GNUNET_FS_PLAN_init(GNUNET_CoreAPIForPlugins * capi);
+
+/**
+ * Shut down the plan module: frees the remaining query plans
+ * and unregisters the handlers installed by GNUNET_FS_PLAN_init.
+ *
+ * @return 0
+ */
+int GNUNET_FS_PLAN_done(void);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/plan.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/querymanager.c 2008-02-06 06:52:39 UTC
(rev 6161)
@@ -0,0 +1,466 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Christian Grothoff
(and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/querymanager.c
+ * @brief management of queries from our clients
+ * @author Christian Grothoff
+ *
+ * This code forwards queries (using GAP and DHT) to other peers and
+ * passes replies (from GAP or DHT) back to clients.
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "querymanager.h"
+#include "fs.h"
+#include "fs_dht.h"
+#include "gap.h"
+#include "dht.h"
+#include "plan.h"
+#include "pid_table.h"
+#include "shared.h"
+
+#define CHECK_REPEAT_FREQUENCY (5 * GNUNET_CRON_SECONDS)
+
+/**
+ * Linked list with information for each client.
+ * The head of the list is the file-level "clients" variable;
+ * entries are created by GNUNET_FS_QUERYMANAGER_start_query
+ * and released by handle_client_exit.
+ */
+struct ClientDataList {
+
+ /**
+ * This is a linked list.
+ */
+ struct ClientDataList * next;
+
+ /**
+ * For which client is this data kept?
+ * Used as the lookup key when searching the list.
+ */
+ struct GNUNET_ClientHandle * client;
+
+ /**
+ * List of active requests for the client
+ * (new requests are added at the head).
+ */
+ struct RequestList * requests;
+
+};
+
+static GNUNET_CoreAPIForPlugins * coreAPI;
+
+/**
+ * List of all clients, their active requests and other
+ * per-client information.
+ */
+static struct ClientDataList * clients;
+
+/**
+ * A client is asking us to run a query.  The query should be issued
+ * until either a unique response has been obtained or until the
+ * client disconnects.
+ *
+ * The request is added to the per-client request list (creating the
+ * client's list entry on first use) and handed to the planner.  For
+ * anonymityLevel 0 a DHT lookup is started as well.
+ *
+ * @param query array of key_count hash codes; copied into the request
+ * @param key_count number of entries in query, must be greater than 0
+ * @param anonymityLevel desired level of anonymity
+ * @param type type of the expected response
+ * @param client client that issued the request
+ * @param target peer known to have the content, maybe NULL.
+ */
+void
+GNUNET_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+                                    unsigned int key_count,
+                                    unsigned int anonymityLevel,
+                                    unsigned int type,
+                                    struct GNUNET_ClientHandle * client,
+                                    const GNUNET_PeerIdentity * target)
+{
+  struct ClientDataList * cl;
+  struct RequestList * request;
+
+  GNUNET_GE_ASSERT(NULL, key_count > 0);
+
+  /* struct RequestList already holds one query; allocate room for the rest */
+  request = GNUNET_malloc(sizeof(struct RequestList)
+                          + (key_count-1) * sizeof(GNUNET_HashCode));
+  memset(request, 0, sizeof(struct RequestList));
+  request->anonymityLevel = anonymityLevel;
+  request->key_count = key_count;
+  request->type = type;
+  request->primary_target = GNUNET_FS_PT_intern(target);
+  request->response_client = client;
+  memcpy(&request->queries[0],
+         query,
+         sizeof(GNUNET_HashCode)*key_count);
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  cl = clients;
+  while ( (cl != NULL) &&
+          (cl->client != client) )
+    cl = cl->next;
+  if (cl == NULL)
+    {
+      cl = GNUNET_malloc(sizeof(struct ClientDataList));
+      memset(cl, 0, sizeof(struct ClientDataList));
+      /* record the owner; without this the entry can never be found
+         again by later queries, responses or client-exit handling */
+      cl->client = client;
+      cl->next = clients;
+      clients = cl;
+    }
+  request->next = cl->requests;
+  cl->requests = request;
+  GNUNET_FS_PLAN_request(client, 0, request);
+  if (request->anonymityLevel == 0)
+    {
+      request->last_dht_get = GNUNET_get_time();
+      request->dht_back_off = MAX_DHT_DELAY;
+      GNUNET_FS_DHT_execute_query(request->type,
+                                  &request->queries[0]);
+    }
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+}
+
+/**
+ * Pick the size (in bytes) of the bloomfilter for a request
+ * that has already seen entry_count responses.
+ *
+ * GAP_BLOOMFILTER_K is the number of bits set per entry, so
+ * entry_count * GAP_BLOOMFILTER_K / 8 is the byte count we aim
+ * for.  The result is the smallest power of two (starting at 8)
+ * that reaches this target, capped at 2^15.  Using powers of two
+ * keeps the number of re-sizes small; since other peers also add
+ * entries without resizing, erring on the large side is intended.
+ *
+ * @param entry_count number of responses seen so far
+ * @return a power of two between 8 and 2^15 (inclusive)
+ */
+static unsigned int
+compute_bloomfilter_size(unsigned int entry_count)
+{
+  const unsigned int limit = 1U << 15;
+  const unsigned int wanted = (entry_count * GAP_BLOOMFILTER_K) / 8;
+  unsigned int bytes;
+
+  for (bytes = 8; (bytes < limit) && (bytes < wanted); bytes <<= 1)
+    {
+      /* keep doubling */
+    }
+  return bytes;
+}
+
+/**
+ * Closure for response_bf_iterator.
+ */
+struct IteratorClosure {
+ /* next response whose hash will be produced; NULL once the list is exhausted */
+ struct ResponseList * pos;
+ /* number mixed into every hash via GNUNET_FS_HELPER_mingle_hash */
+ int mingle_number;
+};
+
+/**
+ * Iterator over the response list of a request; produces the
+ * mingled hash of one response per call.
+ *
+ * @param next where to store the mingled hash of the current entry
+ * @param arg pointer to the struct IteratorClosure holding the
+ *        current position in the list and the mingle number
+ * @return GNUNET_YES if a hash was written to next,
+ *         GNUNET_NO if the list is exhausted (next is untouched)
+ */
+static int
+response_bf_iterator(GNUNET_HashCode * next,
+                     void *arg)
+{
+  struct IteratorClosure * state = arg;
+  struct ResponseList * current = state->pos;
+
+  if (current != NULL)
+    {
+      GNUNET_FS_HELPER_mingle_hash(&current->hash,
+                                   state->mingle_number,
+                                   next);
+      state->pos = current->next;
+      return GNUNET_YES;
+    }
+  return GNUNET_NO;
+}
+
+/**
+ * We got a response for a client request.
+ * Check if we have seen this response already.
+ * If not, check if it truly matches (namespace!).
+ * If so, transmit to client and update response
+ * lists and bloomfilter accordingly.
+ *
+ * @param sender peer that produced the response, 0 if the
+ *        response came from the DHT
+ * @param client client to transmit the response to
+ * @param rl request the response is checked against
+ * @param primary_key key under which the response was found
+ * @param expirationTime expiration time to report to the client
+ * @param size number of bytes in data
+ * @param data the response (a DBlock)
+ * @param value how much is this response worth to us?
+ *        the function should increment value accordingly
+ * @return GNUNET_OK if this was the last response
+ *         and we should remove the request entry.
+ *         GNUNET_NO if we should continue looking
+ *         GNUNET_SYSERR on serious errors
+ */
+static int
+handle_response(PID_INDEX sender,
+                struct GNUNET_ClientHandle * client,
+                struct RequestList * rl,
+                const GNUNET_HashCode * primary_key,
+                GNUNET_CronTime expirationTime,
+                unsigned int size,
+                const DBlock * data,
+                unsigned int * value)
+{
+  struct IteratorClosure ic;
+  CS_fs_reply_content_MESSAGE * msg;
+  GNUNET_HashCode hc;
+  int ret;
+  unsigned int bf_size;
+
+  /* check that content matches query */
+  ret = GNUNET_FS_SHARED_test_valid_new_response(rl,
+                                                 primary_key,
+                                                 size,
+                                                 data,
+                                                 &hc);
+  if (ret != GNUNET_OK)
+    return ret;
+  if (sender == 0) /* dht produced response */
+    rl->dht_back_off = MAX_DHT_DELAY; /* go back! */
+  /* send to client */
+  msg = GNUNET_malloc(sizeof(CS_fs_reply_content_MESSAGE) + size);
+  msg->header.size = htons(sizeof(CS_fs_reply_content_MESSAGE) + size);
+  msg->header.type = htons(GNUNET_CS_PROTO_GAP_RESULT);
+  msg->anonymityLevel = htonl(0); /* unknown */
+  msg->expirationTime = GNUNET_htonll(expirationTime);
+  memcpy(&msg[1],
+         data,
+         size);
+  coreAPI->cs_send_to_client(client,
+                             &msg->header,
+                             (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+                             ? GNUNET_NO
+                             : GNUNET_YES);
+  GNUNET_free(msg);
+
+  /* update *value */
+  *value += 1 + rl->value;
+  GNUNET_FS_PLAN_success(sender, client, 0, rl);
+
+  if (rl->type == GNUNET_ECRS_BLOCKTYPE_DATA)
+    return GNUNET_OK; /* the end */
+
+  /* update bloom filter */
+  rl->bloomfilter_entry_count++;
+  bf_size = compute_bloomfilter_size(rl->bloomfilter_entry_count);
+  if (rl->bloomfilter == NULL)
+    {
+      rl->bloomfilter_mutator
+        = GNUNET_random_u32(GNUNET_RANDOM_QUALITY_WEAK, -1);
+      rl->bloomfilter_size = bf_size;
+      rl->bloomfilter = GNUNET_bloomfilter_init(NULL,
+                                                NULL,
+                                                rl->bloomfilter_size,
+                                                GAP_BLOOMFILTER_K);
+    }
+  else if (rl->bloomfilter_size != bf_size)
+    {
+      /* re-build the filter from the known responses with a fresh mutator */
+      rl->bloomfilter_mutator
+        = GNUNET_random_u32(GNUNET_RANDOM_QUALITY_WEAK, -1);
+      ic.pos = rl->responses;
+      ic.mingle_number = rl->bloomfilter_mutator;
+      GNUNET_bloomfilter_resize(rl->bloomfilter,
+                                &response_bf_iterator,
+                                &ic,
+                                bf_size,
+                                GAP_BLOOMFILTER_K);
+      /* remember the new size; otherwise every later response would
+         see a size mismatch and trigger another resize */
+      rl->bloomfilter_size = bf_size;
+    }
+  GNUNET_FS_SHARED_mark_response_seen(rl,
+                                      &hc);
+
+  /* we want more */
+  return GNUNET_NO;
+}
+
+/**
+ * Offer the given response to every pending request of every
+ * local client.  Each request is checked with handle_response;
+ * requests for which it reports GNUNET_OK are unlinked from
+ * their client's list and freed.
+ *
+ * @param sender who sent the response (good to know
+ *        for future routing decisions)
+ * @param primary_query hash code used for lookup
+ *        (note that namespace membership may
+ *        require additional verification that has
+ *        not yet been performed; checking the
+ *        signature has already been done)
+ * @param expirationTime expiration time handed on to handle_response
+ * @param size size of the data
+ * @param data the data itself (a DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_FS_QUERYMANAGER_handle_response(const GNUNET_PeerIdentity * sender,
+                                       const GNUNET_HashCode * primary_query,
+                                       GNUNET_CronTime expirationTime,
+                                       unsigned int size,
+                                       const DBlock * data)
+{
+  struct ClientDataList * cl;
+  struct RequestList * rl;
+  struct RequestList ** link;
+  unsigned int value;
+  PID_INDEX rid;
+
+  rid = GNUNET_FS_PT_intern(sender);
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  value = 0;
+  for (cl = clients; cl != NULL; cl = cl->next)
+    {
+      /* "link" always points at the pointer that references "rl" */
+      link = &cl->requests;
+      while (NULL != (rl = *link))
+        {
+          if (GNUNET_OK !=
+              handle_response(rid,
+                              cl->client,
+                              rl,
+                              primary_query,
+                              expirationTime,
+                              size,
+                              data,
+                              &value))
+            {
+              link = &rl->next;
+              continue;
+            }
+          /* request is complete: unlink and release it */
+          *link = rl->next;
+          GNUNET_FS_SHARED_free_request_list(rl);
+        }
+    }
+
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+  GNUNET_FS_PT_change_rc(rid, -1);
+  return value;
+}
+
+/**
+ * Method called whenever a given client disconnects.
+ * Looks up the client's list entry and, if one exists, frees
+ * all of its requests and the entry itself.
+ *
+ * @param client the client that disconnected
+ */
+static void
+handle_client_exit (struct GNUNET_ClientHandle *client)
+{
+  struct ClientDataList ** link;
+  struct ClientDataList * entry;
+  struct RequestList * req;
+
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  link = &clients;
+  while ( (*link != NULL) &&
+          ((*link)->client != client) )
+    link = &(*link)->next;
+  entry = *link;
+  if (entry != NULL)
+    {
+      while (NULL != (req = entry->requests))
+        {
+          entry->requests = req->next;
+          GNUNET_FS_SHARED_free_request_list(req);
+        }
+      *link = entry->next;
+      GNUNET_free(entry);
+    }
+  GNUNET_mutex_unlock (GNUNET_FS_lock);
+}
+
+/**
+ * Cron-job to periodically check if we should
+ * repeat requests.
+ *
+ * A request is handed to the planner again when it is in no
+ * query plan, has not expired and the TTL of its last
+ * transmission has elapsed.  Independently, requests with
+ * anonymity level 0 are re-issued to the DHT once their
+ * back-off period has passed; the back-off is doubled each
+ * time (unless doubling would overflow).
+ *
+ * @param unused not used
+ */
+static void
+repeat_requests_job(void * unused)
+{
+  struct ClientDataList * client;
+  struct RequestList * request;
+  GNUNET_CronTime now;
+
+  GNUNET_mutex_lock(GNUNET_FS_lock);
+  now = GNUNET_get_time();
+  client = clients;
+  while (client != NULL)
+    {
+      request = client->requests;
+      while (request != NULL)
+        {
+          /* only re-plan once the previous transmission has timed out
+             (last_request_time + ttl lies in the past) */
+          if ( (NULL == request->plan_entries) &&
+               ( (request->expiration == 0) ||
+                 (request->expiration > now) ) &&
+               (request->last_ttl_used * GNUNET_CRON_SECONDS +
+                request->last_request_time < now) )
+            GNUNET_FS_PLAN_request(client->client,
+                                   0,
+                                   request);
+
+          if ( (request->anonymityLevel == 0) &&
+               (request->last_dht_get + request->dht_back_off < now) )
+            {
+              /* double the back-off, but never let it wrap around */
+              if (request->dht_back_off * 2 >
+                  request->dht_back_off)
+                request->dht_back_off *= 2;
+              request->last_dht_get = now;
+              GNUNET_FS_DHT_execute_query(request->type,
+                                          &request->queries[0]);
+            }
+          request = request->next;
+        }
+      client = client->next;
+    }
+  GNUNET_mutex_unlock(GNUNET_FS_lock);
+}
+
+/**
+ * Set up the query manager: remember the core API handle,
+ * register handle_client_exit as client-exit handler (failure
+ * trips an assertion) and schedule repeat_requests_job to run
+ * every CHECK_REPEAT_FREQUENCY.
+ *
+ * @param capi core API handle
+ * @return always 0
+ */
+int
+GNUNET_FS_QUERYMANAGER_init(GNUNET_CoreAPIForPlugins * capi)
+{
+  int status;
+
+  coreAPI = capi;
+  status = capi->cs_exit_handler_register (&handle_client_exit);
+  GNUNET_GE_ASSERT (capi->ectx, GNUNET_SYSERR != status);
+  GNUNET_cron_add_job(capi->cron,
+                      &repeat_requests_job,
+                      CHECK_REPEAT_FREQUENCY,
+                      CHECK_REPEAT_FREQUENCY,
+                      NULL);
+  return 0;
+}
+
+/**
+ * Shut down the query manager: cancel the periodic
+ * repeat_requests_job and unregister the client-exit
+ * handler (failure trips an assertion).
+ *
+ * @return always 0
+ */
+int
+GNUNET_FS_QUERYMANAGER_done()
+{
+  int status;
+
+  GNUNET_cron_del_job(coreAPI->cron,
+                      &repeat_requests_job,
+                      CHECK_REPEAT_FREQUENCY,
+                      NULL);
+  status = coreAPI->cs_exit_handler_unregister (&handle_client_exit);
+  GNUNET_GE_ASSERT (coreAPI->ectx, GNUNET_SYSERR != status);
+  return 0;
+}
+
+/* end of querymanager.c */
Property changes on: GNUnet/src/applications/fs/gap/querymanager.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.h
(rev 0)
+++ GNUnet/src/applications/fs/gap/querymanager.h 2008-02-06 06:52:39 UTC
(rev 6161)
@@ -0,0 +1,76 @@
+/*
+ This file is part of GNUnet
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008 Christian Grothoff (and
other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/querymanager.h
+ * @brief management of queries from our clients
+ * @author Christian Grothoff
+ */
+#ifndef QUERYMANAGER_H
+#define QUERYMANAGER_H
+
+#include "gnunet_util.h"
+#include "gnunet_core.h"
+#include "ecrs_core.h"
+
+/**
+ * Initialize the query manager.  The implementation in
+ * querymanager.c registers a client-exit handler and
+ * schedules the periodic job that repeats requests.
+ *
+ * @param capi core API handle
+ * @return 0
+ */
+int GNUNET_FS_QUERYMANAGER_init(GNUNET_CoreAPIForPlugins * capi);
+
+/**
+ * Shut down the query manager: cancels the periodic job
+ * and unregisters the client-exit handler.
+ *
+ * @return 0
+ */
+int GNUNET_FS_QUERYMANAGER_done(void);
+
+
+/**
+ * A client is asking us to run a query. The query should be issued
+ * until either a unique response has been obtained or until the
+ * client disconnects.
+ *
+ * @param target peer known to have the content, maybe NULL.
+ */
+void
+GNUNET_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
+ unsigned int key_count,
+ unsigned int anonymityLevel,
+ unsigned int type,
+ struct GNUNET_ClientHandle * client,
+ const GNUNET_PeerIdentity * target);
+
+/**
+ * Handle the given response (by forwarding it to
+ * other peers as necessary).
+ *
+ * @param sender who sent the response (good to know
+ * for future routing decisions)
+ * @param primary_query hash code used for lookup
+ * (note that namespace membership may
+ * require additional verification that has
+ * not yet been performed; checking the
+ * signature has already been done)
+ * @param size size of the data
+ * @param data the data itself (a DBlock)
+ * @return how much was this content worth to us?
+ */
+unsigned int
+GNUNET_FS_QUERYMANAGER_handle_response(const GNUNET_PeerIdentity * sender,
+ const GNUNET_HashCode * primary_query,
+ GNUNET_CronTime expirationTime,
+ unsigned int size,
+ const DBlock * data);
+
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/querymanager.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/shared.c
===================================================================
--- GNUnet/src/applications/fs/gap/shared.c (rev 0)
+++ GNUnet/src/applications/fs/gap/shared.c 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,213 @@
+/*
+ This file is part of GNUnet
+ (C) 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/shared.c
+ * @brief shared helper functions and data structures
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "shared.h"
+#include "fs.h"
+
+/**
+ * Release a request together with everything hanging off it:
+ * the list of known responses, its entries in the query plans
+ * of the various peers (each is unlinked from its doubly-linked
+ * plan list first), the bloomfilter and the references held on
+ * the primary and response target peers.
+ *
+ * @param rl request to free; must already be unlinked from
+ *        the list it was part of
+ */
+void
+GNUNET_FS_SHARED_free_request_list(struct RequestList * rl)
+{
+  struct ResponseList * resp;
+  struct QueryPlanEntry * entry;
+
+  while (NULL != (resp = rl->responses))
+    {
+      rl->responses = resp->next;
+      GNUNET_free(resp);
+    }
+  while (NULL != (entry = rl->plan_entries))
+    {
+      rl->plan_entries = entry->plan_entries_next;
+      /* fix up the successor side (or the tail of the plan list) */
+      if (entry->next == NULL)
+        entry->list->tail = entry->prev;
+      else
+        entry->next->prev = entry->prev;
+      /* fix up the predecessor side (or the head of the plan list) */
+      if (entry->prev == NULL)
+        entry->list->head = entry->next;
+      else
+        entry->prev->next = entry->next;
+      GNUNET_free(entry);
+    }
+  if (rl->bloomfilter != NULL)
+    GNUNET_bloomfilter_free(rl->bloomfilter);
+  GNUNET_FS_PT_change_rc(rl->primary_target, -1);
+  GNUNET_FS_PT_change_rc(rl->response_target, -1);
+  GNUNET_free(rl);
+}
+
+
+
+/**
+ * Check if the given value is a valid
+ * and new response for the given request list
+ * entry.
+ *
+ * The checks are, in order: block type and primary key match the
+ * request, the block is applicable for the query, the (mingled)
+ * hash is not in the request's bloomfilter, and the hash is not
+ * in the list of responses already seen.
+ *
+ * @param rl request to check against
+ * @param primary_key key under which the response was found
+ * @param size number of bytes in data
+ * @param data the response (a DBlock)
+ * @param hc set to the hash of the data once the type, key and
+ *        applicability checks have passed
+ * @return GNUNET_OK if so, GNUNET_NO if not new or not
+ *         applicable, GNUNET_SYSERR on error
+ */
+int
+GNUNET_FS_SHARED_test_valid_new_response(struct RequestList * rl,
+                                         const GNUNET_HashCode * primary_key,
+                                         unsigned int size,
+                                         const DBlock * data,
+                                         GNUNET_HashCode * hc)
+{
+  const struct ResponseList * known;
+  GNUNET_HashCode mingled;
+  int applicable;
+
+  /* type and primary key must match */
+  if (rl->type != ntohl(data->type))
+    return GNUNET_NO;
+  if (0 != memcmp(primary_key,
+                  &rl->queries[0],
+                  sizeof(GNUNET_HashCode)))
+    return GNUNET_NO;
+
+  /* content must match the query */
+  applicable = GNUNET_EC_is_block_applicable_for_query(rl->type,
+                                                       size,
+                                                       data,
+                                                       &rl->queries[0],
+                                                       rl->key_count,
+                                                       &rl->queries[0]);
+  if (applicable != GNUNET_OK)
+    return applicable;
+
+  /* response must be new */
+  GNUNET_hash(data, size,
+              hc);
+  GNUNET_FS_HELPER_mingle_hash(hc,
+                               rl->bloomfilter_mutator,
+                               &mingled);
+  if ( (rl->bloomfilter != NULL) &&
+       (GNUNET_YES == GNUNET_bloomfilter_test(rl->bloomfilter,
+                                              &mingled)) )
+    return GNUNET_NO; /* not useful */
+  for (known = rl->responses; known != NULL; known = known->next)
+    {
+      if (0 == memcmp(hc,
+                      &known->hash,
+                      sizeof(GNUNET_HashCode)))
+        return GNUNET_NO;
+    }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Mark the response corresponding to the given
+ * hash code as seen (update linked list and bloom filter).
+ *
+ * The bloom filter stores hashes mingled with the request's
+ * bloomfilter_mutator (this is what the membership test and
+ * the filter rebuild use), so the hash is mingled before it is
+ * added.  The list of seen responses keeps the plain hash.
+ *
+ * @param rl request for which the response was received
+ * @param hc plain (un-mingled) hash of the response
+ */
+void
+GNUNET_FS_SHARED_mark_response_seen(struct RequestList * rl,
+                                    GNUNET_HashCode * hc)
+{
+  struct ResponseList * seen;
+  GNUNET_HashCode mingled;
+
+  if (rl->bloomfilter != NULL)
+    {
+      GNUNET_FS_HELPER_mingle_hash(hc,
+                                   rl->bloomfilter_mutator,
+                                   &mingled);
+      GNUNET_bloomfilter_add(rl->bloomfilter,
+                             &mingled);
+    }
+  /* update seen list */
+  seen = GNUNET_malloc(sizeof(struct ResponseList));
+  seen->hash = *hc;
+  seen->next = rl->responses;
+  rl->responses = seen;
+}
+
+
+/**
+ * Datastore iterator callback.  If the size and the payload
+ * (the bytes following the GNUNET_DatastoreValue header) of
+ * the value match the value in the closure, copy the header
+ * into the closure and abort the iteration: we found what
+ * we're looking for.  Otherwise continue.
+ *
+ * @param key not used
+ * @param value candidate value from the datastore
+ * @param closure the GNUNET_DatastoreValue to compare against
+ *        and to complete
+ * @param uid not used
+ * @return GNUNET_SYSERR to abort the iteration (match found),
+ *         GNUNET_OK to continue
+ */
+int
+GNUNET_FS_HELPER_complete_value_from_database_callback (const GNUNET_HashCode * key,
+                                                        const GNUNET_DatastoreValue * value,
+                                                        void *closure,
+                                                        unsigned long long uid)
+{
+  GNUNET_DatastoreValue *wanted = closure;
+
+  if ( (wanted->size == value->size) &&
+       (0 == memcmp (&value[1],
+                     &wanted[1],
+                     ntohl (value->size) - sizeof (GNUNET_DatastoreValue))) )
+    {
+      *wanted = *value;
+      return GNUNET_SYSERR;
+    }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Mingle hash with the mingle_number to
+ * produce different bits: the result is the
+ * input XORed with the hash of mingle_number.
+ *
+ * @param in hash to mingle
+ * @param mingle_number number to mix in
+ * @param hc where to store the result
+ */
+void
+GNUNET_FS_HELPER_mingle_hash(const GNUNET_HashCode * in,
+                             int mingle_number,
+                             GNUNET_HashCode * hc)
+{
+  GNUNET_HashCode mask;
+
+  GNUNET_hash(&mingle_number, sizeof(mingle_number), &mask);
+  GNUNET_hash_xor(&mask, in, hc);
+}
+
+
+/**
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
+ *
+ * The bound is prio * TTL_DECREMENT / GNUNET_CRON_SECONDS,
+ * computed in 64 bits and capped at 2^30.  A ttl that is zero
+ * or negative is always below the bound and returned as-is.
+ *
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ *         otherwise the ttl-limit for the given priority
+ */
+int
+GNUNET_FS_HELPER_bound_ttl(int ttl_in,
+                           unsigned int prio)
+{
+  unsigned long long limit;
+
+  /* must be checked before the unsigned comparison below, which
+     would otherwise treat a negative ttl as a huge value */
+  if (ttl_in <= 0)
+    return ttl_in;
+  limit = ((unsigned long long) prio) * TTL_DECREMENT / GNUNET_CRON_SECONDS;
+  if (((unsigned long long) ttl_in) <= limit)
+    return ttl_in;
+  if (limit >= (1 << 30) )
+    return 1<<30;
+  /* limit is below 2^30 here, so the conversion is lossless */
+  return (int) limit;
+}
+
+
+/* end of shared.c */
Property changes on: GNUnet/src/applications/fs/gap/shared.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/shared.h
===================================================================
--- GNUnet/src/applications/fs/gap/shared.h (rev 0)
+++ GNUnet/src/applications/fs/gap/shared.h 2008-02-06 06:52:39 UTC (rev
6161)
@@ -0,0 +1,325 @@
+/*
+ This file is part of GNUnet
+ (C) 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file fs/gap/shared.h
+ * @brief shared helper functions and data structures
+ * @author Christian Grothoff
+ */
+#ifndef SHARED_H
+#define SHARED_H
+
+#include "gnunet_util.h"
+#include "ecrs_core.h"
+#include "pid_table.h"
+
+/**
+ * Linked list of responses that we have gotten for
+ * this request. Used to avoid forwarding the same
+ * response to the client multiple times and to
+ * construct the bloom filter to block duplicates.
+ * Entries are added by GNUNET_FS_SHARED_mark_response_seen
+ * and released by GNUNET_FS_SHARED_free_request_list.
+ */
+struct ResponseList {
+
+ /**
+ * This is a linked list.
+ */
+ struct ResponseList * next;
+
+ /**
+ * Hash of the dblocks of the responses.
+ */
+ GNUNET_HashCode hash;
+
+};
+
+/**
+ * Linked list with the active requests of a client.
+ * The struct is allocated with extra space at the end
+ * so that "queries" can hold key_count hash codes.
+ */
+struct RequestList {
+
+ /**
+ * This is a linked list.
+ */
+ struct RequestList * next;
+
+ /**
+ * Linked list of responses that we have
+ * already received for this request.
+ */
+ struct ResponseList * responses;
+
+ /**
+ * Linked list of query plan entries that this
+ * request is part of (when a request is done,
+ * these entries should be removed from the
+ * respective query plans). This is the head
+ * of a linked list that is constructed using
+ * the "plan_entries_next" field of QueryPlanEntry.
+ */
+ struct QueryPlanEntry * plan_entries;
+
+ /**
+ * Bloomfilter for the query (maybe NULL).
+ */
+ struct GNUNET_BloomFilter * bloomfilter;
+
+ /**
+ * NULL if this request is for another peer,
+ * otherwise the handle of the client for which
+ * this request is made.
+ */
+ struct GNUNET_ClientHandle * response_client;
+
+ /**
+ * Last time we tried to get a response for this
+ * query from the DHT (will always be zero for
+ * anonymous requests).
+ */
+ GNUNET_CronTime last_dht_get;
+
+ /**
+ * How long should we wait before re-trying the
+ * DHT-get operation?
+ */
+ GNUNET_CronTime dht_back_off;
+
+ /**
+ * When does this query record expire? (0 for never).
+ */
+ GNUNET_CronTime expiration;
+
+ /**
+ * When did we last issue this request? (0 for never).
+ */
+ GNUNET_CronTime last_request_time;
+
+ /**
+ * Size of the bloomfilter (in bytes); must be a power of 2.
+ */
+ unsigned int bloomfilter_size;
+
+ /**
+ * Number of entries in the bloomfilter (used to tell when
+ * we should grow its size).
+ */
+ unsigned int bloomfilter_entry_count;
+
+ /**
+ * Mutator used for the bloom filter; hashes are mingled
+ * with this number before they are tested against the filter.
+ */
+ int bloomfilter_mutator;
+
+ /**
+ * Desired level of (receiver) anonymity.
+ */
+ unsigned int anonymityLevel;
+
+ /**
+ * Number of queries at the end of this struct.
+ */
+ unsigned int key_count;
+
+ /**
+ * Type of the expected response.
+ */
+ unsigned int type;
+
+ /**
+ * If there is no peer that is suspected to have the result,
+ * the PID_INDEX will be zero.
+ */
+ PID_INDEX primary_target;
+
+ /**
+ * Where to send a response (if we get one).
+ * Maybe zero (if we are the peer that cares).
+ */
+ PID_INDEX response_target;
+
+ /**
+ * (Relative) TTL used in the last request.
+ */
+ int last_ttl_used;
+
+ /**
+ * Priority used for the last request.
+ */
+ unsigned int value;
+
+ /**
+ * The queries of this request. At least one,
+ * if there are more, the key count field will say
+ * so. Declared with one element; the allocation
+ * provides room for (key_count - 1) additional entries.
+ */
+ GNUNET_HashCode queries[1];
+
+};
+
+/**
+ * Doubly-linked list of the queries to consider for
+ * a peer. All QueryPlanEntries are ALSO part of a
+ * simple linked list starting at the respective
+ * RequestList.
+ */
+struct QueryPlanEntry {
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct QueryPlanEntry * next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct QueryPlanEntry * prev;
+
+ /**
+ * Query plan that this entry belongs to.
+ */
+ struct QueryPlanList * list;
+
+ /**
+ * Details about the request in the plan.
+ */
+ struct RequestList * request;
+
+ /**
+ * Other query plan entries for the same
+ * request (those entries will be part of
+ * other query plan lists).
+ */
+ struct QueryPlanEntry * plan_entries_next;
+
+ /**
+ * Request priority that should be used.
+ */
+ unsigned int prio;
+
+ /**
+ * Request TTL that should be used.
+ */
+ int ttl;
+
+};
+
+
+/**
+ * Linked list of queries to consider for each peer.
+ * Each element holds the plan (a doubly-linked list of
+ * QueryPlanEntry) for one peer.
+ */
+struct QueryPlanList {
+
+ /**
+ * This is a linked list.
+ */
+ struct QueryPlanList * next;
+
+ /**
+ * Head of the doubly-linked list of queries to consider.
+ */
+ struct QueryPlanEntry * head;
+
+ /**
+ * Tail of the doubly-linked list of queries to consider.
+ */
+ struct QueryPlanEntry * tail;
+
+ /**
+ * For which peer is this the current plan?
+ */
+ PID_INDEX peer;
+
+};
+
+/**
+ * Lock used to synchronize access to
+ * all shared datastructures.
+ */
+extern struct GNUNET_Mutex * GNUNET_FS_lock;
+
+/**
+ * Free the request list, including the associated
+ * list of pending requests, its entries in the
+ * plans for various peers and known responses.
+ */
+void
+GNUNET_FS_SHARED_free_request_list(struct RequestList * rl);
+
+/**
+ * Check if the given value is a valid
+ * and new response for the given request list
+ * entry.
+ *
+ * @param hc set to the hash of the data if successful
+ * @return GNUNET_OK if so, GNUNET_NO if not new or not
+ * applicable, GNUNET_SYSERR on error
+ */
+int
+GNUNET_FS_SHARED_test_valid_new_response(struct RequestList * rl,
+ const GNUNET_HashCode * primary_key,
+ unsigned int size,
+ const DBlock * data,
+ GNUNET_HashCode * hc);
+
+/**
+ * Mark the response corresponding to the given
+ * hash code as seen (update linked list and bloom filter).
+ */
+void
+GNUNET_FS_SHARED_mark_response_seen(struct RequestList * rl,
+ GNUNET_HashCode * hc);
+
+/**
+ * If the data portion and type of the value match our value in the
+ * closure, copy the header (prio, anonymityLevel, expirationTime) and
+ * abort the iteration: we found what we're looking for. Otherwise
+ * continue.
+ */
+int
+GNUNET_FS_HELPER_complete_value_from_database_callback (const GNUNET_HashCode
* key,
+ const
GNUNET_DatastoreValue * value, void *closure,
+ unsigned long long uid);
+
+
+/**
+ * Mingle hash with the mingle_number to
+ * produce different bits. We use this
+ * to generate many different bloomfilters
+ * for the same data.
+ */
+void
+GNUNET_FS_HELPER_mingle_hash(const GNUNET_HashCode * in,
+ int mingle_number,
+ GNUNET_HashCode * hc);
+
+/**
+ * The priority level imposes a bound on the maximum
+ * value for the ttl that can be requested.
+ *
+ * @param ttl_in requested ttl
+ * @param prio given priority
+ * @return ttl_in if ttl_in is below the limit,
+ * otherwise the ttl-limit for the given priority
+ */
+int
+GNUNET_FS_HELPER_bound_ttl(int ttl_in,
+ unsigned int prio);
+
+#endif
Property changes on: GNUnet/src/applications/fs/gap/shared.h
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/test_linear_topology.c
===================================================================
--- GNUnet/src/applications/fs/gap/test_linear_topology.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/test_linear_topology.c 2008-02-06
06:52:39 UTC (rev 6161)
@@ -0,0 +1,325 @@
+/*
+ This file is part of GNUnet.
+ (C) 2005, 2006, 2007 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/test_linear_topology.c
+ * @brief GAP routing testcase, linear topology
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_ecrs_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_stats_lib.h"
+#include "gnunet_util.h"
+#include "gnunet_stats_lib.h"
+
+
+/** Set to 1 to have the test start (and stop) the gnunetd daemons itself. */
+#define START_PEERS 1
+
+/** Number of daemons that are connected to form the chain. */
+#define PEER_COUNT 4
+
+/**
+ * Size of the test file in bytes (2 MiB).  The replacement list is
+ * parenthesized so that the macro behaves like a single value inside
+ * any surrounding expression (for example `x / SIZE' or `x % SIZE').
+ */
+#define SIZE (1024 * 1024 * 2)
+
+/**
+ * Context passed to the GNUNET_GE_* macros and the library calls in
+ * this file.  It is never assigned here, so it keeps its initial NULL.
+ */
+static struct GNUNET_GE_Context *ectx;
+
+/**
+ * Configuration shared by all helpers; created and loaded from
+ * "check.conf" in main().
+ */
+static struct GNUNET_GC_Configuration *cfg;
+
+/**
+ * Termination test handed to the ECRS operations of this testcase.
+ *
+ * @param unused closure, ignored
+ * @return always GNUNET_OK
+ */
+static int
+testTerminate (void *unused)
+{
+  (void) unused;
+  return GNUNET_OK;
+}
+
+/**
+ * Upload progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param closure ignored
+ */
+static void
+uprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes, GNUNET_CronTime eta,
+           void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+/**
+ * Download progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param lastBlockOffset ignored
+ * @param lastBlock ignored
+ * @param lastBlockSize ignored
+ * @param closure ignored
+ */
+static void
+dprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes,
+           GNUNET_CronTime eta,
+           unsigned long long lastBlockOffset,
+           const char *lastBlock, unsigned int lastBlockSize, void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+/**
+ * Build the name of the i-th test file and make sure the directory
+ * that will contain it exists.
+ *
+ * @param i number appended to the common file name prefix
+ * @return newly allocated file name; the caller must GNUNET_free it
+ */
+static char *
+makeName (unsigned int i)
+{
+  /* prefix plus 14 bytes: enough for the decimal digits of i and NUL */
+  const size_t capacity = strlen ("/tmp/gnunet-gaptest/GAPTEST") + 14;
+  char *path = GNUNET_malloc (capacity);
+
+  GNUNET_snprintf (path, capacity, "/tmp/gnunet-gaptest/GAPTEST%u", i);
+  GNUNET_disk_directory_create_for_file (NULL, path);
+  return path;
+}
+
+/**
+ * Create a test file of the given size, upload (index) it and publish
+ * it under a keyword that equals the file name.
+ *
+ * The file content is a chain of hashes: the first block is filled
+ * with a constant and every following block is the hash of the block
+ * before it.  For sizes that are a multiple of the hash size this is
+ * the same data the original loop produced; for other sizes the chain
+ * now stops before it would run past the end of the buffer.
+ *
+ * @param size size of the file in bytes
+ * @return keyword URI under which the file was published (to be
+ *         destroyed by the caller), NULL on error
+ */
+static struct GNUNET_ECRS_URI *
+uploadFile (unsigned int size)
+{
+  const size_t hashSize = sizeof (GNUNET_HashCode);
+  struct GNUNET_ECRS_MetaData *meta;
+  struct GNUNET_ECRS_URI *uri;
+  struct GNUNET_ECRS_URI *key;
+  const char *keywords[2];
+  char *name;
+  char *buf;
+  size_t i;
+  int fd;
+  int complete;
+  int ret;
+
+  name = makeName (size);
+  fd =
+    GNUNET_disk_file_open (ectx, name, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
+  if (-1 == fd)
+    {
+      /* without the file there is nothing to upload */
+      GNUNET_free (name);
+      return NULL;
+    }
+  buf = GNUNET_malloc (size);
+  memset (buf, size / 253, size < hashSize ? size : hashSize);
+  /* only continue while a complete source and target block fit */
+  for (i = 0; i + 2 * hashSize <= size; i += hashSize)
+    GNUNET_hash (&buf[i], hashSize, (GNUNET_HashCode *) &buf[i + hashSize]);
+  complete = (size == WRITE (fd, buf, size));
+  GNUNET_free (buf);
+  GNUNET_disk_file_close (ectx, name, fd);
+  if (!complete)
+    {
+      /* a truncated file could never match what downloadFile expects */
+      GNUNET_free (name);
+      return NULL;
+    }
+  ret = GNUNET_ECRS_file_upload (ectx, cfg, name, GNUNET_YES,   /* index */
+                                 1,     /* anon */
+                                 0,     /* prio */
+                                 GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES,        /* expire */
+                                 &uprogress,    /* progress */
+                                 NULL, &testTerminate, NULL, &uri);
+  if (ret == GNUNET_SYSERR)
+    {
+      GNUNET_free (name);
+      return NULL;
+    }
+
+  keywords[0] = name;
+  keywords[1] = NULL;
+  meta = GNUNET_ECRS_meta_data_create ();
+  key = GNUNET_ECRS_keyword_strings_to_uri (keywords);
+  ret = GNUNET_ECRS_publish_under_keyword (ectx, cfg, key, 0, 0,
+                                           GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES,      /* expire */
+                                           uri, meta);
+  GNUNET_ECRS_meta_data_destroy (meta);
+  GNUNET_ECRS_uri_destroy (uri);
+  GNUNET_free (name);
+  if (ret != GNUNET_OK)
+    {
+      GNUNET_ECRS_uri_destroy (key);
+      return NULL;
+    }
+  return key;
+}
+
+/**
+ * Search result callback: logs the URI that was found, stores a copy
+ * of it in the closure and stops the search.
+ *
+ * @param fi the search result
+ * @param key ignored
+ * @param isRoot ignored
+ * @param closure points to the URI pointer that receives the copy;
+ *        that pointer must still be NULL
+ * @return GNUNET_SYSERR to abort the search
+ */
+static int
+searchCB (const GNUNET_ECRS_FileInfo * fi,
+          const GNUNET_HashCode * key, int isRoot, void *closure)
+{
+  struct GNUNET_ECRS_URI **result = closure;
+  char *uriString = GNUNET_ECRS_uri_to_string (fi->uri);
+
+  GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Search found URI `%s'\n", uriString);
+  GNUNET_free (uriString);
+  /* only a single result is expected */
+  GNUNET_GE_ASSERT (ectx, NULL == *result);
+  *result = GNUNET_ECRS_uri_duplicate (fi->uri);
+  return GNUNET_SYSERR;         /* abort search */
+}
+
+/**
+ * Search for the given keyword URI and replace it with the URI of the
+ * first result.  The keyword URI is destroyed in any case; if nothing
+ * was found, *uri is NULL afterwards.
+ *
+ * @param *uri In: keyword URI, out: file URI
+ * @return GNUNET_OK on success, GNUNET_SYSERR otherwise
+ */
+static int
+searchFile (struct GNUNET_ECRS_URI **uri)
+{
+  struct GNUNET_ECRS_URI *found = NULL;
+  int status;
+
+  status = GNUNET_ECRS_search (ectx, cfg, *uri,
+                               1,
+                               1450 * GNUNET_CRON_SECONDS,
+                               &searchCB, &found, &testTerminate, NULL);
+  GNUNET_ECRS_uri_destroy (*uri);
+  *uri = found;
+  if (status == GNUNET_SYSERR)
+    return GNUNET_SYSERR;
+  if (found == NULL)
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+/**
+ * Download the file with the given URI into a temporary file and
+ * compare its content with the expected hash chain (first block filled
+ * with a constant, every further block the hash of its predecessor).
+ * The temporary file is removed afterwards.
+ *
+ * For sizes that are a multiple of the hash size the expected data is
+ * the same the original loop produced; for other sizes the chain now
+ * stops before it would run past the end of the buffer.
+ *
+ * @param size expected size of the file in bytes
+ * @param uri URI of the file to download
+ * @return GNUNET_OK if the download worked and the content matches,
+ *         GNUNET_SYSERR otherwise
+ */
+static int
+downloadFile (unsigned int size, const struct GNUNET_ECRS_URI *uri)
+{
+  const size_t hashSize = sizeof (GNUNET_HashCode);
+  char *uriString;
+  char *tmpName;
+  char *expected;
+  char *actual;
+  size_t i;
+  int fd;
+  int ret;
+
+  uriString = GNUNET_ECRS_uri_to_string (uri);
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Starting download of `%s'\n", uriString);
+  GNUNET_free (uriString);
+  tmpName = makeName (0);
+  ret = GNUNET_SYSERR;
+  if (GNUNET_OK == GNUNET_ECRS_file_download (ectx,
+                                              cfg,
+                                              uri,
+                                              tmpName,
+                                              1, &dprogress, NULL,
+                                              &testTerminate, NULL))
+    {
+      fd = GNUNET_disk_file_open (ectx, tmpName, O_RDONLY);
+      /* an unreadable result counts as a failed download */
+      if (-1 != fd)
+        {
+          expected = GNUNET_malloc (size);
+          actual = GNUNET_malloc (size);
+          memset (expected, size / 253, size < hashSize ? size : hashSize);
+          /* only continue while a complete source and target block fit */
+          for (i = 0; i + 2 * hashSize <= size; i += hashSize)
+            GNUNET_hash (&expected[i], hashSize,
+                         (GNUNET_HashCode *) &expected[i + hashSize]);
+          if ((size == READ (fd, actual, size)) &&
+              (0 == memcmp (expected, actual, size)))
+            ret = GNUNET_OK;
+          GNUNET_free (expected);
+          GNUNET_free (actual);
+          GNUNET_disk_file_close (ectx, tmpName, fd);
+        }
+    }
+  UNLINK (tmpName);
+  GNUNET_free (tmpName);
+  return ret;
+}
+
+/**
+ * Unindex the test file of the given size and delete it from disk.
+ *
+ * @param size size that was used to create the file (selects its name)
+ * @return result of the unindex operation, or GNUNET_SYSERR if the
+ *         file could not be removed
+ */
+static int
+unindexFile (unsigned int size)
+{
+  char *name = makeName (size);
+  int ret = GNUNET_ECRS_file_uninde (ectx, cfg, name, NULL, NULL,
+                                     &testTerminate, NULL);
+
+  if (UNLINK (name) != 0)
+    ret = GNUNET_SYSERR;
+  GNUNET_free (name);
+  return ret;
+}
+
+/**
+ * Abort the test if the condition does not hold: sets `ret' to 1,
+ * reports the failure via GNUNET_GE_BREAK and jumps to the FAILURE
+ * label of the calling function.  Wrapped in do/while(0) so that
+ * `CHECK (x);' is a single statement, also inside if/else.
+ */
+#define CHECK(a) do { if (!(a)) { ret = 1; GNUNET_GE_BREAK(ectx, 0); goto FAILURE; } } while (0)
+
+/**
+ * Testcase for gap routing over a chain of PEER_COUNT peers: the file
+ * is uploaded with the configuration from check.conf, then searched
+ * and downloaded via the peer on port 2077 + PEER_COUNT * 10 and
+ * finally unindexed via the peer on port 2087.
+ *
+ * @return 0: ok, 1: a test step failed, -1: setup failed
+ */
+int
+main (int argc, char **argv)
+{
+  struct GNUNET_TESTING_DaemonContext *peers = NULL;
+  struct GNUNET_ECRS_URI *uri = NULL;
+  GNUNET_CronTime start;
+  GNUNET_CronTime elapsed;
+  char buf[128];
+  int ret;
+  int i;
+
+  ret = 0;
+  cfg = GNUNET_GC_create ();
+  if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+    {
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#if START_PEERS
+  peers = GNUNET_TESTING_start_daemons ("tcp",
+                                        "advertising topology fs stats",
+                                        "/tmp/gnunet-gap-test2",
+                                        2087, 10, PEER_COUNT);
+  if (peers == NULL)
+    {
+      fprintf (stderr, "Failed to start the gnunetd daemons!\n");
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#endif
+  /* connect every peer with its successor */
+  for (i = 1; i < PEER_COUNT; i++)
+    {
+      if (GNUNET_OK != GNUNET_TESTING_connect_daemons (2077 + (10 * i),
+                                                       2087 + (10 * i)))
+        {
+#if START_PEERS
+          /* `peers' only exists if this test started the daemons */
+          GNUNET_TESTING_stop_daemons (peers);
+#endif
+          fprintf (stderr, "Failed to connect the peers!\n");
+          GNUNET_GC_free (cfg);
+          return -1;
+        }
+    }
+
+  printf ("Uploading...\n");
+  uri = uploadFile (SIZE);
+  CHECK (NULL != uri);
+  GNUNET_snprintf (buf, sizeof (buf), "localhost:%u", 2077 + PEER_COUNT * 10);
+  GNUNET_GC_set_configuration_value_string (cfg, ectx, "NETWORK", "HOST",
+                                            buf);
+  CHECK (GNUNET_OK == searchFile (&uri));
+  printf ("Search successful!\n");
+  start = GNUNET_get_time ();
+  printf ("Downloading...\n");
+  CHECK (GNUNET_OK == downloadFile (SIZE, uri));
+  elapsed = GNUNET_get_time () - start;
+  /* "+ 1" keeps the divisor non-zero for downloads that take no
+     measurable time */
+  printf ("Download successful at %llu kbps!\n",
+          (unsigned long long) ((SIZE / 1024) * GNUNET_CRON_SECONDS /
+                                (elapsed + 1)));
+  GNUNET_ECRS_uri_destroy (uri);
+  uri = NULL;                   /* not to be destroyed again below */
+  GNUNET_GC_set_configuration_value_string (cfg,
+                                            ectx,
+                                            "NETWORK", "HOST",
+                                            "localhost:2087");
+  CHECK (GNUNET_OK == unindexFile (SIZE));
+
+FAILURE:
+  if (uri != NULL)
+    GNUNET_ECRS_uri_destroy (uri);
+#if START_PEERS
+  GNUNET_TESTING_stop_daemons (peers);
+#endif
+
+  GNUNET_GC_free (cfg);
+  return ret;
+}
+
+/* end of test_linear_topology.c */
Property changes on: GNUnet/src/applications/fs/gap/test_linear_topology.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/test_loopback.c
===================================================================
--- GNUnet/src/applications/fs/gap/test_loopback.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/test_loopback.c 2008-02-06 06:52:39 UTC
(rev 6161)
@@ -0,0 +1,310 @@
+/*
+ This file is part of GNUnet.
+ (C) 2005, 2006 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/test_loopback.c
+ * @brief GAP routing testcase
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_ecrs_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_stats_lib.h"
+#include "gnunet_util.h"
+#include "gnunet_stats_lib.h"
+
+/**
+ * Context passed to the GNUNET_GE_* macros and the library calls in
+ * this file.  It is never assigned here, so it keeps its initial NULL.
+ */
+static struct GNUNET_GE_Context *ectx;
+
+/**
+ * Configuration shared by all helpers; created and loaded from
+ * "check.conf" in main().
+ */
+static struct GNUNET_GC_Configuration *cfg;
+
+/**
+ * Termination test handed to the ECRS operations of this testcase.
+ *
+ * @param unused closure, ignored
+ * @return always GNUNET_OK
+ */
+static int
+testTerminate (void *unused)
+{
+  (void) unused;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Upload progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param closure ignored
+ */
+static void
+uprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes, GNUNET_CronTime eta,
+           void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+/**
+ * Download progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param lastBlockOffset ignored
+ * @param lastBlock ignored
+ * @param lastBlockSize ignored
+ * @param closure ignored
+ */
+static void
+dprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes,
+           GNUNET_CronTime eta,
+           unsigned long long lastBlockOffset,
+           const char *lastBlock, unsigned int lastBlockSize, void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+
+/**
+ * Build the name of the i-th test file and make sure the directory
+ * that will contain it exists.
+ *
+ * @param i number appended to the common file name prefix
+ * @return newly allocated file name; the caller must GNUNET_free it
+ */
+static char *
+makeName (unsigned int i)
+{
+  /* prefix plus 14 bytes: enough for the decimal digits of i and NUL */
+  const size_t capacity = strlen ("/tmp/gnunet-gaptest/GAPTEST") + 14;
+  char *path = GNUNET_malloc (capacity);
+
+  GNUNET_snprintf (path, capacity, "/tmp/gnunet-gaptest/GAPTEST%u", i);
+  GNUNET_disk_directory_create_for_file (NULL, path);
+  return path;
+}
+
+/**
+ * Create a test file of the given size, upload (index) it and publish
+ * it under a keyword that equals the file name.
+ *
+ * The buffer is filled with a constant byte; then, walking forward in
+ * steps of the hash size, each block is overwritten with the hash of
+ * the 42 bytes that follow it.
+ *
+ * @param size size of the file in bytes
+ * @return keyword URI under which the file was published (to be
+ *         destroyed by the caller), NULL on error
+ */
+static struct GNUNET_ECRS_URI *
+uploadFile (unsigned int size)
+{
+  struct GNUNET_ECRS_MetaData *meta;
+  struct GNUNET_ECRS_URI *uri;
+  struct GNUNET_ECRS_URI *key;
+  const char *keywords[2];
+  char *name;
+  char *buf;
+  int fd;
+  int complete;
+  int ret;
+  int i;
+
+  name = makeName (size);
+  fd =
+    GNUNET_disk_file_open (ectx, name, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
+  if (-1 == fd)
+    {
+      /* without the file there is nothing to upload */
+      GNUNET_free (name);
+      return NULL;
+    }
+  buf = GNUNET_malloc (size);
+  memset (buf, size + size / 253, size);
+  for (i = 0; i < (int) (size - 42 - sizeof (GNUNET_HashCode));
+       i += sizeof (GNUNET_HashCode))
+    GNUNET_hash (&buf[i + sizeof (GNUNET_HashCode)], 42,
+                 (GNUNET_HashCode *) &buf[i]);
+  complete = (size == WRITE (fd, buf, size));
+  GNUNET_free (buf);
+  GNUNET_disk_file_close (ectx, name, fd);
+  if (!complete)
+    {
+      /* a truncated file could never match what downloadFile expects */
+      GNUNET_free (name);
+      return NULL;
+    }
+  ret = GNUNET_ECRS_file_upload (ectx, cfg, name, GNUNET_YES,   /* index */
+                                 0,     /* anon */
+                                 0,     /* prio */
+                                 GNUNET_get_time () + 10 * GNUNET_CRON_MINUTES, /* expire */
+                                 &uprogress, NULL, &testTerminate, NULL,
+                                 &uri);
+  if (ret == GNUNET_SYSERR)
+    {
+      GNUNET_free (name);
+      return NULL;
+    }
+
+  keywords[0] = name;
+  keywords[1] = NULL;
+  meta = GNUNET_ECRS_meta_data_create ();
+  key = GNUNET_ECRS_keyword_strings_to_uri (keywords);
+  ret = GNUNET_ECRS_publish_under_keyword (ectx, cfg, key, 0, 0,
+                                           GNUNET_get_time () + 10 * GNUNET_CRON_MINUTES,       /* expire */
+                                           uri, meta);
+  GNUNET_ECRS_meta_data_destroy (meta);
+  GNUNET_ECRS_uri_destroy (uri);
+  GNUNET_free (name);
+  if (ret != GNUNET_OK)
+    {
+      GNUNET_ECRS_uri_destroy (key);
+      return NULL;
+    }
+  return key;
+}
+
+/**
+ * Search result callback: logs the URI that was found, stores a copy
+ * of it in the closure and stops the search.
+ *
+ * @param fi the search result
+ * @param key ignored
+ * @param isRoot ignored
+ * @param closure points to the URI pointer that receives the copy;
+ *        that pointer must still be NULL
+ * @return GNUNET_SYSERR to abort the search
+ */
+static int
+searchCB (const GNUNET_ECRS_FileInfo * fi,
+          const GNUNET_HashCode * key, int isRoot, void *closure)
+{
+  struct GNUNET_ECRS_URI **result = closure;
+  char *uriString = GNUNET_ECRS_uri_to_string (fi->uri);
+
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Search found URI `%s'\n", uriString);
+  GNUNET_free (uriString);
+  /* only a single result is expected */
+  GNUNET_GE_ASSERT (ectx, NULL == *result);
+  *result = GNUNET_ECRS_uri_duplicate (fi->uri);
+  return GNUNET_SYSERR;         /* abort search */
+}
+
+/**
+ * Search for the given keyword URI and replace it with the URI of the
+ * first result.  The keyword URI is destroyed in any case; if nothing
+ * was found, *uri is NULL afterwards.
+ *
+ * @param *uri In: keyword URI, out: file URI
+ * @return GNUNET_OK on success, GNUNET_SYSERR otherwise
+ */
+static int
+searchFile (struct GNUNET_ECRS_URI **uri)
+{
+  struct GNUNET_ECRS_URI *found = NULL;
+  int status;
+
+  status = GNUNET_ECRS_search (ectx, cfg, *uri,
+                               0,
+                               15 * GNUNET_CRON_SECONDS,
+                               &searchCB, &found, &testTerminate, NULL);
+  GNUNET_ECRS_uri_destroy (*uri);
+  *uri = found;
+  if (status == GNUNET_SYSERR)
+    return GNUNET_SYSERR;
+  if (found == NULL)
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+/**
+ * Download the file with the given URI into a temporary file and
+ * compare its content with the expected pattern (constant fill, then
+ * each block overwritten with the hash of the 42 bytes following it).
+ * The temporary file is removed afterwards.
+ *
+ * @param size expected size of the file in bytes
+ * @param uri URI of the file to download
+ * @return GNUNET_OK if the download worked and the content matches,
+ *         GNUNET_SYSERR otherwise
+ */
+static int
+downloadFile (unsigned int size, const struct GNUNET_ECRS_URI *uri)
+{
+  char *uriString;
+  char *tmpName;
+  char *expected;
+  char *actual;
+  int fd;
+  int ret;
+  int i;
+
+  uriString = GNUNET_ECRS_uri_to_string (uri);
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Starting download of `%s'\n", uriString);
+  GNUNET_free (uriString);
+  tmpName = makeName (0);
+  ret = GNUNET_SYSERR;
+  if (GNUNET_OK == GNUNET_ECRS_file_download (ectx,
+                                              cfg,
+                                              uri,
+                                              tmpName,
+                                              0, &dprogress, NULL,
+                                              &testTerminate, NULL))
+    {
+      fd = GNUNET_disk_file_open (ectx, tmpName, O_RDONLY);
+      /* an unreadable result counts as a failed download */
+      if (-1 != fd)
+        {
+          expected = GNUNET_malloc (size);
+          actual = GNUNET_malloc (size);
+          memset (expected, size + size / 253, size);
+          for (i = 0; i < (int) (size - 42 - sizeof (GNUNET_HashCode));
+               i += sizeof (GNUNET_HashCode))
+            GNUNET_hash (&expected[i + sizeof (GNUNET_HashCode)], 42,
+                         (GNUNET_HashCode *) &expected[i]);
+          if ((size == READ (fd, actual, size)) &&
+              (0 == memcmp (expected, actual, size)))
+            ret = GNUNET_OK;
+          GNUNET_free (expected);
+          GNUNET_free (actual);
+          GNUNET_disk_file_close (ectx, tmpName, fd);
+        }
+    }
+  UNLINK (tmpName);
+  GNUNET_free (tmpName);
+  return ret;
+}
+
+/**
+ * Unindex the test file of the given size and delete it from disk.
+ *
+ * @param size size that was used to create the file (selects its name)
+ * @return result of the unindex operation, or GNUNET_SYSERR if the
+ *         file could not be removed
+ */
+static int
+unindexFile (unsigned int size)
+{
+  char *name = makeName (size);
+  int ret = GNUNET_ECRS_file_uninde (ectx, cfg, name, NULL, NULL,
+                                     &testTerminate, NULL);
+
+  if (UNLINK (name) != 0)
+    ret = GNUNET_SYSERR;
+  GNUNET_free (name);
+  return ret;
+}
+
+/**
+ * Abort the test if the condition does not hold: sets `ret' to 1,
+ * reports the failure via GNUNET_GE_BREAK and jumps to the FAILURE
+ * label of the calling function.  Wrapped in do/while(0) so that
+ * `CHECK (x);' is a single statement, also inside if/else.
+ */
+#define CHECK(a) do { if (!(a)) { ret = 1; GNUNET_GE_BREAK(ectx, 0); goto FAILURE; } } while (0)
+
+/** Set to 1 to have the test start (and stop) the gnunetd daemons itself. */
+#define START_PEERS 1
+
+/**
+ * Testcase to test gap routing (2 peers only): the file is uploaded
+ * with the configuration from check.conf, searched and downloaded via
+ * the peer on port 12087 and finally unindexed via the peer on port
+ * 2087.
+ *
+ * @return 0: ok, 1: a test step failed, -1: setup failed
+ */
+int
+main (int argc, char **argv)
+{
+  struct GNUNET_TESTING_DaemonContext *peers = NULL;
+  struct GNUNET_ECRS_URI *uri = NULL;
+  int ret;
+
+  ret = 0;
+  cfg = GNUNET_GC_create ();
+  if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+    {
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#if START_PEERS
+  peers = GNUNET_TESTING_start_daemons ("tcp",
+                                        "advertising topology fs stats",
+                                        "/tmp/gnunet-gap-test",
+                                        2087, 10000, 2);
+  if (peers == NULL)
+    {
+      fprintf (stderr, "Failed to start the gnunetd daemons!\n");
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#endif
+  if (GNUNET_OK != GNUNET_TESTING_connect_daemons (2087, 12087))
+    {
+#if START_PEERS
+      /* `peers' only exists if this test started the daemons */
+      GNUNET_TESTING_stop_daemons (peers);
+#endif
+      fprintf (stderr, "Failed to connect the peers!\n");
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+
+  uri = uploadFile (12345);
+  CHECK (NULL != uri);
+  GNUNET_GC_set_configuration_value_string (cfg,
+                                            ectx,
+                                            "NETWORK", "HOST",
+                                            "localhost:12087");
+  CHECK (GNUNET_OK == searchFile (&uri));
+  CHECK (GNUNET_OK == downloadFile (12345, uri));
+  GNUNET_ECRS_uri_destroy (uri);
+  uri = NULL;                   /* not to be destroyed again below */
+  GNUNET_GC_set_configuration_value_string (cfg,
+                                            ectx,
+                                            "NETWORK", "HOST",
+                                            "localhost:2087");
+  CHECK (GNUNET_OK == unindexFile (12345));
+
+FAILURE:
+  if (uri != NULL)
+    GNUNET_ECRS_uri_destroy (uri);
+#if START_PEERS
+  GNUNET_TESTING_stop_daemons (peers);
+#endif
+
+  GNUNET_GC_free (cfg);
+  return ret;
+}
+
+/* end of test_loopback.c */
Property changes on: GNUnet/src/applications/fs/gap/test_loopback.c
___________________________________________________________________
Name: svn:eol-style
+ native
Added: GNUnet/src/applications/fs/gap/test_star_topology.c
===================================================================
--- GNUnet/src/applications/fs/gap/test_star_topology.c
(rev 0)
+++ GNUnet/src/applications/fs/gap/test_star_topology.c 2008-02-06 06:52:39 UTC
(rev 6161)
@@ -0,0 +1,293 @@
+/*
+ This file is part of GNUnet.
+ (C) 2005, 2006, 2007 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file applications/fs/gap/test_star_topology.c
+ * @brief GAP economy testcase, download from star topology
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_protocols.h"
+#include "gnunet_ecrs_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_identity_lib.h"
+#include "gnunet_stats_lib.h"
+#include "gnunet_util.h"
+#include "gnunet_stats_lib.h"
+
+/** Number of daemons; the first one is the center of the star. */
+#define PEER_COUNT 10
+
+/** Set to 1 to have the test start (and stop) the gnunetd daemons itself. */
+#define START_PEERS 1
+
+/**
+ * Size of the test file in bytes (2 MiB).  The replacement list is
+ * parenthesized so that the macro behaves like a single value inside
+ * any surrounding expression (for example `x / SIZE' or `x % SIZE').
+ */
+#define SIZE (1024*1024*2)
+
+/**
+ * Context passed to the GNUNET_GE_* macros and the library calls in
+ * this file.  It is never assigned here, so it keeps its initial NULL.
+ */
+static struct GNUNET_GE_Context *ectx;
+
+/**
+ * Configuration shared by all helpers; created and loaded from
+ * "check.conf" in main().
+ */
+static struct GNUNET_GC_Configuration *cfg;
+
+/**
+ * Termination test handed to the ECRS operations of this testcase.
+ *
+ * @param unused closure, ignored
+ * @return always GNUNET_OK
+ */
+static int
+testTerminate (void *unused)
+{
+  (void) unused;
+  return GNUNET_OK;
+}
+
+/**
+ * Upload progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param closure ignored
+ */
+static void
+uprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes, GNUNET_CronTime eta,
+           void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+/**
+ * Download progress callback: writes "." to stderr on every call and a
+ * newline once completedBytes equals totalBytes.
+ *
+ * @param totalBytes total number of bytes of the operation
+ * @param completedBytes number of bytes finished so far
+ * @param eta ignored
+ * @param lastBlockOffset ignored
+ * @param lastBlock ignored
+ * @param lastBlockSize ignored
+ * @param closure ignored
+ */
+static void
+dprogress (unsigned long long totalBytes,
+           unsigned long long completedBytes,
+           GNUNET_CronTime eta,
+           unsigned long long lastBlockOffset,
+           const char *lastBlock, unsigned int lastBlockSize, void *closure)
+{
+  /* plain text, not a format string: use fputs rather than fprintf */
+  fputs (totalBytes == completedBytes ? "\n" : ".", stderr);
+}
+
+/**
+ * Build the name of the i-th test file and make sure the directory
+ * that will contain it exists.
+ *
+ * @param i number appended to the common file name prefix
+ * @return newly allocated file name; the caller must GNUNET_free it
+ */
+static char *
+makeName (unsigned int i)
+{
+  /* prefix plus 14 bytes: enough for the decimal digits of i and NUL */
+  const size_t capacity = strlen ("/tmp/gnunet-gaptest/GAPTEST") + 14;
+  char *path = GNUNET_malloc (capacity);
+
+  GNUNET_snprintf (path, capacity, "/tmp/gnunet-gaptest/GAPTEST%u", i);
+  GNUNET_disk_directory_create_for_file (NULL, path);
+  return path;
+}
+
+/**
+ * Create a test file of the given size and upload (index) it using
+ * the current configuration.
+ *
+ * The buffer is filled with a constant byte; then, walking forward in
+ * steps of the hash size, each block is overwritten with the hash of
+ * the 42 bytes that follow it.
+ *
+ * @param size size of the file in bytes
+ * @return URI of the uploaded file (to be destroyed by the caller),
+ *         NULL on error
+ */
+static struct GNUNET_ECRS_URI *
+uploadFile (unsigned int size)
+{
+  struct GNUNET_ECRS_URI *uri;
+  char *name;
+  char *buf;
+  int fd;
+  int complete;
+  int ret;
+  int i;
+
+  name = makeName (size);
+  fd =
+    GNUNET_disk_file_open (ectx, name, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
+  if (-1 == fd)
+    {
+      /* without the file there is nothing to upload */
+      GNUNET_free (name);
+      return NULL;
+    }
+  buf = GNUNET_malloc_large (size);
+  memset (buf, size + size / 253, size);
+  for (i = 0; i < (int) (size - 42 - sizeof (GNUNET_HashCode));
+       i += sizeof (GNUNET_HashCode))
+    GNUNET_hash (&buf[i + sizeof (GNUNET_HashCode)], 42,
+                 (GNUNET_HashCode *) &buf[i]);
+  complete = (size == WRITE (fd, buf, size));
+  GNUNET_free (buf);
+  GNUNET_disk_file_close (ectx, name, fd);
+  if (!complete)
+    {
+      /* a truncated file could never match what downloadFile expects */
+      GNUNET_free (name);
+      return NULL;
+    }
+  ret = GNUNET_ECRS_file_upload (ectx, cfg, name, GNUNET_YES,   /* index */
+                                 1,     /* anon */
+                                 0,     /* prio */
+                                 GNUNET_get_time () + 100 * GNUNET_CRON_MINUTES,        /* expire */
+                                 &uprogress, NULL, &testTerminate, NULL,
+                                 &uri);
+  GNUNET_free (name);
+  if (ret == GNUNET_SYSERR)
+    return NULL;
+  return uri;
+}
+
+/**
+ * Download the file with the given URI into a temporary file and
+ * compare its content with the expected pattern (constant fill, then
+ * each block overwritten with the hash of the 42 bytes following it).
+ * The temporary file is removed afterwards.
+ *
+ * @param size expected size of the file in bytes
+ * @param uri URI of the file to download
+ * @return GNUNET_OK if the download worked and the content matches,
+ *         GNUNET_SYSERR otherwise
+ */
+static int
+downloadFile (unsigned int size, const struct GNUNET_ECRS_URI *uri)
+{
+  char *uriString;
+  char *tmpName;
+  char *expected;
+  char *actual;
+  int fd;
+  int ret;
+  int i;
+
+  uriString = GNUNET_ECRS_uri_to_string (uri);
+  GNUNET_GE_LOG (ectx,
+                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+                 "Starting download of `%s'\n", uriString);
+  GNUNET_free (uriString);
+  tmpName = makeName (0);
+  ret = GNUNET_SYSERR;
+  if (GNUNET_OK == GNUNET_ECRS_file_download (ectx,
+                                              cfg,
+                                              uri,
+                                              tmpName,
+                                              1, &dprogress, NULL,
+                                              &testTerminate, NULL))
+    {
+      fd = GNUNET_disk_file_open (ectx, tmpName, O_RDONLY);
+      /* an unreadable result counts as a failed download */
+      if (-1 != fd)
+        {
+          expected = GNUNET_malloc (size);
+          actual = GNUNET_malloc (size);
+          memset (expected, size + size / 253, size);
+          for (i = 0; i < (int) (size - 42 - sizeof (GNUNET_HashCode));
+               i += sizeof (GNUNET_HashCode))
+            GNUNET_hash (&expected[i + sizeof (GNUNET_HashCode)], 42,
+                         (GNUNET_HashCode *) &expected[i]);
+          if ((size == READ (fd, actual, size)) &&
+              (0 == memcmp (expected, actual, size)))
+            ret = GNUNET_OK;
+          GNUNET_free (expected);
+          GNUNET_free (actual);
+          GNUNET_disk_file_close (ectx, tmpName, fd);
+        }
+    }
+  UNLINK (tmpName);
+  GNUNET_free (tmpName);
+  return ret;
+}
+
+/**
+ * Abort the test if the condition does not hold: sets `ret' to 1,
+ * reports the failure via GNUNET_GE_BREAK and jumps to the FAILURE
+ * label of the calling function.  Wrapped in do/while(0) so that
+ * `CHECK (x);' is a single statement, also inside if/else.
+ */
+#define CHECK(a) do { if (!(a)) { ret = 1; GNUNET_GE_BREAK(ectx, 0); goto FAILURE; } } while (0)
+
+/** Identities of the peers the file was uploaded to (filled by main). */
+static GNUNET_PeerIdentity goodPeers[PEER_COUNT];
+
+/** Number of valid entries in goodPeers. */
+static unsigned int goodPeerPos;
+
+/**
+ * Peer info callback: prints trust and bandwidth of the given peer and
+ * labels it "Good" if its identity is listed in goodPeers, "Poor"
+ * otherwise.
+ *
+ * @param data ignored
+ * @param identity identity of the peer the information is about
+ * @param address ignored
+ * @param addr_len ignored
+ * @param last_seen ignored
+ * @param trust trust value that is printed
+ * @param bpmFromPeer bandwidth value that is printed
+ * @return always GNUNET_OK
+ */
+static int
+infoCallback (void *data,
+              const GNUNET_PeerIdentity * identity,
+              const void *address,
+              unsigned int addr_len,
+              GNUNET_CronTime last_seen, unsigned int trust,
+              unsigned int bpmFromPeer)
+{
+  GNUNET_EncName enc;
+  unsigned int idx;
+  int isGood = 0;
+
+  for (idx = 0; idx < goodPeerPos; idx++)
+    {
+      if (0 == memcmp (&goodPeers[idx], identity,
+                       sizeof (GNUNET_PeerIdentity)))
+        isGood = 1;
+    }
+  GNUNET_hash_to_enc (&identity->hashPubKey, &enc);
+  if (!isGood)
+    printf ("Poor peer `%8s' has trust %u and bandwidth %u\n",
+            (const char *) &enc, trust, bpmFromPeer);
+  else
+    printf ("Good peer `%8s' has trust %u and bandwidth %u\n",
+            (const char *) &enc, trust, bpmFromPeer);
+  return GNUNET_OK;
+}
+
+/**
+ * Testcase for gap with PEER_COUNT peers connected as a star around
+ * the peer on port 2087: the same file is uploaded to every second
+ * outer peer, downloaded via the central peer, and finally the peer
+ * information reported by the central peer is printed.
+ *
+ * @return 0: ok, 1: a test step failed, -1: setup failed
+ */
+int
+main (int argc, char **argv)
+{
+  struct GNUNET_TESTING_DaemonContext *peers = NULL;
+  struct GNUNET_ClientServerConnection *sock;
+  struct GNUNET_ECRS_URI *uri = NULL;
+  GNUNET_MessageHello *hello;
+  GNUNET_CronTime start;
+  GNUNET_EncName enc;
+  char buf[128];
+  int ret;
+  int i;
+
+  ret = 0;
+  cfg = GNUNET_GC_create ();
+  if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+    {
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#if START_PEERS
+  peers = GNUNET_TESTING_start_daemons ("tcp",
+                                        "advertising topology fs stats",
+                                        "/tmp/gnunet-gap-test3",
+                                        2087, 10, PEER_COUNT);
+  if (peers == NULL)
+    {
+      fprintf (stderr, "Failed to start the gnunetd daemons!\n");
+      GNUNET_GC_free (cfg);
+      return -1;
+    }
+#endif
+  /* connect as star-topology */
+  for (i = 1; i < PEER_COUNT; i++)
+    {
+      if (GNUNET_OK != GNUNET_TESTING_connect_daemons (2087, 2087 + 10 * i))
+        {
+#if START_PEERS
+          /* `peers' only exists if this test started the daemons */
+          GNUNET_TESTING_stop_daemons (peers);
+#endif
+          fprintf (stderr, "Failed to connect the peers!\n");
+          GNUNET_GC_free (cfg);
+          return -1;
+        }
+    }
+
+  goodPeerPos = 0;
+  for (i = 1; i < PEER_COUNT; i += 2)
+    {
+      GNUNET_snprintf (buf, sizeof (buf), "localhost:%u", 2087 + i * 10);
+      GNUNET_GC_set_configuration_value_string (cfg, ectx, "NETWORK", "HOST",
+                                                buf);
+      sock = GNUNET_client_connection_create (NULL, cfg);
+      if (GNUNET_OK != GNUNET_IDENTITY_get_self (sock, &hello))
+        {
+          GNUNET_client_connection_destroy (sock);
+          GNUNET_GE_BREAK (NULL, 0);
+          break;
+        }
+      GNUNET_client_connection_destroy (sock);
+      if (uri != NULL)
+        {
+          /* only the URI of the most recent upload is kept */
+          GNUNET_ECRS_uri_destroy (uri);
+          uri = NULL;
+        }
+      GNUNET_hash_to_enc (&hello->senderIdentity.hashPubKey, &enc);
+      printf ("Uploading to peer `%8s'\n", (const char *) &enc);
+      uri = uploadFile (SIZE);
+      if (uri != NULL)
+        goodPeers[goodPeerPos++] = hello->senderIdentity;
+      /* release the hello before CHECK can leave the loop */
+      GNUNET_free (hello);
+      CHECK (NULL != uri);
+    }
+  /* the loop may have stopped before anything was uploaded */
+  CHECK (NULL != uri);
+  GNUNET_GC_set_configuration_value_string (cfg,
+                                            ectx,
+                                            "NETWORK", "HOST",
+                                            "localhost:2087");
+  printf ("Downloading...\n");
+  start = GNUNET_get_time ();
+  CHECK (GNUNET_OK == downloadFile (SIZE, uri));
+  printf ("Download complete - %f kbps.\n",
+          SIZE / 1024 * 1.0 * GNUNET_CRON_SECONDS / (1 + GNUNET_get_time () -
+                                                     start));
+  /* verify trust values have developed as expected */
+
+  sock = GNUNET_client_connection_create (NULL, cfg);
+  GNUNET_IDENTITY_request_peer_infos (sock, &infoCallback, NULL);
+  GNUNET_client_connection_destroy (sock);
+
+FAILURE:
+  if (uri != NULL)
+    GNUNET_ECRS_uri_destroy (uri);
+#if START_PEERS
+  GNUNET_TESTING_stop_daemons (peers);
+#endif
+  GNUNET_GC_free (cfg);
+  return ret;
+}
+
+/* end of test_star_topology.c */
Property changes on: GNUnet/src/applications/fs/gap/test_star_topology.c
___________________________________________________________________
Name: svn:eol-style
+ native
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r6161 - in GNUnet/src/applications/fs: . gap,
gnunet <=