[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated (72bc94764 -> 83dda01ce)
From: |
gnunet |
Subject: |
[gnunet] branch master updated (72bc94764 -> 83dda01ce) |
Date: |
Fri, 23 Feb 2024 15:42:48 +0100 |
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a change to branch master
in repository gnunet.
from 72bc94764 update submodules
new d8a247e34 PEERSTORE: Align API with NAMESTORE iterator and monitor.
new 83dda01ce PEERSTORE: Deprecate and remove flat plugin
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
src/include/gnunet_peerstore_plugin.h | 18 +-
src/include/gnunet_peerstore_service.h | 185 ++--
src/include/gnunet_protocols.h | 29 +-
src/plugin/peerstore/Makefile.am | 25 +-
src/plugin/peerstore/plugin_peerstore_flat.c | 606 -----------
src/plugin/peerstore/plugin_peerstore_sqlite.c | 183 +++-
.../peerstore/test_plugin_peerstore_flat.conf | 5 -
src/service/cadet/gnunet-service-cadet_hello.c | 57 +-
src/service/dhtu/plugin_dhtu_gnunet.c | 63 +-
src/service/fs/gnunet-service-fs_cp.c | 27 +-
.../hostlist/gnunet-daemon-hostlist_server.c | 53 +-
src/service/namestore/gnunet-service-namestore.c | 17 +-
src/service/peerstore/gnunet-service-peerstore.c | 1048 +++++++++++++++-----
src/service/peerstore/meson.build | 3 +-
src/service/peerstore/peerstore.h | 174 +++-
src/service/peerstore/peerstore_api.c | 671 +++----------
src/service/peerstore/peerstore_api_monitor.c | 297 ++++++
src/service/peerstore/peerstore_common.c | 20 +-
src/service/peerstore/peerstore_common.h | 2 +-
src/service/peerstore/perf_peerstore_store.c | 28 +-
src/service/peerstore/test_peerstore_api_iterate.c | 43 +-
src/service/peerstore/test_peerstore_api_store.c | 39 +-
src/service/peerstore/test_peerstore_api_watch.c | 47 +-
src/service/topology/gnunet-daemon-topology.c | 63 +-
src/service/transport/gnunet-communicator-tcp.c | 54 +-
src/service/transport/gnunet-service-transport.c | 219 ++--
src/service/transport/transport-testing2.c | 31 +-
.../transport/transport_api_cmd_start_peer.c | 22 +-
.../transport/transport_api_cmd_stop_peer.c | 2 +-
29 files changed, 2145 insertions(+), 1886 deletions(-)
delete mode 100644 src/plugin/peerstore/plugin_peerstore_flat.c
delete mode 100644 src/plugin/peerstore/test_plugin_peerstore_flat.conf
create mode 100644 src/service/peerstore/peerstore_api_monitor.c
diff --git a/src/include/gnunet_peerstore_plugin.h
b/src/include/gnunet_peerstore_plugin.h
index 2636c3009..6ecb9135a 100644
--- a/src/include/gnunet_peerstore_plugin.h
+++ b/src/include/gnunet_peerstore_plugin.h
@@ -46,6 +46,19 @@ extern "C"
#endif
#endif
+/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure
+ * @param seq sequence in iteration
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+typedef void (*GNUNET_PEERSTORE_PluginProcessor) (
+ void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg);
/**
* @brief struct returned by the initialization function of the plugin
@@ -93,6 +106,7 @@ struct GNUNET_PEERSTORE_PluginFunctions
* @param sub_system name of sub system
* @param peer Peer identity (can be NULL)
* @param key entry key string (can be NULL)
+ * @param limit max number of results to give
* @param iter function to call asynchronously with the results, terminated
* by a NULL result
* @param iter_cls closure for @a iter
@@ -104,7 +118,9 @@ struct GNUNET_PEERSTORE_PluginFunctions
const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
- GNUNET_PEERSTORE_Processor iter,
+ uint64_t serial,
+ uint64_t limit,
+ GNUNET_PEERSTORE_PluginProcessor iter,
void *iter_cls);
/**
diff --git a/src/include/gnunet_peerstore_service.h
b/src/include/gnunet_peerstore_service.h
index 966abb7a6..0284d46dd 100644
--- a/src/include/gnunet_peerstore_service.h
+++ b/src/include/gnunet_peerstore_service.h
@@ -66,32 +66,32 @@ extern "C" {
* messages.
*/
#define GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME \
- "transport-backchannel-monotonic-time"
+ "transport-backchannel-monotonic-time"
/**
* Key used to store sender's monotonic time from DV learn
* messages.
*/
#define GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME \
- "transport-dv-learn-monotonic-time"
+ "transport-dv-learn-monotonic-time"
/**
* Key used to store sender's monotonic time from handshake message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE \
- "transport-tcp-communicator-handshake"
+ "transport-tcp-communicator-handshake"
/**
* Key used to store sender's monotonic time from handshake ack message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK \
- "transport-tcp-communicator-handshake-ack"
+ "transport-tcp-communicator-handshake-ack"
/**
* Key used to store sender's monotonic time from rekey message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY \
- "transport-tcp-communicator-rekey"
+ "transport-tcp-communicator-rekey"
/**
@@ -108,7 +108,12 @@ enum GNUNET_PEERSTORE_StoreOption
* Delete any previous values for the given key before
* storing the given value.
*/
- GNUNET_PEERSTORE_STOREOPTION_REPLACE = 1
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE = 1,
+
+ /**
+ * Upserts values. Replaces if expiry is later than existing.
+ */
+ GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY = 2
};
/**
@@ -161,11 +166,6 @@ struct GNUNET_PEERSTORE_Record
*/
struct GNUNET_TIME_Absolute expiry;
- /**
- * Client from which this record originated.
- * NOTE: This is internal to the service.
- */
- struct GNUNET_SERVICE_Client *client;
};
@@ -208,16 +208,6 @@ struct GNUNET_PEERSTORE_StoreHelloContext
*/
void *cont_cls;
- /**
- * Map with all store contexts started during adding hello.
- */
- struct GNUNET_CONTAINER_MultiPeerMap *store_context_map;
-
- /**
- * Active watch to be notified about conflicting hello uri add requests.
- */
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
/**
* Hello uri which was request for storing.
*/
@@ -229,9 +219,10 @@ struct GNUNET_PEERSTORE_StoreHelloContext
struct GNUNET_PeerIdentity *pid;
/**
- * Was this request successful.
+ * The iteration for the merge
*/
- int success;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+
};
/**
@@ -249,6 +240,7 @@ struct GNUNET_PEERSTORE_StoreHelloContextClosure
* Function called by PEERSTORE for each matching record.
*
* @param cls closure
+ * @param seq sequence in iteration
* @param record peerstore record information
* @param emsg error message, or NULL if no errors
*/
@@ -269,38 +261,6 @@ typedef void (*GNUNET_PEERSTORE_hello_notify_cb) (
const struct GNUNET_MessageHeader *hello,
const char *err_msg);
-/**
- * Call a method whenever our known information about peers
- * changes. Initially calls the given function for all known
- * peers and then only signals changes.
- *
- * If @a include_friend_only is set to #GNUNET_YES peerinfo will include HELLO
- * messages which are intended for friend to friend mode and which do not
- * have to be gossiped. Otherwise these messages are skipped. //FIXME Not
implemented atm!
- *
- * @param h Handle to the PEERSTORE service
- * @param include_friend_only include HELLO messages for friends only (not
used at the moment)
- * @param callback the method to call for getting the hello.
- * @param callback_cls closure for @a callback
- * @return NULL on error
- */
-struct GNUNET_PEERSTORE_NotifyContext *
-GNUNET_PEERSTORE_hello_changed_notify (struct GNUNET_PEERSTORE_Handle *h,
- int include_friend_only,
- GNUNET_PEERSTORE_hello_notify_cb
callback,
- void *callback_cls);
-
-
-/**
- * Stop notifying about changes.
- *
- * @param nc context to stop notifying
- */
-void
-GNUNET_PEERSTORE_hello_changed_notify_cancel (struct
- GNUNET_PEERSTORE_NotifyContext
*nc);
-
-
/**
* Add hello to peerstore.
*
@@ -385,65 +345,130 @@ GNUNET_PEERSTORE_store_cancel (struct
GNUNET_PEERSTORE_StoreContext *sc);
/**
- * Iterate over records matching supplied key information
+ * Iterate over peerstore entries.
+ * The iteration can be filtered to contain only records
+ * matching @a peer and/or @a key.
+ * The @a sub_system to match must be provided.
+ * @a callback will be called with (each) matching record.
+ * #GNUNET_PEERSTORE_iteration_next() must be invoked
+ * to continue processing until the end of the iteration is
+ * reached.
*
* @param h handle to the PEERSTORE service
* @param sub_system name of sub system
* @param peer Peer identity (can be NULL)
* @param key entry key string (can be NULL)
- * @param callback function called with each matching record, all NULL's on end
+ * @param callback function called with each matching record. The record will
be NULL to indicate end.
* @param callback_cls closure for @a callback
* @return Handle to iteration request
*/
struct GNUNET_PEERSTORE_IterateContext *
-GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls);
+GNUNET_PEERSTORE_iteration_start (struct GNUNET_PEERSTORE_Handle *h,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls);
/**
- * Cancel an iterate request
- * Please do not call after the iterate request is done
+ * Continue an iteration.
+ * Do NOT call after the iterate request is done.
*
- * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
+ * @param ic Iterate request context as returned by
#GNUNET_PEERSTORE_iteration_start()
+ * @param limit how many records to return max until
#GNUNET_PEERSTORE_iteration_next() needs to be called again.
*/
void
-GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic);
+GNUNET_PEERSTORE_iteration_next (struct GNUNET_PEERSTORE_IterateContext *ic,
+ uint64_t limit);
+
+/**
+ * Cancel an iteration.
+ * Do NOT call after the iterate request is done
+ *
+ * @param ic Iterate request context as returned by
#GNUNET_PEERSTORE_iteration_start()
+ */
+void
+GNUNET_PEERSTORE_iteration_stop (struct GNUNET_PEERSTORE_IterateContext *ic);
/**
* Request watching a given key
- * User will be notified with any new values added to key,
- * all existing entries are supplied beforehand.
+ * The monitoring can be filtered to contain only records
+ * matching @a peer and/or @a key.
+ * The @a sub_system to match must be provided.
+ * @a callback will be called with (each) matching new record.
+ * #GNUNET_PEERSTORE_monitor_next() must be invoked
+ * to continue processing until @a sync_cb is
+ * called, indicating that the caller should be up-to-date.
+ * The caller will be notified with any new values added to key
+ * through @a callback.
+ * If @a iterate_first is set to GNUNET_YES, the monitor will first
+ * iterate over all existing, matching records. In any case,
+ * after @a sync_cb is called the first time monitoring starts.
*
* @param h handle to the PEERSTORE service
+ * @param iterate_first if GNUNET_YES, first iterate over all existing results
* @param sub_system name of sub system
* @param peer Peer identity
* @param key entry key string
+ * @param error_cb function to call on error (i.e. disconnect); note that
+ * unlike the other error callbacks in this API, a call to this
+ * function does NOT destroy the monitor handle, it merely signals
+ * that monitoring is down. You need to still explicitly call
+ * #GNUNET_PEERSTORE_monitor_stop().
+ * @param error_cb_cls closure for @a error_cb
+ * @param sync_cb function called when we're in sync with the peerstore
+ * @param sync_cb_cls closure for @a sync_cb
* @param callback function called with each new value
* @param callback_cls closure for @a callback
* @return Handle to watch request
*/
-struct GNUNET_PEERSTORE_WatchContext *
-GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls);
-
+struct GNUNET_PEERSTORE_Monitor *
+GNUNET_PEERSTORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ int iterate_first,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_SCHEDULER_TaskCallback error_cb,
+ void *error_cb_cls,
+ GNUNET_SCHEDULER_TaskCallback sync_cb,
+ void *sync_cb_cls,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls);
/**
- * Cancel a watch request
+ * Calls the monitor processor specified in #GNUNET_PEERSTORE_monitor_start
+ * for the next record(s). This function is used to allow clients that merely
+ * monitor the PEERSTORE to still throttle peerstore operations, so we can be
+ * sure that the monitors can keep up.
*
- * @param wc handle to the watch request
+ * Note that #GNUNET_PEERSTORE_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay. Note that even with a limit of 1 the
+ * #GNUNET_PEERSTORE_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification. Thus, monitors will
+ * only closely track the current state of the peerstore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ * (before #GNUNET_PEERSTORE_monitor_next is to be called again)
*/
void
-GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc);
+GNUNET_PEERSTORE_monitor_next (struct GNUNET_PEERSTORE_Monitor *zm,
+ uint64_t limit);
+/**
+ * Stop monitoring.
+ *
+ * @param zm handle to the monitor activity to stop
+ */
+void
+GNUNET_PEERSTORE_monitor_stop (struct GNUNET_PEERSTORE_Monitor *zm);
#if 0 /* keep Emacsens' auto-indent happy */
{
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 42524d490..8638703db 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2599,14 +2599,14 @@ extern "C" {
#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE 820
/**
- * Iteration request
+ * Iteration request (see also 828, 829)
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE 821
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START 821
/**
- * Iteration record message
+ * Record result message
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD 822
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD 822
/**
* Iteration end message
@@ -2614,25 +2614,34 @@ extern "C" {
#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END 823
/**
- * Watch request
+ * Monitor request
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH 824
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START 824
/**
- * Watch response
+ * Monitor sync
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD 825
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC 825
/**
- * Watch cancel request
+ * Monitor next request
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL 826
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT 826
/**
* Store result message
*/
#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT 827
+/**
+ * Iteration next request (see also 821, 829)
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT 828
+
+/**
+ * Iteration stop request (see also 821, 828)
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_STOP 829
/*******************************************************************************
* SOCIAL message types
diff --git a/src/plugin/peerstore/Makefile.am b/src/plugin/peerstore/Makefile.am
index 225e9a71e..7202aef84 100644
--- a/src/plugin/peerstore/Makefile.am
+++ b/src/plugin/peerstore/Makefile.am
@@ -11,18 +11,6 @@ if USE_COVERAGE
AM_CFLAGS = -fprofile-arcs -ftest-coverage
endif
-if HAVE_EXPERIMENTAL
-FLAT_PLUGIN = libgnunet_plugin_peerstore_flat.la
-FLAT_TESTS = test_plugin_peerstore_flat
-libgnunet_plugin_peerstore_flat_la_SOURCES = \
- plugin_peerstore_flat.c
-libgnunet_plugin_peerstore_flat_la_LIBADD = \
- $(top_builddir)/src/lib/util/libgnunetutil.la $(XLIBS) \
- $(LTLIBINTL)
-libgnunet_plugin_peerstore_flat_la_LDFLAGS = \
- $(GN_PLUGIN_LDFLAGS)
-endif
-
if HAVE_SQLITE
SQLITE_PLUGIN = libgnunet_plugin_peerstore_sqlite.la
SQLITE_TESTS = test_plugin_peerstore_sqlite
@@ -38,8 +26,7 @@ libgnunet_plugin_peerstore_sqlite_la_LDFLAGS = \
endif
plugin_LTLIBRARIES = \
- $(SQLITE_PLUGIN) \
- $(FLAT_PLUGIN)
+ $(SQLITE_PLUGIN)
test_plugin_peerstore_sqlite_SOURCES = \
test_plugin_peerstore.c
@@ -47,18 +34,10 @@ test_plugin_peerstore_sqlite_LDADD = \
$(top_builddir)/src/service/testing/libgnunettesting.la \
$(top_builddir)/src/lib/util/libgnunetutil.la
-test_plugin_peerstore_flat_SOURCES = \
- test_plugin_peerstore.c
-test_plugin_peerstore_flat_LDADD = \
- $(top_builddir)/src/service/testing/libgnunettesting.la \
- $(top_builddir)/src/lib/util/libgnunetutil.la
-
check_PROGRAMS = \
- $(SQLITE_TESTS) \
- $(FLAT_TESTS)
+ $(SQLITE_TESTS)
EXTRA_DIST = \
- test_plugin_peerstore_flat.conf \
test_plugin_peerstore_sqlite.conf
if ENABLE_TEST_RUN
diff --git a/src/plugin/peerstore/plugin_peerstore_flat.c
b/src/plugin/peerstore/plugin_peerstore_flat.c
deleted file mode 100644
index cc304d4db..000000000
--- a/src/plugin/peerstore/plugin_peerstore_flat.c
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * This file is part of GNUnet
- * Copyright (C) 2015 Christian Grothoff (and other contributing authors)
- *
- * GNUnet is free software: you can redistribute it and/or modify it
- * under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License,
- * or (at your option) any later version.
- *
- * GNUnet is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
-
- SPDX-License-Identifier: AGPL3.0-or-later
- */
-
-/**
- * @file peerstore/plugin_peerstore_flat.c
- * @brief flat file-based peerstore backend
- * @author Martin Schanzenbach
- */
-
-#include "platform.h"
-#include "gnunet_peerstore_plugin.h"
-#include "gnunet_peerstore_service.h"
-#include "../../service/peerstore/peerstore.h"
-
-/**
- * Context for all functions in this plugin.
- */
-struct Plugin
-{
- /**
- * Configuration handle
- */
- const struct GNUNET_CONFIGURATION_Handle *cfg;
-
- /**
- * HashMap
- */
- struct GNUNET_CONTAINER_MultiHashMap *hm;
-
- /**
- * Iterator
- */
- GNUNET_PEERSTORE_Processor iter;
-
- /**
- * Iterator cls
- */
- void *iter_cls;
-
- /**
- * iterator key
- */
- const char *iter_key;
-
- /**
- * Iterator peer
- */
- const struct GNUNET_PeerIdentity *iter_peer;
-
- /**
- * Iterator subsystem
- */
- const char *iter_sub_system;
-
- /**
- * Iterator time
- */
- struct GNUNET_TIME_Absolute iter_now;
-
- /**
- * Deleted entries
- */
- uint64_t deleted_entries;
-
- /**
- * Expired entries
- */
- uint64_t exp_changes;
-
- /**
- * Database filename.
- */
- char *fn;
-
- /**
- * Result found bool
- */
- int iter_result_found;
-};
-
-
-static int
-delete_entries (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct Plugin *plugin = cls;
- struct GNUNET_PEERSTORE_Record *entry = value;
-
- if (0 != strcmp (plugin->iter_key, entry->key))
- return GNUNET_YES;
- if (0 != memcmp (plugin->iter_peer,
- &entry->peer,
- sizeof(struct GNUNET_PeerIdentity)))
- return GNUNET_YES;
- if (0 != strcmp (plugin->iter_sub_system, entry->sub_system))
- return GNUNET_YES;
-
- GNUNET_CONTAINER_multihashmap_remove (plugin->hm, key, value);
- plugin->deleted_entries++;
- return GNUNET_YES;
-}
-
-
-/**
- * Delete records with the given key
- *
- * @param cls closure (internal context for the plugin)
- * @param sub_system name of sub system
- * @param peer Peer identity (can be NULL)
- * @param key entry key string (can be NULL)
- * @return number of deleted records
- */
-static int
-peerstore_flat_delete_records (void *cls, const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key)
-{
- struct Plugin *plugin = cls;
-
- plugin->iter_sub_system = sub_system;
- plugin->iter_peer = peer;
- plugin->iter_key = key;
- plugin->deleted_entries = 0;
-
- GNUNET_CONTAINER_multihashmap_iterate (plugin->hm,
- &delete_entries,
- plugin);
- return plugin->deleted_entries;
-}
-
-
-static int
-expire_entries (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct Plugin *plugin = cls;
- struct GNUNET_PEERSTORE_Record *entry = value;
-
- if (entry->expiry.abs_value_us < plugin->iter_now.abs_value_us)
- {
- GNUNET_CONTAINER_multihashmap_remove (plugin->hm, key, value);
- plugin->exp_changes++;
- }
- return GNUNET_YES;
-}
-
-
-/**
- * Delete expired records (expiry < now)
- *
- * @param cls closure (internal context for the plugin)
- * @param now time to use as reference
- * @param cont continuation called with the number of records expired
- * @param cont_cls continuation closure
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and cont is not
- * called
- */
-static int
-peerstore_flat_expire_records (void *cls, struct GNUNET_TIME_Absolute now,
- GNUNET_PEERSTORE_Continuation cont,
- void *cont_cls)
-{
- struct Plugin *plugin = cls;
-
- plugin->exp_changes = 0;
- plugin->iter_now = now;
-
- GNUNET_CONTAINER_multihashmap_iterate (plugin->hm,
- &expire_entries,
- plugin);
- if (NULL != cont)
- {
- cont (cont_cls, plugin->exp_changes);
- }
- return GNUNET_OK;
-}
-
-
-static int
-iterate_entries (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct Plugin *plugin = cls;
- struct GNUNET_PEERSTORE_Record *entry = value;
-
- if ((NULL != plugin->iter_peer) &&
- (0 != memcmp (plugin->iter_peer,
- &entry->peer,
- sizeof(struct GNUNET_PeerIdentity))))
- {
- return GNUNET_YES;
- }
- if ((NULL != plugin->iter_key) &&
- (0 != strcmp (plugin->iter_key,
- entry->key)))
- {
- return GNUNET_YES;
- }
- if (NULL != plugin->iter)
- plugin->iter (plugin->iter_cls, entry, NULL);
- plugin->iter_result_found = GNUNET_YES;
- return GNUNET_YES;
-}
-
-
-/**
- * Iterate over the records given an optional peer id
- * and/or key.
- *
- * @param cls closure (internal context for the plugin)
- * @param sub_system name of sub system
- * @param peer Peer identity (can be NULL)
- * @param key entry key string (can be NULL)
- * @param iter function to call asynchronously with the results, terminated
- * by a NULL result
- * @param iter_cls closure for @a iter
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and iter is not
- * called
- */
-static int
-peerstore_flat_iterate_records (void *cls, const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor iter,
- void *iter_cls)
-{
- struct Plugin *plugin = cls;
-
- plugin->iter = iter;
- plugin->iter_cls = iter_cls;
- plugin->iter_peer = peer;
- plugin->iter_sub_system = sub_system;
- plugin->iter_key = key;
-
- GNUNET_CONTAINER_multihashmap_iterate (plugin->hm,
- &iterate_entries,
- plugin);
- if (NULL != iter)
- iter (iter_cls, NULL, NULL);
- return GNUNET_OK;
-}
-
-
-/**
- * Store a record in the peerstore.
- * Key is the combination of sub system and peer identity.
- * One key can store multiple values.
- *
- * @param cls closure (internal context for the plugin)
- * @param sub_system name of the GNUnet sub system responsible
- * @param peer peer identity
- * @param key record key string
- * @param value value to be stored
- * @param size size of value to be stored
- * @param expiry absolute time after which the record is (possibly) deleted
- * @param options options related to the store operation
- * @param cont continuation called when record is stored
- * @param cont_cls continuation closure
- * @return #GNUNET_OK on success, else #GNUNET_SYSERR and cont is not called
- */
-static int
-peerstore_flat_store_record (void *cls, const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key, const void *value, size_t size,
- struct GNUNET_TIME_Absolute expiry,
- enum GNUNET_PEERSTORE_StoreOption options,
- GNUNET_PEERSTORE_Continuation cont,
- void *cont_cls)
-{
- struct Plugin *plugin = cls;
- struct GNUNET_HashCode hkey;
- struct GNUNET_PEERSTORE_Record *entry;
- const char *peer_id;
-
-
- entry = GNUNET_new (struct GNUNET_PEERSTORE_Record);
- entry->sub_system = GNUNET_strdup (sub_system);
- entry->key = GNUNET_strdup (key);
- entry->value = GNUNET_malloc (size);
- GNUNET_memcpy (entry->value, value, size);
- entry->value_size = size;
- entry->peer = *peer;
- entry->expiry = expiry;
-
- peer_id = GNUNET_i2s (peer);
- GNUNET_CRYPTO_hash (peer_id,
- strlen (peer_id),
- &hkey);
-
- if (GNUNET_PEERSTORE_STOREOPTION_REPLACE == options)
- {
- peerstore_flat_delete_records (cls, sub_system, peer, key);
- }
-
- GNUNET_CONTAINER_multihashmap_put (plugin->hm,
- &hkey,
- entry,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- if (NULL != cont)
- {
- cont (cont_cls, GNUNET_OK);
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Initialize the database connections and associated
- * data structures (create tables and indices
- * as needed as well).
- *
- * @param plugin the plugin context (state for this module)
- * @return GNUNET_OK on success
- */
-static int
-database_setup (struct Plugin *plugin)
-{
- char *afsdir;
- char *key;
- char *sub_system;
- const char *peer_id;
- char *peer;
- char *value;
- char *expiry;
- struct GNUNET_DISK_FileHandle *fh;
- struct GNUNET_PEERSTORE_Record *entry;
- struct GNUNET_HashCode hkey;
- uint64_t size;
- char *buffer;
- char *line;
-
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "peerstore-flat",
- "FILENAME", &afsdir))
- {
- GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "peerstore-flat",
- "FILENAME");
- return GNUNET_SYSERR;
- }
- if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
- {
- if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
- {
- GNUNET_break (0);
- GNUNET_free (afsdir);
- return GNUNET_SYSERR;
- }
- }
- /* afsdir should be UTF-8-encoded. If it isn't, it's a bug */
- plugin->fn = afsdir;
-
- fh = GNUNET_DISK_file_open (afsdir,
- GNUNET_DISK_OPEN_CREATE
- | GNUNET_DISK_OPEN_READWRITE,
- GNUNET_DISK_PERM_USER_WRITE
- | GNUNET_DISK_PERM_USER_READ);
- if (NULL == fh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _ ("Unable to initialize file: %s.\n"),
- afsdir);
- return GNUNET_SYSERR;
- }
-
- /* Load data from file into hashmap */
- plugin->hm = GNUNET_CONTAINER_multihashmap_create (10,
- GNUNET_NO);
-
- if (GNUNET_SYSERR == GNUNET_DISK_file_size (afsdir,
- &size,
- GNUNET_YES,
- GNUNET_YES))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _ ("Unable to get filesize: %s.\n"),
- afsdir);
- return GNUNET_SYSERR;
- }
-
- buffer = GNUNET_malloc (size + 1);
-
- if (GNUNET_SYSERR == GNUNET_DISK_file_read (fh,
- buffer,
- size))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _ ("Unable to read file: %s.\n"),
- afsdir);
- GNUNET_DISK_file_close (fh);
- GNUNET_free (buffer);
- return GNUNET_SYSERR;
- }
-
- buffer[size] = '\0';
- GNUNET_DISK_file_close (fh);
- if (0 < size)
- {
- line = strtok (buffer, "\n");
- while (line != NULL)
- {
- sub_system = strtok (line, ",");
- if (NULL == sub_system)
- break;
- peer = strtok (NULL, ",");
- if (NULL == peer)
- break;
- key = strtok (NULL, ",");
- if (NULL == key)
- break;
- value = strtok (NULL, ",");
- if (NULL == value)
- break;
- expiry = strtok (NULL, ",");
- if (NULL == expiry)
- break;
- entry = GNUNET_new (struct GNUNET_PEERSTORE_Record);
- entry->sub_system = GNUNET_strdup (sub_system);
- entry->key = GNUNET_strdup (key);
- {
- size_t s;
- char *o;
-
- o = NULL;
- s = GNUNET_STRINGS_base64_decode (peer,
- strlen (peer),
- (void **) &o);
- if (sizeof(struct GNUNET_PeerIdentity) == s)
- GNUNET_memcpy (&entry->peer,
- o,
- s);
- else
- GNUNET_break (0);
- GNUNET_free (o);
- }
- entry->value_size = GNUNET_STRINGS_base64_decode (value,
- strlen (value),
- (void **)
&entry->value);
- if (GNUNET_SYSERR ==
- GNUNET_STRINGS_fancy_time_to_absolute (expiry,
- &entry->expiry))
- {
- GNUNET_free (entry->sub_system);
- GNUNET_free (entry->key);
- GNUNET_free (entry);
- break;
- }
- peer_id = GNUNET_i2s (&entry->peer);
- GNUNET_CRYPTO_hash (peer_id,
- strlen (peer_id),
- &hkey);
-
- GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put
(plugin->hm,
- &hkey,
- entry,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- }
- }
- GNUNET_free (buffer);
- return GNUNET_OK;
-}
-
-
-static int
-store_and_free_entries (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GNUNET_DISK_FileHandle *fh = cls;
- struct GNUNET_PEERSTORE_Record *entry = value;
- char *line;
- char *peer;
- const char *expiry;
- char *val;
-
- GNUNET_STRINGS_base64_encode (entry->value,
- entry->value_size,
- &val);
- expiry = GNUNET_STRINGS_absolute_time_to_string (entry->expiry);
- GNUNET_STRINGS_base64_encode ((char *) &entry->peer,
- sizeof(struct GNUNET_PeerIdentity),
- &peer);
- GNUNET_asprintf (&line,
- "%s,%s,%s,%s,%s",
- entry->sub_system,
- peer,
- entry->key,
- val,
- expiry);
- GNUNET_free (val);
- GNUNET_free (peer);
- GNUNET_DISK_file_write (fh,
- line,
- strlen (line));
- GNUNET_free (entry->sub_system);
- GNUNET_free (entry->key);
- GNUNET_free (entry->value);
- GNUNET_free (entry);
- GNUNET_free (line);
- return GNUNET_YES;
-}
-
-
-/**
- * Shutdown database connection and associate data
- * structures.
- * @param plugin the plugin context (state for this module)
- */
-static void
-database_shutdown (struct Plugin *plugin)
-{
- struct GNUNET_DISK_FileHandle *fh;
-
- fh = GNUNET_DISK_file_open (plugin->fn,
- GNUNET_DISK_OPEN_CREATE
- | GNUNET_DISK_OPEN_TRUNCATE
- | GNUNET_DISK_OPEN_READWRITE,
- GNUNET_DISK_PERM_USER_WRITE
- | GNUNET_DISK_PERM_USER_READ);
- if (NULL == fh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _ ("Unable to initialize file: %s.\n"),
- plugin->fn);
- return;
- }
- GNUNET_CONTAINER_multihashmap_iterate (plugin->hm,
- &store_and_free_entries,
- fh);
- GNUNET_CONTAINER_multihashmap_destroy (plugin->hm);
- GNUNET_DISK_file_close (fh);
-}
-
-
-/**
- * Entry point for the plugin.
- *
- * @param cls The struct GNUNET_CONFIGURATION_Handle.
- * @return NULL on error, otherwise the plugin context
- */
-void *
-libgnunet_plugin_peerstore_flat_init (void *cls)
-{
- static struct Plugin plugin;
- const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
- struct GNUNET_PEERSTORE_PluginFunctions *api;
-
- if (NULL != plugin.cfg)
- return NULL; /* can only initialize once! */
- memset (&plugin, 0, sizeof(struct Plugin));
- plugin.cfg = cfg;
- if (GNUNET_OK != database_setup (&plugin))
- {
- database_shutdown (&plugin);
- return NULL;
- }
- api = GNUNET_new (struct GNUNET_PEERSTORE_PluginFunctions);
- api->cls = &plugin;
- api->store_record = &peerstore_flat_store_record;
- api->iterate_records = &peerstore_flat_iterate_records;
- api->expire_records = &peerstore_flat_expire_records;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Flat plugin is running\n");
- return api;
-}
-
-
-/**
- * Exit point from the plugin.
- *
- * @param cls The plugin context (as returned by "init")
- * @return Always NULL
- */
-void *
-libgnunet_plugin_peerstore_flat_done (void *cls)
-{
- struct GNUNET_PEERSTORE_PluginFunctions *api = cls;
- struct Plugin *plugin = api->cls;
-
- database_shutdown (plugin);
- plugin->cfg = NULL;
- GNUNET_free (api);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Flat plugin is finished\n");
- return NULL;
-}
-
-
-/* end of plugin_peerstore_sqlite.c */
diff --git a/src/plugin/peerstore/plugin_peerstore_sqlite.c
b/src/plugin/peerstore/plugin_peerstore_sqlite.c
index 7d06d2c63..373315bab 100644
--- a/src/plugin/peerstore/plugin_peerstore_sqlite.c
+++ b/src/plugin/peerstore/plugin_peerstore_sqlite.c
@@ -105,6 +105,11 @@ struct Plugin
*/
sqlite3_stmt *select_peerstoredata_by_all;
+ /**
+ * Precompiled SQL for updating a record that has an earlier expiry
+ */
+ sqlite3_stmt *upsert_peerstoredata_later_expiry;
+
/**
* Precompiled SQL for deleting expired
* records from peerstoredata
@@ -243,13 +248,17 @@ peerstore_sqlite_iterate_records (void *cls,
const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
- GNUNET_PEERSTORE_Processor iter,
+ uint64_t serial,
+ uint64_t limit,
+ GNUNET_PEERSTORE_PluginProcessor iter,
void *iter_cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt;
int err = 0;
int sret;
+ int ret;
+ uint64_t seq;
struct GNUNET_PEERSTORE_Record rec;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -260,6 +269,8 @@ peerstore_sqlite_iterate_records (void *cls,
{
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -272,6 +283,8 @@ peerstore_sqlite_iterate_records (void *cls,
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -287,6 +300,8 @@ peerstore_sqlite_iterate_records (void *cls,
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -300,6 +315,8 @@ peerstore_sqlite_iterate_records (void *cls,
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -320,46 +337,56 @@ peerstore_sqlite_iterate_records (void *cls,
}
err = 0;
- while (SQLITE_ROW == (sret = sqlite3_step (stmt)))
+ ret = GNUNET_OK;
+ for (uint64_t i = 0; i < limit; i++)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Returning a matched record.\n");
- struct GNUNET_SQ_ResultSpec rs[] = {
- GNUNET_SQ_result_spec_string (&rec.sub_system),
- GNUNET_SQ_result_spec_auto_from_type (&rec.peer),
- GNUNET_SQ_result_spec_string (&rec.key),
- GNUNET_SQ_result_spec_variable_size (&rec.value, &rec.value_size),
- GNUNET_SQ_result_spec_absolute_time (&rec.expiry),
- GNUNET_SQ_result_spec_end
- };
-
- if (GNUNET_OK !=
- GNUNET_SQ_extract_result (stmt,
- rs))
+ sret = sqlite3_step (stmt);
+ if (SQLITE_DONE == sret)
{
- GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iteration done (no results)\n");
+ ret = GNUNET_NO;
break;
}
- if (NULL != iter)
- iter (iter_cls,
- &rec,
- NULL);
- GNUNET_SQ_cleanup_result (rs);
- }
- if (SQLITE_DONE != sret)
- {
- LOG_SQLITE (plugin,
- GNUNET_ERROR_TYPE_ERROR,
- "sqlite_step");
- err = 1;
+ if (SQLITE_ROW != sret)
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR,
+ "sqlite_step");
+ ret = GNUNET_SYSERR;
+ break;
+ }
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Returning a matched record.\n");
+ struct GNUNET_SQ_ResultSpec rs[] = {
+ GNUNET_SQ_result_spec_uint64 (&seq),
+ GNUNET_SQ_result_spec_string (&rec.sub_system),
+ GNUNET_SQ_result_spec_auto_from_type (&rec.peer),
+ GNUNET_SQ_result_spec_string (&rec.key),
+ GNUNET_SQ_result_spec_variable_size (&rec.value, &rec.value_size),
+ GNUNET_SQ_result_spec_absolute_time (&rec.expiry),
+ GNUNET_SQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_SQ_extract_result (stmt,
+ rs))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ if (NULL != iter)
+ iter (iter_cls,
+ seq,
+ &rec,
+ NULL);
+ GNUNET_SQ_cleanup_result (rs);
+ }
}
GNUNET_SQ_reset (plugin->dbh,
stmt);
- if (NULL != iter)
- iter (iter_cls,
- NULL,
- err ? "sqlite error" : NULL);
- return GNUNET_OK;
+ return ret;
}
@@ -393,7 +420,7 @@ peerstore_sqlite_store_record (void *cls,
void *cont_cls)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->insert_peerstoredata;
+ sqlite3_stmt *stmt;
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
@@ -410,13 +437,43 @@ peerstore_sqlite_store_record (void *cls,
peer,
key);
}
- if (GNUNET_OK !=
- GNUNET_SQ_bind (stmt,
- params))
- LOG_SQLITE (plugin,
- GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_bind");
- else if (SQLITE_DONE != sqlite3_step (stmt))
+
+ if (GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY == options)
+ {
+ struct GNUNET_SQ_QueryParam params_upsert[] = {
+ GNUNET_SQ_query_param_fixed_size (value, size),
+ GNUNET_SQ_query_param_absolute_time (&expiry),
+ GNUNET_SQ_query_param_string (sub_system),
+ GNUNET_SQ_query_param_auto_from_type (peer),
+ GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_absolute_time (&expiry),
+ GNUNET_SQ_query_param_end
+ };
+ stmt = plugin->upsert_peerstoredata_later_expiry;
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (stmt,
+ params_upsert))
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ GNUNET_assert (0);
+ }
+ }
+ else
+ {
+ stmt = plugin->insert_peerstoredata;
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (stmt,
+ params))
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ GNUNET_assert (0);
+ }
+ }
+ if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
@@ -558,6 +615,7 @@ database_setup (struct Plugin *plugin)
/* Create tables */
sql_exec (plugin->dbh,
"CREATE TABLE IF NOT EXISTS peerstoredata (\n"
+ " uid INTEGER PRIMARY KEY,"
" sub_system TEXT NOT NULL,\n"
" peer_id BLOB NOT NULL,\n"
" key TEXT NOT NULL,\n"
@@ -566,7 +624,7 @@ database_setup (struct Plugin *plugin)
/* Create Indices */
if (SQLITE_OK !=
sqlite3_exec (plugin->dbh,
- "CREATE INDEX IF NOT EXISTS peerstoredata_key_index ON
peerstoredata (sub_system, peer_id, key)",
+ "CREATE INDEX IF NOT EXISTS peerstoredata_key_index ON
peerstoredata (sub_system, peer_id, key, uid)",
NULL,
NULL,
NULL))
@@ -583,23 +641,46 @@ database_setup (struct Plugin *plugin)
" VALUES (?,?,?,?,?);",
&plugin->insert_peerstoredata);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
- " WHERE sub_system = ?",
+ "UPDATE peerstoredata"
+ " SET"
+ " value = ?,"
+ " expiry = ?"
+ " WHERE sub_system = ?"
+ " AND peer_id = ?"
+ " AND key = ?"
+ " AND expiry < ?",
+ &plugin->upsert_peerstoredata_later_expiry);
+ sql_prepare (plugin->dbh,
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
+ " WHERE sub_system = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND peer_id = ?",
+ " AND peer_id = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_pid);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND key = ?",
+ " AND key = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_key);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND peer_id = ?" " AND key = ?",
+ " AND peer_id = ?"
+ " AND key = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_all);
sql_prepare (plugin->dbh,
"DELETE FROM peerstoredata"
diff --git a/src/plugin/peerstore/test_plugin_peerstore_flat.conf
b/src/plugin/peerstore/test_plugin_peerstore_flat.conf
deleted file mode 100644
index c55b1e9d6..000000000
--- a/src/plugin/peerstore/test_plugin_peerstore_flat.conf
+++ /dev/null
@@ -1,5 +0,0 @@
-[peerstore-flat]
-FILENAME = $GNUNET_TMP/gnunet-test-plugin-namestore-flat/flatdb
-
-[peerstore]
-# PREFIX = valgrind --log-file=/home/schanzen/dev/gnunet/src/peerstore/vg_log
diff --git a/src/service/cadet/gnunet-service-cadet_hello.c
b/src/service/cadet/gnunet-service-cadet_hello.c
index ac84fab1e..cc28f0fde 100644
--- a/src/service/cadet/gnunet-service-cadet_hello.c
+++ b/src/service/cadet/gnunet-service-cadet_hello.c
@@ -24,6 +24,7 @@
* @author Bartlomiej Polot
* @author Christian Grothoff
*/
+#include "gnunet_common.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
@@ -50,7 +51,7 @@ static struct GNUNET_PEERSTORE_Handle *peerstore;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
@@ -63,17 +64,20 @@ static struct GNUNET_PEERSTORE_NotifyContext
*peerstore_notify;
*/
static void
got_hello (void *cls,
- const struct GNUNET_PeerIdentity *id,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
struct CadetPeer *peer;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
- if ((NULL == id) ||
- (NULL == hello))
+ if (NULL == record->value)
+ {
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
- if (0 == GNUNET_memcmp (id,
+ }
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer,
&my_full_id))
{
GNUNET_free (mine);
@@ -83,19 +87,37 @@ got_hello (void *cls,
GNUNET_TIME_UNIT_ZERO);
GNUNET_HELLO_builder_free (builder);
GCD_hello_update ();
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Hello for %s (%d bytes), expires on %s\n",
- GNUNET_i2s (id),
+ GNUNET_i2s (&record->peer),
ntohs (hello->size),
GNUNET_STRINGS_absolute_time_to_string (
GNUNET_HELLO_builder_get_expiration_time (hello)));
- peer = GCP_get (id,
+ peer = GCP_get (&record->peer,
GNUNET_YES);
GCP_set_hello (peer,
hello);
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
+}
+
+
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
}
@@ -110,8 +132,17 @@ GCH_init (const struct GNUNET_CONFIGURATION_Handle *c)
GNUNET_assert (NULL == peerstore_notify);
peerstore = GNUNET_PEERSTORE_connect (c);
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (peerstore, GNUNET_NO, &got_hello,
- NULL);
+ GNUNET_PEERSTORE_monitor_start (c,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &got_hello,
+ NULL);
}
@@ -123,7 +154,7 @@ GCH_shutdown ()
{
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
if (NULL != peerstore)
@@ -153,8 +184,8 @@ GCH_get_mine (void)
{
builder = GNUNET_HELLO_builder_new (&my_full_id);
mine = GNUNET_HELLO_builder_to_dht_hello_msg (builder,
- my_private_key,
- GNUNET_TIME_UNIT_ZERO);
+ my_private_key,
+ GNUNET_TIME_UNIT_ZERO);
GNUNET_HELLO_builder_free (builder);
}
return mine;
diff --git a/src/service/dhtu/plugin_dhtu_gnunet.c
b/src/service/dhtu/plugin_dhtu_gnunet.c
index 7c02fdd15..826c33527 100644
--- a/src/service/dhtu/plugin_dhtu_gnunet.c
+++ b/src/service/dhtu/plugin_dhtu_gnunet.c
@@ -161,7 +161,7 @@ struct Plugin
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
- struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+ struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Identity of this peer.
@@ -176,7 +176,7 @@ struct Plugin
};
-//#include "../peerinfo-tool/gnunet-peerinfo_plugins.c"
+// #include "../peerinfo-tool/gnunet-peerinfo_plugins.c"
/**
@@ -198,7 +198,7 @@ gnunet_try_connect (void *cls,
int pfx_len;
eou = strstr (address,
- "://");
+ "://");
if (NULL == eou)
{
GNUNET_break (0);
@@ -379,7 +379,7 @@ add_addr (void *cls,
{
struct Plugin *plugin = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG ,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"peerinfo_cb addr %s\n",
addr);
plugin->env->address_add_cb (plugin->env->cls,
@@ -403,26 +403,44 @@ add_addr (void *cls,
*/
static void
peerinfo_cb (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
struct Plugin *plugin = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
+ hello = record->value;
if (NULL == hello)
return;
- if (NULL == peer)
- return;
if (0 !=
- GNUNET_memcmp (peer,
+ GNUNET_memcmp (&record->peer,
&plugin->my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (plugin->peerstore_notify, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
add_addr,
plugin);
GNUNET_HELLO_builder_free (builder);
+ GNUNET_PEERSTORE_monitor_next (plugin->peerstore_notify, 1);
+}
+
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
}
@@ -439,17 +457,23 @@ peerinfo_cb (void *cls,
* @param my_identity ID of this peer, NULL if we failed
*/
static void
- core_init_cb (void *cls,
+core_init_cb (void *cls,
const struct GNUNET_PeerIdentity *my_identity)
{
struct Plugin *plugin = cls;
plugin->my_identity = *my_identity;
- plugin->peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (plugin->peerstore,
- GNUNET_NO,
- &peerinfo_cb,
- plugin);
+ plugin->peerstore_notify = GNUNET_PEERSTORE_monitor_start (plugin->env->cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+
GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &peerinfo_cb,
+ plugin);
}
@@ -538,10 +562,10 @@ libgnunet_plugin_dhtu_gnunet_done (void *cls)
if (NULL != plugin->transport)
GNUNET_TRANSPORT_application_done (plugin->transport);
if (NULL != plugin->peerstore_notify)
- GNUNET_PEERSTORE_hello_changed_notify_cancel (plugin->peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (plugin->peerstore_notify);
if (NULL != plugin->peerstore)
GNUNET_PEERSTORE_disconnect (plugin->peerstore);
- //GPI_plugins_unload ();
+ // GPI_plugins_unload ();
GNUNET_free (plugin->my_priv);
GNUNET_free (plugin);
GNUNET_free (api);
@@ -570,7 +594,8 @@ libgnunet_plugin_dhtu_gnunet_init (void *cls)
};
plugin = GNUNET_new (struct Plugin);
- plugin->my_priv = GNUNET_CRYPTO_eddsa_key_create_from_configuration
(env->cfg);
+ plugin->my_priv = GNUNET_CRYPTO_eddsa_key_create_from_configuration (
+ env->cfg);
plugin->env = env;
api = GNUNET_new (struct GNUNET_DHTU_PluginFunctions);
api->cls = plugin;
@@ -598,6 +623,6 @@ libgnunet_plugin_dhtu_gnunet_init (void *cls)
libgnunet_plugin_dhtu_gnunet_done (plugin);
return NULL;
}
- //GPI_plugins_load (env->cfg);
+ // GPI_plugins_load (env->cfg);
return api;
}
diff --git a/src/service/fs/gnunet-service-fs_cp.c
b/src/service/fs/gnunet-service-fs_cp.c
index b2b73d0a5..18a25a030 100644
--- a/src/service/fs/gnunet-service-fs_cp.c
+++ b/src/service/fs/gnunet-service-fs_cp.c
@@ -44,7 +44,7 @@
* How often do we flush respect values to disk?
*/
#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
- GNUNET_TIME_UNIT_MINUTES, 5)
+ GNUNET_TIME_UNIT_MINUTES, 5)
/**
* After how long do we discard a reply?
@@ -394,6 +394,11 @@ peer_respect_cb (void *cls,
struct GSF_ConnectedPeer *cp = cls;
GNUNET_assert (NULL != cp->respect_iterate_req);
+ if ((NULL == record) && (NULL == emsg))
+ {
+ cp->respect_iterate_req = NULL;
+ return;
+ }
if ((NULL != record) &&
(sizeof(cp->disk_respect) == record->value_size))
{
@@ -402,8 +407,12 @@ peer_respect_cb (void *cls,
}
GSF_push_start_ (cp);
if (NULL != record)
- GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
+ {
+ GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
+ return;
+ }
cp->respect_iterate_req = NULL;
+ GNUNET_PEERSTORE_iteration_next (cp->respect_iterate_req, 1);
}
@@ -476,12 +485,12 @@ GSF_peer_connect_handler (void *cls,
GNUNET_CONTAINER_multipeermap_size (cp_map),
GNUNET_NO);
cp->respect_iterate_req
- = GNUNET_PEERSTORE_iterate (peerstore,
- "fs",
- peer,
- "respect",
- &peer_respect_cb,
- cp);
+ = GNUNET_PEERSTORE_iteration_start (peerstore,
+ "fs",
+ peer,
+ "respect",
+ &peer_respect_cb,
+ cp);
GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
cp);
return cp;
@@ -1370,7 +1379,7 @@ GSF_peer_disconnect_handler (void *cls,
GNUNET_NO);
if (NULL != cp->respect_iterate_req)
{
- GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
+ GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
cp->respect_iterate_req = NULL;
}
GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
diff --git a/src/service/hostlist/gnunet-daemon-hostlist_server.c
b/src/service/hostlist/gnunet-daemon-hostlist_server.c
index d2ef8dd7a..f243deb00 100644
--- a/src/service/hostlist/gnunet-daemon-hostlist_server.c
+++ b/src/service/hostlist/gnunet-daemon-hostlist_server.c
@@ -25,6 +25,7 @@
* @author David Barksdale
* @brief application to provide an integrated hostlist HTTP server
*/
+#include "gnunet_common.h"
#include "platform.h"
#include <microhttpd.h>
#include "gnunet-daemon-hostlist_server.h"
@@ -40,7 +41,7 @@
* time out?
*/
#define GNUNET_ADV_TIMEOUT \
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
/**
* Map with hellos we build the hostlist with.
@@ -82,7 +83,7 @@ struct GNUNET_SCHEDULER_Task *peerstore_notify_task;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Our primary task for IPv4.
@@ -464,13 +465,13 @@ connect_handler (void *cls,
*/
static void
process_notify (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
unsigned int map_size;
struct GNUNET_MessageHeader *hello_cpy;
struct GNUNET_PeerIdentity *peer_cpy;
+ struct GNUNET_MessageHeader *hello;
map_size = GNUNET_CONTAINER_multipeermap_size (hellos);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -483,6 +484,7 @@ process_notify (void *cls,
err_msg);
return;
}
+ hello = record->value;
if (NULL != builder)
{
GNUNET_free (builder->data);
@@ -495,7 +497,7 @@ process_notify (void *cls,
}
peer_cpy = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
- GNUNET_memcpy (peer_cpy, peer, sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (peer_cpy, &record->peer, sizeof (struct GNUNET_PeerIdentity));
hello_cpy = GNUNET_malloc (ntohs (hello->size));
GNUNET_memcpy (hello_cpy, hello, ntohs (hello->size));
GNUNET_assert (GNUNET_YES ==
@@ -513,7 +515,8 @@ process_notify (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"1 Peerstore is notifying us to rebuild our hostlist map size %u
peer %s\n",
map_size,
- GNUNET_i2s (peer));
+ GNUNET_i2s (&record->peer));
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
}
@@ -594,6 +597,22 @@ prepare_daemon (struct MHD_Daemon *daemon_handle)
}
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
static void
start_notify (void *cls)
{
@@ -601,9 +620,16 @@ start_notify (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting to process new hellos to add to hostlist.\n");
- peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (peerstore, GNUNET_NO,
- &process_notify, NULL);
+ peerstore_notify = GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+
GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &process_notify, NULL);
}
@@ -820,9 +846,10 @@ GNUNET_HOSTLIST_server_start (const struct
GNUNET_CONFIGURATION_Handle *c,
hostlist_task_v4 = prepare_daemon (daemon_handle_v4);
if (NULL != daemon_handle_v6)
hostlist_task_v6 = prepare_daemon (daemon_handle_v6);
- peerstore_notify_task = GNUNET_SCHEDULER_add_delayed
(GNUNET_TIME_UNIT_MINUTES,
- start_notify,
- NULL);
+ peerstore_notify_task = GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_UNIT_MINUTES,
+ start_notify,
+ NULL);
return GNUNET_OK;
}
@@ -861,7 +888,7 @@ GNUNET_HOSTLIST_server_stop ()
}
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
else if (NULL != peerstore_notify_task)
diff --git a/src/service/namestore/gnunet-service-namestore.c
b/src/service/namestore/gnunet-service-namestore.c
index b159ba769..f29058d0e 100644
--- a/src/service/namestore/gnunet-service-namestore.c
+++ b/src/service/namestore/gnunet-service-namestore.c
@@ -109,16 +109,10 @@ struct ZoneIteration
*/
uint32_t offset;
- /**
- * Number of pending cache operations triggered by this zone iteration which
we
- * need to wait for before allowing the client to continue.
- */
- unsigned int cache_ops;
-
/**
* Set to #GNUNET_YES if the last iteration exhausted the limit set by the
* client and we should send the
#GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
- * message and free the data structure once @e cache_ops is zero.
+ * message and free the data structure.
*/
int send_end;
};
@@ -1360,7 +1354,8 @@ handle_edit_record_set (void *cls, const struct
EditRecordSetMessage *er_msg)
env =
GNUNET_MQ_msg_extra (rer_msg,
rlc.rd_ser_len + old_editor_hint_len,
-
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_RESPONSE);
+
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_RESPONSE)
+ ;
rer_msg->editor_hint_len = htons (old_editor_hint_len);
rer_msg->gns_header.r_id = er_msg->gns_header.r_id;
rer_msg->rd_count = htons (rlc.res_rd_count);
@@ -1859,7 +1854,8 @@ store_record_set (struct NamestoreClient *nc,
lctx.exp.abs_value_us;
rd_nf[rd_nf_count].data = NULL;
rd_nf[rd_nf_count].data_size = 0;
- rd_nf[rd_nf_count].flags = GNUNET_GNSRECORD_RF_PRIVATE |
GNUNET_GNSRECORD_RF_MAINTENANCE;
+ rd_nf[rd_nf_count].flags = GNUNET_GNSRECORD_RF_PRIVATE
+ | GNUNET_GNSRECORD_RF_MAINTENANCE;
rd_nf_count++;
}
if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
@@ -2285,8 +2281,7 @@ run_zone_iteration_round (struct ZoneIteration *zi,
uint64_t limit)
"Returned %llu results, more results available\n",
(unsigned long long) limit);
zi->send_end = (0 != proc.limit);
- if (0 == zi->cache_ops)
- zone_iteration_done_client_continue (zi);
+ zone_iteration_done_client_continue (zi);
}
diff --git a/src/service/peerstore/gnunet-service-peerstore.c
b/src/service/peerstore/gnunet-service-peerstore.c
index 90b4e8d88..ee9847164 100644
--- a/src/service/peerstore/gnunet-service-peerstore.c
+++ b/src/service/peerstore/gnunet-service-peerstore.c
@@ -23,6 +23,8 @@
* @brief peerstore service implementation
* @author Omar Tarabai
*/
+#include "gnunet_peerstore_service.h"
+#include "gnunet_protocols.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "peerstore.h"
@@ -36,6 +38,214 @@
*/
#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
+
+/**
+ * A peerstore client
+ */
+struct PeerstoreClient;
+
+/**
+ * A peerstore monitor.
+ */
+struct Monitor
+{
+ /**
+ * Next element in the DLL
+ */
+ struct Monitor *next;
+
+ /**
+ * Previous element in the DLL
+ */
+ struct Monitor *prev;
+
+ /**
+ * Peerstore client which initiated this monitor
+ */
+ struct PeerstoreClient *pc;
+
+ /**
+ * Task active during initial iteration.
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * Task to warn about slow monitors.
+ */
+ struct GNUNET_SCHEDULER_Task *sa_wait_warning;
+
+ /**
+ * Since when are we blocked on this monitor?
+ */
+ struct GNUNET_TIME_Absolute sa_waiting_start;
+
+ /**
+ * Last sequence number in the zone iteration used to address next
+ * result of the iteration in the store
+ *
+ * Initially set to 0.
+ * Updated in #monitor_iterate_cb()
+ */
+ uint64_t seq;
+
+ /**
+ * Current limit of how many more messages we are allowed
+ * to queue to this monitor.
+ */
+ uint64_t limit;
+
+ /**
+ * How many more requests may we receive from the iterator
+ * before it is at the limit we gave it? Will be below or
+ * equal to @e limit. The effective limit for monitor
+ * events is thus @e iteration_cnt - @e limit!
+ */
+ uint64_t iteration_cnt;
+
+ /**
+ * Are we (still) in the initial iteration pass?
+ */
+ int in_first_iteration;
+
+ /**
+ * Is the peer set?
+ */
+ int peer_set;
+
+ /**
+ * Responsible sub system string
+ */
+ char *sub_system;
+
+ /**
+ * Peer Identity
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Record key string
+ */
+ char *key;
+};
+
+/**
+ * A peerstore iteration operation.
+ */
+struct Iteration
+{
+ /**
+ * Next element in the DLL
+ */
+ struct Iteration *next;
+
+ /**
+ * Previous element in the DLL
+ */
+ struct Iteration *prev;
+
+ /**
+ * Peerstore client which initiated this iteration
+ */
+ struct PeerstoreClient *pc;
+
+ /**
+ * Last sequence number in the zone iteration used to address next
+ * result of the zone iteration in the store
+ *
+ * Initially set to 0.
+ * Updated in #iterate_proc()
+ */
+ uint64_t seq;
+
+ /**
+ * The operation id for the zone iteration in the response for the client
+ */
+ uint32_t request_id;
+
+ /**
+ * Offset of the zone iteration used to address next result of the zone
+ * iteration in the store
+ *
+ * Initially set to 0 in #handle_iteration_start
+ * Incremented by every call to #handle_iteration_next
+ */
+ uint32_t offset;
+
+ /**
+ * Set to #GNUNET_YES if the last iteration exhausted the limit set by the
+ * client and we should send the
#GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD_RESULT_END
+ * message and free the data structure.
+ */
+ int send_end;
+
+ /**
+ * Responsible sub system string
+ */
+ char *sub_system;
+
+ /**
+ * Peer Identity
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Record key string
+ */
+ char *key;
+
+ /**
+ * Peer is set?
+ */
+ int peer_set;
+};
+
+/**
+ * A peerstore client
+ */
+struct PeerstoreClient
+{
+ /**
+ * The client
+ */
+ struct GNUNET_SERVICE_Client *client;
+
+ /**
+ * Message queue for transmission to @e client
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Head of the DLL of
+ * Zone iteration operations in progress initiated by this client
+ */
+ struct Iteration *op_head;
+
+ /**
+ * Tail of the DLL of
+ * Zone iteration operations in progress initiated by this client
+ */
+ struct Iteration *op_tail;
+};
+
+
+struct StoreRecordContext
+{
+ /**
+ * The record that was stored.
+ */
+ struct GNUNET_PEERSTORE_Record *record;
+
+ /**
+ * The request ID
+ */
+ uint32_t rid;
+
+ /**
+ * The client
+ */
+ struct PeerstoreClient *pc;
+};
+
/**
* Our configuration.
*/
@@ -51,26 +261,25 @@ static char *db_lib_name;
*/
static struct GNUNET_PEERSTORE_PluginFunctions *db;
-/**
- * Hashmap with all watch requests
- */
-static struct GNUNET_CONTAINER_MultiHashMap *watchers;
-
/**
* Task run to clean up expired records.
*/
static struct GNUNET_SCHEDULER_Task *expire_task;
/**
- * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO
+ * Monitor DLL
*/
-static int in_shutdown;
+static struct Monitor *monitors_head;
/**
- * Number of connected clients.
+ * Monitor DLL
*/
-static unsigned int num_clients;
+static struct Monitor *monitors_tail;
+/**
+ * Notification context shared by all monitors.
+ */
+static struct GNUNET_NotificationContext *monitor_nc;
/**
* Perform the actual shutdown operations
@@ -86,53 +295,20 @@ do_shutdown ()
GNUNET_free (db_lib_name);
db_lib_name = NULL;
}
- if (NULL != watchers)
- {
- GNUNET_CONTAINER_multihashmap_destroy (watchers);
- watchers = NULL;
- }
if (NULL != expire_task)
{
GNUNET_SCHEDULER_cancel (expire_task);
expire_task = NULL;
}
+ if (NULL != monitor_nc)
+ {
+ GNUNET_notification_context_destroy (monitor_nc);
+ monitor_nc = NULL;
+ }
GNUNET_SCHEDULER_shutdown ();
}
-struct IterationContext
-{
- /**
- * The record that was stored.
- */
- struct GNUNET_PEERSTORE_Record *record;
-
- /**
- * The request ID
- */
- uint32_t rid;
-
-};
-
-struct StoreRecordContext
-{
- /**
- * The record that was stored.
- */
- struct GNUNET_PEERSTORE_Record *record;
-
- /**
- * The request ID
- */
- uint32_t rid;
-
- /**
- * The client
- */
- struct GNUNET_SERVICE_Client *client;
-};
-
-
/**
* Task run during shutdown.
*
@@ -143,9 +319,7 @@ shutdown_task (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Priming PEERSTORE for shutdown.\n");
- in_shutdown = GNUNET_YES;
- if (0 == num_clients) /* Only when no connected clients. */
- do_shutdown ();
+ do_shutdown ();
}
@@ -201,114 +375,123 @@ expire_records_continuation (void *cls, int success)
/**
- * A client disconnected. Remove all of its data structure entries.
+ * Send 'sync' message to the monitor; we are now in sync.
*
- * @param cls closure, NULL
- * @param client identification of the client
- * @param mq the message queue
- * @return
+ * @param mc monitor that is now in sync
*/
-static void *
-client_connect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- struct GNUNET_MQ_Handle *mq)
+static void
+monitor_sync (struct Monitor *mc)
{
- num_clients++;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *sync;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A client connected (now %u)\n", num_clients);
- return client;
+ "Synching zone monitor %p\n", mc);
+
+ env = GNUNET_MQ_msg (sync, GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC);
+ GNUNET_MQ_send (mc->pc->mq, env);
+ /* mark iteration done */
+ mc->in_first_iteration = GNUNET_NO;
+ mc->iteration_cnt = 0;
}
/**
- * Search for a disconnected client and remove it
+ * Given a new record, notifies watchers
*
- * @param cls closuer, a `struct GNUNET_SERVICE_Client`
- * @param key hash of record key
- * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
- * @return #GNUNET_OK to continue iterating
+ * @param record changed record to update watchers with
*/
-static int
-client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void
*value)
+static void
+watch_notifier (struct GNUNET_PEERSTORE_Record *record)
{
- if (value == cls)
+ struct GNUNET_MQ_Envelope *env;
+ struct Monitor *mc;
+
+ // FIXME this is very inefficient, we may want to use a hash
+ // map again.
+ for (mc = monitors_head; NULL != mc; mc = mc->next)
{
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (watchers, key,
value));
- num_clients++; /* Watchers do not count */
+ if ((GNUNET_YES == mc->peer_set) &&
+ (0 != memcmp (&mc->peer, &record->peer, sizeof (record->peer))))
+ continue;
+ if ((NULL != mc->sub_system) &&
+ (0 != strcmp (mc->sub_system, record->sub_system)))
+ continue;
+ if ((NULL != mc->key) &&
+ (0 != strcmp (mc->key, record->key)))
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found watcher %p to update.\n", mc);
+ env = PEERSTORE_create_record_mq_envelope (
+ 0,
+ record->sub_system,
+ &record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ 0,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (mc->pc->mq, env);
}
- return GNUNET_OK;
}
/**
- * A client disconnected. Remove all of its data structure entries.
- *
- * @param cls closure, NULL
- * @param client identification of the client
+ * Context for iteration operations passed from
+ * #run_iteration_round to #iterate_proc as closure
*/
-static void
-client_disconnect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- void *app_cls)
+struct IterationProcResult
{
- num_clients--;
- if (NULL != watchers)
- GNUNET_CONTAINER_multihashmap_iterate (watchers,
- &client_disconnect_it,
- client);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A client disconnected (%u remaining).\n",
- num_clients);
- if ((0 == num_clients) && in_shutdown)
- do_shutdown ();
-}
+ /**
+ * The zone iteration handle
+ */
+ struct Iteration *ic;
+ /**
+ * Number of results left to be returned in this iteration.
+ */
+ uint64_t limit;
+
+};
/**
- * Function called by for each matching record.
+ * Process results for an iteration from the database
*
- * @param cls closure
- * @param record peerstore record found
- * @param emsg error message or NULL if no errors
- * @return #GNUNET_YES to continue iteration
+ * @param cls a `struct IterationProcResult`
+ * @param seq sequence number of the record, MUST NOT BE ZERO
+ * @param record record to send to the client, or NULL if there
+ *        is none (nothing is sent in that case)
+ * @param emsg error message, or NULL if there was no error; on
+ *        error the message is logged and nothing is sent
*/
static void
-record_iterator (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+iterate_proc (void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- struct IterationContext *ic = cls;
+ struct IterationProcResult *proc = cls;
struct GNUNET_MQ_Envelope *env;
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error iterating over peerstore: `%s'", emsg);
+ return;
+ }
if (NULL == record)
{
- /* No more records */
- struct PeerstoreResultMessage *endmsg;
-
- env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
- endmsg->rid = ic->rid;
- if (NULL == emsg)
- {
- endmsg->result = htonl (GNUNET_OK);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
- GNUNET_SERVICE_client_continue (ic->record->client);
- }
- else
- {
- endmsg->result = htonl (GNUNET_SYSERR);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
- GNUNET_SERVICE_client_drop (ic->record->client);
- }
- PEERSTORE_destroy_record (ic->record);
- GNUNET_free (ic);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
return;
}
-
+ if (0 == proc->limit)
+ {
+ /* what is this!? should never happen */
+ GNUNET_break (0);
+ return;
+ }
+ proc->ic->seq = seq;
env = PEERSTORE_create_record_mq_envelope (
- ic->rid,
+ proc->ic->request_id,
record->sub_system,
&record->peer,
record->key,
@@ -316,132 +499,361 @@ record_iterator (void *cls,
record->value_size,
record->expiry,
0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (proc->ic->pc->mq, env);
+ proc->limit--;
}
-
/**
- * Iterator over all watcher clients
- * to notify them of a new record
+ * Function called once we are done with an iteration round and
+ * allow the iterating client to send us more messages.
*
- * @param cls closure, a `struct GNUNET_PEERSTORE_Record *`
- * @param key hash of record key
- * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
- * @return #GNUNET_YES to continue iterating
+ * @param ic iteration we are processing
*/
-static int
-watch_notifier_it (void *cls, const struct GNUNET_HashCode *key, void *value)
+static void
+iteration_done_client_continue (struct Iteration *ic)
{
- struct GNUNET_PEERSTORE_Record *record = cls;
- struct GNUNET_SERVICE_Client *client = value;
struct GNUNET_MQ_Envelope *env;
+ struct PeerstoreResultMessage *endmsg;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
- env = PEERSTORE_create_record_mq_envelope (
- 0,
- record->sub_system,
- &record->peer,
- record->key,
- record->value,
- record->value_size,
- record->expiry,
- 0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
- return GNUNET_YES;
+ GNUNET_SERVICE_client_continue (ic->pc->client);
+ if (! ic->send_end)
+ return;
+ /* No more records */
+
+ env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+ endmsg->rid = htons (ic->request_id);
+ endmsg->result = htonl (GNUNET_OK);
+ GNUNET_MQ_send (ic->pc->mq, env);
+ GNUNET_free (ic->key);
+ GNUNET_free (ic->sub_system);
+ GNUNET_CONTAINER_DLL_remove (ic->pc->op_head, ic->pc->op_tail, ic);
+ GNUNET_free (ic);
+ return;
}
+
/**
- * Given a new record, notifies watchers
+ * Perform the next round of the iteration.
*
- * @param record changed record to update watchers with
+ * @param ic iterator to process
+ * @param limit number of results to return in one pass
*/
static void
-watch_notifier (struct GNUNET_PEERSTORE_Record *record)
+run_iteration_round (struct Iteration *ic, uint64_t limit)
{
- struct GNUNET_HashCode keyhash;
+ struct IterationProcResult proc;
+ struct GNUNET_TIME_Absolute start;
+ struct GNUNET_TIME_Relative duration;
- PEERSTORE_hash_key (record->sub_system, &record->peer, record->key,
&keyhash);
- GNUNET_CONTAINER_multihashmap_get_multiple (watchers,
- &keyhash,
- &watch_notifier_it,
- record);
+ memset (&proc, 0, sizeof(proc));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to return up to %llu records at position %llu\n",
+ (unsigned long long) limit,
+ (unsigned long long) ic->seq);
+ proc.ic = ic;
+ proc.limit = limit;
+ start = GNUNET_TIME_absolute_get ();
+ GNUNET_break (GNUNET_SYSERR !=
+ db->iterate_records (db->cls,
+ ic->sub_system,
+ (GNUNET_YES == ic->peer_set) ? &ic->peer :
+ NULL,
+ ic->key,
+ ic->seq,
+ proc.limit,
+ &iterate_proc,
+ &proc));
+ duration = GNUNET_TIME_absolute_get_duration (start);
+ duration = GNUNET_TIME_relative_divide (duration, limit - proc.limit);
+ if (0 == proc.limit)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Returned %llu results, more results available\n",
+ (unsigned long long) limit);
+ ic->send_end = (0 != proc.limit);
+ iteration_done_client_continue (ic);
}
/**
- * Handle a watch cancel request from client
+ * Check an iterate request from client
+ *
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
+ */
+static int
+check_iterate_start (void *cls, const struct
+ PeerstoreIterationStartMessage *srm)
+{
+ uint16_t ss_size;
+ uint16_t key_size;
+ uint16_t size;
+
+ ss_size = ntohs (srm->sub_system_size);
+ key_size = ntohs (srm->key_size);
+ size = ntohs (srm->header.size);
+
+ if (size < key_size + ss_size + sizeof(*srm))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle an iterate request from client
*
* @param cls identification of the client
- * @param hm the actual message
+ * @param srm the actual message
*/
static void
-handle_watch_cancel (void *cls, const struct StoreKeyHashMessage *hm)
+handle_iterate_start (void *cls, const struct PeerstoreIterationStartMessage *
+ srm)
{
- struct GNUNET_SERVICE_Client *client = cls;
+ struct Iteration *ic = GNUNET_new (struct Iteration);
+ uint16_t ss_size;
+ char *ptr;
+
+ ss_size = ntohs (srm->sub_system_size);
+
+ ic->pc = cls;
+ ic->request_id = ntohs (srm->rid);
+ ic->offset = 0;
+ ic->peer_set = (ntohs (srm->peer_set)) ? GNUNET_YES : GNUNET_NO;
+ if (GNUNET_YES == ic->peer_set)
+ ic->peer = srm->peer;
+ ptr = (char*) &srm[1];
+ ic->sub_system = GNUNET_strdup (ptr);
+ ptr += ss_size;
+ if (0 < ntohs (srm->key_size))
+ ic->key = GNUNET_strdup (ptr);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iterate request: ss `%s', peer `%s', key `%s'\n",
+ ic->sub_system,
+ GNUNET_i2s (&ic->peer),
+ (NULL == ic->key) ? "NULL" : ic->key);
+ GNUNET_CONTAINER_DLL_insert (ic->pc->op_head,
+ ic->pc->op_tail,
+ ic);
+ run_iteration_round (ic, 1);
+}
+
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n");
- if (GNUNET_OK !=
- GNUNET_CONTAINER_multihashmap_remove (watchers, &hm->keyhash, client))
+/**
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_STOP message
+ *
+ * @param cls the client sending the message
+ * @param zis_msg message from the client
+ */
+static void
+handle_iterate_stop (void *cls,
+ const struct PeerstoreIterationStopMessage *zis_msg)
+{
+ struct PeerstoreClient *pc = cls;
+ struct Iteration *ic;
+ uint32_t rid;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received ITERATION_STOP message\n");
+ rid = ntohl (zis_msg->rid);
+ for (ic = pc->op_head; NULL != ic; ic = ic->next)
+ if (ic->request_id == rid)
+ break;
+ if (NULL == ic)
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (client);
+ GNUNET_SERVICE_client_drop (pc->client);
return;
}
- num_clients++;
- GNUNET_SERVICE_client_continue (client);
+ GNUNET_CONTAINER_DLL_remove (pc->op_head, pc->op_tail, ic);
+ GNUNET_free (ic);
+ GNUNET_SERVICE_client_continue (pc->client);
}
/**
- * Handle a watch request from client
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT message
*
- * @param cls identification of the client
- * @param hm the actual message
+ * @param cls the client sending the message
+ * @param is_msg message from the client
*/
static void
-handle_watch (void *cls, const struct StoreKeyHashMessage *hm)
+handle_iterate_next (void *cls,
+ const struct PeerstoreIterationNextMessage *is_msg)
{
- struct GNUNET_SERVICE_Client *client = cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n");
- num_clients--; /* do not count watchers */
- GNUNET_SERVICE_client_mark_monitor (client);
- GNUNET_CONTAINER_multihashmap_put (watchers,
- &hm->keyhash,
- client,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_SERVICE_client_continue (client);
+ struct PeerstoreClient *pc = cls;
+ struct Iteration *ic;
+ uint32_t rid;
+ uint64_t limit;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received ITERATION_NEXT message\n");
+ rid = ntohs (is_msg->rid);
+ limit = GNUNET_ntohll (is_msg->limit);
+ for (ic = pc->op_head; NULL != ic; ic = ic->next)
+ if (ic->request_id == rid)
+ break;
+ if (NULL == ic)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Not in iteration...\n");
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found iteration...\n");
+ run_iteration_round (ic, limit);
}
/**
- * Check an iterate request from client
+ * Obtain the next datum during the monitor's initial iteration.
*
- * @param cls client identification of the client
- * @param srm the actual message
- * @return #GNUNET_OK if @a srm is well-formed
+ * @param cls monitor that does its initial iteration
*/
-static int
-check_iterate (void *cls, const struct StoreRecordMessage *srm)
+static void
+monitor_iteration_next (void *cls);
+
+
+/**
+ * Record iterator callback used for a monitor's initial iteration.
+ *
+ * @param cls a `struct Monitor *` with information about the monitor
+ * @param seq sequence number of the record, MUST NOT BE ZERO
+ * @param record the record found, or NULL once the iteration is done;
+ *               each record found is sent to the monitoring client
+ * @param emsg error message, or NULL if there was no error;
+ *             on error nothing is sent to the client
+ */
+static void
+monitor_iterate_cb (void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- struct GNUNET_PEERSTORE_Record *record;
+ struct Monitor *mc = cls;
+ struct GNUNET_MQ_Envelope *env;
- record = PEERSTORE_parse_record_message (srm);
+ GNUNET_assert (0 != seq);
+ mc->seq = seq;
+
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error iterating over peerstore: `%s'", emsg);
+ return;
+ }
if (NULL == record)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
+ return;
+ }
+ if (0 == mc->limit)
+ {
+ /* what is this!? should never happen */
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return;
}
- if (NULL == record->sub_system)
+ env = PEERSTORE_create_record_mq_envelope (
+ 0,
+ record->sub_system,
+ &record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ 0,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (mc->pc->mq, env);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sent records.\n");
+ mc->limit--;
+ mc->iteration_cnt--;
+ if ((0 == mc->iteration_cnt) && (0 != mc->limit))
+ {
+ /* We are done with the current iteration batch, AND the
+ client would right now accept more, so go again! */
+ GNUNET_assert (NULL == mc->task);
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ }
+}
+
+
+/**
+ * Obtain the next datum during the monitor's initial iteration.
+ *
+ * @param cls monitor that does its initial iteration
+ */
+static void
+monitor_iteration_next (void *cls)
+{
+ struct Monitor *mc = cls;
+ int ret;
+
+ mc->task = NULL;
+ GNUNET_assert (0 == mc->iteration_cnt);
+ if (mc->limit > 16)
+ mc->iteration_cnt = mc->limit / 2; /* leave half for monitor events */
+ else
+ mc->iteration_cnt = mc->limit; /* use it all */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running iteration\n");
+ ret = db->iterate_records (db->cls,
+ mc->sub_system,
+ (GNUNET_YES == mc->peer_set) ? &mc->peer : NULL,
+ mc->key,
+ mc->seq,
+ mc->iteration_cnt,
+ &monitor_iterate_cb,
+ mc);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_free (mc->key);
+ GNUNET_free (mc->sub_system);
+ GNUNET_free (mc);
+ GNUNET_SERVICE_client_drop (mc->pc->client);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Zone empty... syncing\n");
+ /* empty zone */
+ monitor_sync (mc);
+ return;
+ }
+}
+
+
+/**
+ * Check a monitor request from client
+ *
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
+ */
+static int
+check_monitor_start (void *cls, const struct PeerstoreMonitorStartMessage *srm)
+{
+ uint16_t ss_size;
+ uint16_t key_size;
+ uint16_t size;
+
+ ss_size = ntohs (srm->sub_system_size);
+ key_size = ntohs (srm->key_size);
+ size = ntohs (srm->header.size);
+
+ if (size < key_size + ss_size + sizeof(*srm))
{
GNUNET_break (0);
- PEERSTORE_destroy_record (record);
return GNUNET_SYSERR;
}
- PEERSTORE_destroy_record (record);
return GNUNET_OK;
}
@@ -453,31 +865,87 @@ check_iterate (void *cls, const struct StoreRecordMessage
*srm)
* @param srm the actual message
*/
static void
-handle_iterate (void *cls, const struct StoreRecordMessage *srm)
+handle_monitor_start (void *cls, const struct PeerstoreMonitorStartMessage
*msm)
+{
+ struct Monitor *mc = GNUNET_new (struct Monitor);
+
+ uint16_t ss_size;
+ char *ptr;
+
+ ss_size = ntohs (msm->sub_system_size);
+
+ mc->pc = cls;
+ mc->peer_set = (ntohs (msm->peer_set)) ? GNUNET_YES : GNUNET_NO;
+ if (GNUNET_YES == mc->peer_set)
+ mc->peer = msm->peer;
+ ptr = (char*) &msm[1];
+ if (0 < ss_size)
+ mc->sub_system = GNUNET_strdup (ptr);
+ ptr += ss_size;
+ if (0 < ntohs (msm->key_size))
+ mc->key = GNUNET_strdup (ptr);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Monitor request: ss `%s', peer `%s', key `%s'\n",
+ mc->sub_system,
+ GNUNET_i2s (&mc->peer),
+ (NULL == mc->key) ? "NULL" : mc->key);
+ mc->in_first_iteration = (GNUNET_YES == ntohs (msm->iterate_first));
+ mc->limit = 1;
+ mc->peer_set = (ntohs (msm->peer_set)) ? GNUNET_YES : GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert (monitors_head,
+ monitors_tail,
+ mc);
+ GNUNET_SERVICE_client_mark_monitor (mc->pc->client);
+ GNUNET_notification_context_add (monitor_nc, mc->pc->mq);
+ if (mc->in_first_iteration)
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ else
+ monitor_sync (mc);
+}
+
+
+/**
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT message
+ *
+ * @param cls the client sending the message
+ * @param nm message from the client
+ */
+static void
+handle_monitor_next (void *cls, const struct PeerstoreMonitorNextMessage *nm)
{
- struct IterationContext *ic = GNUNET_new (struct IterationContext);
+ struct PeerstoreClient *pc = cls;
+ struct Monitor *mc;
+ uint64_t inc;
- ic->record = PEERSTORE_parse_record_message (srm);
+ inc = GNUNET_ntohll (nm->limit);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iterate request: ss `%s', peer `%s', key `%s'\n",
- ic->record->sub_system,
- GNUNET_i2s (&ic->record->peer),
- (NULL == ic->record->key) ? "NULL" : ic->record->key);
- ic->record->client = cls;
- ic->rid = srm->rid;
- if (GNUNET_OK !=
- db->iterate_records (db->cls,
- ic->record->sub_system,
- (ntohs (srm->peer_set)) ? &ic->record->peer : NULL,
- ic->record->key,
- &record_iterator,
- ic))
+ "Received MONITOR_NEXT message with limit %llu\n",
+ (unsigned long long) inc);
+ for (mc = monitors_head; NULL != mc; mc = mc->next)
+ if (mc->pc == pc)
+ break;
+ if (NULL == mc)
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (ic->record->client);
- PEERSTORE_destroy_record (ic->record);
- GNUNET_free (ic);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
}
+ GNUNET_SERVICE_client_continue (pc->client);
+ if (mc->limit + inc < mc->limit)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
+ }
+ mc->limit += inc;
+ if ((mc->in_first_iteration) && (mc->limit == inc))
+ {
+ /* We are still iterating, and the previous iteration must
+ have stopped due to the client's limit, so continue it! */
+ GNUNET_assert (NULL == mc->task);
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ }
+ GNUNET_assert (mc->iteration_cnt <= mc->limit);
}
@@ -494,13 +962,12 @@ store_record_continuation (void *cls, int success)
struct PeerstoreResultMessage *msg;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT);
msg->rid = src->rid;
msg->result = htonl (success);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (src->client), env);
+ GNUNET_MQ_send (src->pc->mq, env);
watch_notifier (src->record);
- GNUNET_SERVICE_client_continue (src->client);
+ GNUNET_SERVICE_client_continue (src->pc->client);
PEERSTORE_destroy_record (src->record);
GNUNET_free (src);
}
@@ -514,7 +981,7 @@ store_record_continuation (void *cls, int success)
* @return #GNUNET_OK if @a srm is well-formed
*/
static int
-check_store (void *cls, const struct StoreRecordMessage *srm)
+check_store (void *cls, const struct PeerstoreRecordMessage *srm)
{
struct GNUNET_PEERSTORE_Record *record;
@@ -542,9 +1009,9 @@ check_store (void *cls, const struct StoreRecordMessage
*srm)
* @param srm the actual message
*/
static void
-handle_store (void *cls, const struct StoreRecordMessage *srm)
+handle_store (void *cls, const struct PeerstoreRecordMessage *srm)
{
- struct GNUNET_SERVICE_Client *client = cls;
+ struct PeerstoreClient *pc = cls;
struct StoreRecordContext *src = GNUNET_new (struct StoreRecordContext);
src->record = PEERSTORE_parse_record_message (srm);
GNUNET_log (
@@ -554,9 +1021,8 @@ handle_store (void *cls, const struct StoreRecordMessage
*srm)
GNUNET_i2s (&src->record->peer),
src->record->key,
(uint32_t) ntohl (srm->options));
- src->record->client = client;
src->rid = srm->rid;
- src->client = client;
+ src->pc = pc;
if (GNUNET_OK != db->store_record (db->cls,
src->record->sub_system,
&src->record->peer,
@@ -571,12 +1037,82 @@ handle_store (void *cls, const struct StoreRecordMessage
*srm)
GNUNET_break (0);
PEERSTORE_destroy_record (src->record);
GNUNET_free (src);
- GNUNET_SERVICE_client_drop (client);
+ GNUNET_SERVICE_client_drop (pc->client);
+ GNUNET_free (pc);
return;
}
}
+/**
+ * A client connected. Allocate the per-client state.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ * @param mq the message queue
+ * @return the newly allocated `struct PeerstoreClient` for this client
+ */
+static void *
+client_connect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
+{
+ struct PeerstoreClient *pc;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "A client %p connected\n", client);
+ pc = GNUNET_new (struct PeerstoreClient);
+ pc->client = client;
+ pc->mq = mq;
+ return pc;
+}
+
+
+/**
+ * A client disconnected. Remove all of its data structure entries.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ */
+static void
+client_disconnect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *app_cls)
+{
+ struct PeerstoreClient *pc = app_cls;
+ struct Iteration *iter;
+ struct Monitor *mo;
+
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client %p disconnected.\n",
+ client);
+ for (mo = monitors_head; NULL != mo; mo = mo->next)
+ {
+ if (pc != mo->pc)
+ continue;
+ if (NULL != mo->task)
+ {
+ GNUNET_SCHEDULER_cancel (mo->task);
+ mo->task = NULL;
+ }
+ if (NULL != mo->sa_wait_warning)
+ {
+ GNUNET_SCHEDULER_cancel (mo->sa_wait_warning);
+ mo->sa_wait_warning = NULL;
+ }
+ GNUNET_free (mo);
+ break;
+ }
+ while (NULL != (iter = pc->op_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (pc->op_head, pc->op_tail, iter);
+ GNUNET_free (iter);
+ }
+ GNUNET_free (pc);
+}
+
+
static void
store_hello_continuation (void *cls, int success)
{
@@ -601,29 +1137,9 @@ hosts_directory_scan_callback (void *cls, const char
*fullname)
struct GNUNET_HELLO_Builder *builder;
const struct GNUNET_PeerIdentity *pid;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "1 hosts_directory_scan_callback filename %s\n",
- fullname);
-
if (GNUNET_YES != GNUNET_DISK_file_test (fullname))
return GNUNET_OK; /* ignore non-files */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "2 hosts_directory_scan_callback filename %s\n",
- fullname);
-
- /* filename = strrchr (fullname, DIR_SEPARATOR); */
- /* if ((NULL == filename) || (1 > strlen (filename))) */
- /* filename = fullname; */
- /* else */
- /* filename++; */
-
- /* GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, */
- /* "3 hosts_directory_scan_callback filename %s\n", */
- /* filename); */
-
- /* if (GNUNET_YES != GNUNET_DISK_file_test (filename)) */
- /* return GNUNET_OK; */
size_total = GNUNET_DISK_fn_read (fullname, buffer, sizeof(buffer));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Read %d bytes from `%s'\n",
@@ -687,8 +1203,6 @@ run (void *cls,
char *ip;
char *peerdir;
- in_shutdown = GNUNET_NO;
- num_clients = 0;
cfg = c;
if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -713,7 +1227,6 @@ run (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
}
- watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, NULL);
use_included = GNUNET_CONFIGURATION_get_value_yesno (cfg,
@@ -741,6 +1254,7 @@ run (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_ ("Skipping import of included HELLOs\n"));
}
+ monitor_nc = GNUNET_notification_context_create (1);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
}
@@ -750,27 +1264,27 @@ run (void *cls,
* Define "main" method using service macro.
*/
GNUNET_SERVICE_MAIN (
- "peerstore",
- GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
- &run,
- &client_connect_cb,
- &client_disconnect_cb,
- NULL,
- GNUNET_MQ_hd_var_size (store,
- GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
- struct StoreRecordMessage,
- NULL),
- GNUNET_MQ_hd_var_size (iterate,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
- struct StoreRecordMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (watch,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
- struct StoreKeyHashMessage,
+ "peerstore", GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run, &client_connect_cb,
+ &client_disconnect_cb, NULL,
+ GNUNET_MQ_hd_var_size (store, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
+ struct PeerstoreRecordMessage, NULL),
+ GNUNET_MQ_hd_var_size (iterate_start,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START,
+ struct PeerstoreIterationStartMessage, NULL),
+ GNUNET_MQ_hd_fixed_size (iterate_stop,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_STOP,
+ struct PeerstoreIterationStopMessage, NULL),
+ GNUNET_MQ_hd_fixed_size (iterate_next,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT,
+ struct PeerstoreIterationNextMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (watch_cancel,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
- struct StoreKeyHashMessage,
+ GNUNET_MQ_hd_var_size (monitor_start,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START,
+ struct PeerstoreMonitorStartMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (monitor_next,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT,
+ struct PeerstoreMonitorNextMessage,
NULL),
GNUNET_MQ_handler_end ());
diff --git a/src/service/peerstore/meson.build
b/src/service/peerstore/meson.build
index 2c6f7eba8..8f0950b93 100644
--- a/src/service/peerstore/meson.build
+++ b/src/service/peerstore/meson.build
@@ -1,4 +1,5 @@
libgnunetpeerstore_src = ['peerstore_api.c',
+ 'peerstore_api_monitor.c',
'peerstore_common.c']
gnunetservicepeerstore_src = ['gnunet-service-peerstore.c']
@@ -86,7 +87,7 @@ test('test_peerstore_api_watch', testpeerstore_api_watch,
suite: 'peerstore', workdir: meson.current_build_dir())
test('test_peerstore_api_iterate', testpeerstore_api_iterate,
suite: 'peerstore', workdir: meson.current_build_dir())
-test('perf_peerstore_store', testpeerstore_api_perf,
+test('perf_peerstore_api_store', testpeerstore_api_perf,
suite: 'peerstore', workdir: meson.current_build_dir())
diff --git a/src/service/peerstore/peerstore.h
b/src/service/peerstore/peerstore.h
index 26c656f00..c005d329b 100644
--- a/src/service/peerstore/peerstore.h
+++ b/src/service/peerstore/peerstore.h
@@ -34,7 +34,7 @@ GNUNET_NETWORK_STRUCT_BEGIN
/**
* Message carrying a PEERSTORE record message
*/
-struct StoreRecordMessage
+struct PeerstoreRecordMessage
{
/**
* GNUnet message header
@@ -42,25 +42,25 @@ struct StoreRecordMessage
struct GNUNET_MessageHeader header;
/**
- * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ * Peer Identity
*/
- uint16_t peer_set GNUNET_PACKED;
+ struct GNUNET_PeerIdentity peer;
/**
- * Size of the sub_system string
- * Allocated at position 0 after this struct
+ * Expiry time of entry
*/
- uint16_t sub_system_size GNUNET_PACKED;
+ struct GNUNET_TIME_AbsoluteNBO expiry;
- /**
- * Peer Identity
+/**
+ * Request id.
*/
- struct GNUNET_PeerIdentity peer;
+ uint32_t rid GNUNET_PACKED;
/**
- * Expiry time of entry
+ * Options, needed only in case of a
+ * store operation
*/
- struct GNUNET_TIME_AbsoluteNBO expiry;
+ uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options GNUNET_PACKED;
/**
* Size of the key string
@@ -74,16 +74,17 @@ struct StoreRecordMessage
*/
uint16_t value_size GNUNET_PACKED;
+
/**
- * Request id.
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
*/
- uint32_t rid GNUNET_PACKED;
+ uint16_t sub_system_size GNUNET_PACKED;
/**
- * Options, needed only in case of a
- * store operation
+ * reserved
*/
- uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options GNUNET_PACKED;
+ uint16_t reserved GNUNET_PACKED;
/* Followed by key and value */
};
@@ -130,7 +131,148 @@ struct StoreKeyHashMessage
* Hash of a record key
*/
struct GNUNET_HashCode keyhash;
+};
+
+/**
+ * Monitor start message
+ */
+struct PeerstoreMonitorStartMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+ /**
+ * Peer Identity
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+
+ /**
+ * Size of the key string
+ * Allocated at position 1 after this struct
+ */
+ uint16_t key_size GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ */
+ uint16_t peer_set GNUNET_PACKED;
+
+ /**
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
+ */
+ uint16_t sub_system_size GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if iterate first, #GNUNET_NO otherwise
+ */
+ uint16_t iterate_first GNUNET_PACKED;
+
+ /* Followed by sub_system and key */
+};
+
+/**
+ * Monitor next message
+ */
+struct PeerstoreMonitorNextMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of records to return.
+ */
+ uint64_t limit GNUNET_PACKED;
+
+};
+
+/**
+ * Iteration start message
+ */
+struct PeerstoreIterationStartMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Peer Identity
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ */
+ uint16_t peer_set GNUNET_PACKED;
+
+ /**
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
+ */
+ uint16_t sub_system_size GNUNET_PACKED;
+
+ /**
+ * reserved
+ */
+ uint16_t reserved GNUNET_PACKED;
+
+ /**
+ * Size of the key string
+ * Allocated at position 1 after this struct
+ */
+ uint16_t key_size GNUNET_PACKED;
+
+ /* Followed by subsystem and key */
+};
+
+
+/**
+ * Iteration next message
+ */
+struct PeerstoreIterationNextMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of records to return.
+ */
+ uint64_t limit GNUNET_PACKED;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+};
+
+struct PeerstoreIterationStopMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/service/peerstore/peerstore_api.c
b/src/service/peerstore/peerstore_api.c
index feb5aeb3c..f52879b5c 100644
--- a/src/service/peerstore/peerstore_api.c
+++ b/src/service/peerstore/peerstore_api.c
@@ -23,6 +23,8 @@
* @author Omar Tarabai
* @author Christian Grothoff
*/
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_hello_uri_lib.h"
@@ -162,6 +164,11 @@ struct GNUNET_PEERSTORE_StoreContext
* Options for the store operation.
*/
enum GNUNET_PEERSTORE_StoreOption options;
+
+ /**
+ * Temporary envelope
+ */
+ struct GNUNET_MQ_Envelope *env;
};
/**
@@ -230,70 +237,13 @@ struct GNUNET_PEERSTORE_IterateContext
*/
uint32_t rid;
-};
-
-/**
- * Context for a watch request
- */
-struct GNUNET_PEERSTORE_WatchContext
-{
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *next;
-
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *prev;
-
- /**
- * Handle to the PEERSTORE service.
- */
- struct GNUNET_PEERSTORE_Handle *h;
-
/**
- * Callback with each record received
- */
- GNUNET_PEERSTORE_Processor callback;
-
- /**
- * Closure for @e callback
+ * Temporary envelope
*/
- void *callback_cls;
-
- /**
- * Hash of the combined key
- */
- struct GNUNET_HashCode keyhash;
-
- /**
- * The iteration context to deliver the actual values for the key.
- */
- struct GNUNET_PEERSTORE_IterateContext *ic;
-
- /**
- * The peer we are watching for values.
- */
- const struct GNUNET_PeerIdentity *peer;
-
- /**
- * The key we like to watch for values.
- */
- const char *key;
-
- /**
- * The sub system requested the watch.
- */
- const char *sub_system;
-
- /**
- * Request ID
- */
- uint32_t rid;
-
+ struct GNUNET_MQ_Envelope *env;
};
+
/**
* Context for the info handler.
*/
@@ -317,7 +267,7 @@ struct GNUNET_PEERSTORE_NotifyContext
/**
* The watch for this context.
*/
- struct GNUNET_PEERSTORE_WatchContext *wc;
+ struct GNUNET_PEERSTORE_Monitor *wc;
/**
* Is this request canceled.
@@ -419,30 +369,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
}
-/**
- * Iterator over previous watches to resend them
- *
- * @param cls the `struct GNUNET_PEERSTORE_Handle`
- * @param key key for the watch
- * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
-{
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_WatchContext *wc = value;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_MQ_Envelope *ev;
-
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- hm->keyhash = wc->keyhash;
- hm->rid = get_op_id (h);
- GNUNET_MQ_send (h->mq, ev);
- return GNUNET_YES;
-}
-
-
/**
* Connect to the PEERSTORE service.
*
@@ -548,6 +474,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
sc->rid = get_op_id (h);
sc->sub_system = GNUNET_strdup (sub_system);
+ GNUNET_assert (NULL != peer);
sc->peer = *peer;
sc->key = GNUNET_strdup (key);
sc->value = GNUNET_memdup (value, size);
@@ -569,7 +496,14 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
- GNUNET_MQ_send (h->mq, ev);
+ if (NULL == h->mq)
+ {
+ sc->env = ev;
+ }
+ else
+ {
+ GNUNET_MQ_send (h->mq, ev);
+ }
return sc;
}
@@ -589,8 +523,7 @@ handle_store_result (void *cls, const struct
PeerstoreResultMessage *msg)
LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerstoreResultMessage\n");
for (sc = h->store_head; NULL != sc; sc = sc->next)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying %u ?= %u\n", sc->rid, msg->rid);
- if (sc->rid == msg->rid)
+ if (sc->rid == ntohs (msg->rid))
break;
}
if (NULL == sc)
@@ -623,7 +556,7 @@ handle_iterate_end (void *cls, const struct
PeerstoreResultMessage *msg)
struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
for (ic = h->iterate_head; NULL != ic; ic = ic->next)
- if (ic->rid == msg->rid)
+ if (ic->rid == ntohs (msg->rid))
break;
if (NULL == ic)
{
@@ -646,7 +579,7 @@ handle_iterate_end (void *cls, const struct
PeerstoreResultMessage *msg)
* @param msg message received
*/
static int
-check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
+check_iterate_result (void *cls, const struct PeerstoreRecordMessage *msg)
{
/* we defer validation to #handle_iterate_result */
return GNUNET_OK;
@@ -660,15 +593,15 @@ check_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
* @param msg message received
*/
static void
-handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
+handle_iterate_result (void *cls, const struct PeerstoreRecordMessage *msg)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
struct GNUNET_PEERSTORE_IterateContext *ic;
struct GNUNET_PEERSTORE_Record *record;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received StoreRecordMessage\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received RecordMessage\n");
for (ic = h->iterate_head; NULL != ic; ic = ic->next)
- if (ic->rid == msg->rid)
+ if (ic->rid == ntohs (msg->rid))
break;
if (NULL == ic)
{
@@ -701,8 +634,49 @@ handle_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
* @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
*/
void
-GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
+GNUNET_PEERSTORE_iteration_next (struct GNUNET_PEERSTORE_IterateContext *ic,
+ uint64_t limit)
{
+ struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationNextMessage *inm;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending PEERSTORE_ITERATION_NEXT message\n");
+ ev = GNUNET_MQ_msg (inm, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT);
+ inm->rid = htons (ic->rid);
+ inm->limit = GNUNET_htonll (limit);
+ if (NULL == ic->h->mq)
+ {
+ ic->env = ev;
+ }
+ else
+ {
+ GNUNET_MQ_send (ic->h->mq, ev);
+ }
+}
+
+
+/**
+ * Cancel an iterate request
+ * Please do not call after the iterate request is done
+ *
+ * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
+ */
+void
+GNUNET_PEERSTORE_iteration_stop (struct GNUNET_PEERSTORE_IterateContext *ic)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationStopMessage *ism;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending PEERSTORE_ITERATION_STOP message\n");
+ if (NULL != ic->h->mq)
+ {
+ ev = GNUNET_MQ_msg (ism, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+ ism->rid = htons (ic->rid);
+ if (NULL != ic->h->mq)
+ GNUNET_MQ_send (ic->h->mq, ev);
+ }
GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
GNUNET_free (ic->sub_system);
GNUNET_free (ic->key);
@@ -711,34 +685,50 @@ GNUNET_PEERSTORE_iterate_cancel (struct
GNUNET_PEERSTORE_IterateContext *ic)
struct GNUNET_PEERSTORE_IterateContext *
-GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
+GNUNET_PEERSTORE_iteration_start (struct GNUNET_PEERSTORE_Handle *h,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls)
{
struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationStartMessage *srm;
struct GNUNET_PEERSTORE_IterateContext *ic;
+ size_t ss_size;
+ size_t key_size;
+ size_t msg_size;
+ void *dummy;
ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
ic->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (ic->rid,
- sub_system,
- peer,
- key,
- NULL,
- 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
+
+ GNUNET_assert (NULL != sub_system);
+ ss_size = strlen (sub_system) + 1;
+ if (NULL == key)
+ key_size = 0;
+ else
+ key_size = strlen (key) + 1;
+ msg_size = ss_size + key_size;
+ ev = GNUNET_MQ_msg_extra (srm, msg_size,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START);
+ srm->key_size = htons (key_size);
+ srm->rid = htons (ic->rid);
+ srm->sub_system_size = htons (ss_size);
+ dummy = &srm[1];
+ GNUNET_memcpy (dummy, sub_system, ss_size);
+ dummy += ss_size;
+ GNUNET_memcpy (dummy, key, key_size);
ic->callback = callback;
ic->callback_cls = callback_cls;
ic->h = h;
ic->sub_system = GNUNET_strdup (sub_system);
if (NULL != peer)
+ {
ic->peer = *peer;
+ srm->peer_set = htons (GNUNET_YES);
+ srm->peer = *peer;
+ }
if (NULL != key)
ic->key = GNUNET_strdup (key);
GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
@@ -750,63 +740,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle
*h,
}
-/******************************************************************************/
-/******************* WATCH FUNCTIONS
*********************/
-/******************************************************************************/
-
-/**
- * When a watch record is received, validate it is well-formed.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
-static int
-check_watch_record (void *cls, const struct StoreRecordMessage *msg)
-{
- /* we defer validation to #handle_watch_result */
- return GNUNET_OK;
-}
-
-
-/**
- * When a watch record is received, process it.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
-static void
-handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
-{
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_Record *record;
- struct GNUNET_HashCode keyhash;
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
- record = PEERSTORE_parse_record_message (msg);
- if (NULL == record)
- {
- disconnect_and_schedule_reconnect (h);
- return;
- }
- PEERSTORE_hash_key (record->sub_system, &record->peer, record->key,
&keyhash);
- // FIXME: what if there are multiple watches for the same key?
- wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
- if (NULL == wc)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _ ("Received a watch result for a non existing watch.\n"));
- PEERSTORE_destroy_record (record);
- disconnect_and_schedule_reconnect (h);
- return;
- }
- h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls, record, NULL);
- PEERSTORE_destroy_record (record);
-}
-
-
/**
* Close the existing connection to PEERSTORE and reconnect.
*
@@ -826,16 +759,11 @@ reconnect (void *cls)
struct PeerstoreResultMessage,
h),
GNUNET_MQ_hd_var_size (iterate_result,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
- struct StoreRecordMessage,
- h),
- GNUNET_MQ_hd_var_size (watch_record,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
- struct StoreRecordMessage,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD,
+ struct PeerstoreRecordMessage,
h),
GNUNET_MQ_handler_end ()
};
- struct GNUNET_MQ_Envelope *ev;
h->reconnect_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
@@ -853,272 +781,25 @@ reconnect (void *cls)
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Resending pending requests after reconnect.\n");
- if (NULL != h->watches)
- GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL !=
ic;
ic = ic->next)
{
- ic->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (ic->rid,
- ic->sub_system,
- &ic->peer,
- ic->key,
- NULL,
- 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
- GNUNET_MQ_send (h->mq, ev);
+ GNUNET_MQ_send (h->mq, ic->env);
}
for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
sc = sc->next)
{
- sc->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (sc->rid,
- sc->sub_system,
- &sc->peer,
- sc->key,
- sc->value,
- sc->size,
- sc->expiry,
- sc->options,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_MQ_send (h->mq, ev);
- }
-}
-
-
-/**
- * Cancel a watch request
- *
- * @param wc handle to the watch request
- */
-void
-GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
-{
- struct GNUNET_PEERSTORE_Handle *h = wc->h;
- struct GNUNET_MQ_Envelope *ev;
- struct StoreKeyHashMessage *hm;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancelling watch.\n");
- if (NULL != wc->ic)
- {
- GNUNET_PEERSTORE_iterate_cancel (wc->ic);
- GNUNET_free (wc);
- return;
+ GNUNET_MQ_send (h->mq, sc->env);
}
-
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
- hm->keyhash = wc->keyhash;
- hm->rid = get_op_id (h);
- GNUNET_MQ_send (h->mq, ev);
- GNUNET_assert (
- GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
- GNUNET_free (wc);
}
static void
-watch_iterate (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+hello_store_success (void *cls, int success)
{
- struct GNUNET_PEERSTORE_WatchContext *wc = cls;
- struct GNUNET_PEERSTORE_Handle *h = wc->h;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_MQ_Envelope *ev;
- const struct GNUNET_PeerIdentity *peer;
-
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- wc->callback (wc->callback_cls, NULL, emsg);
- return;
- }
- if ((NULL != record) &&
- (NULL != wc->callback))
- {
- wc->callback (wc->callback_cls, record, NULL);
- return;
- }
-
- if (NULL == wc->peer)
- peer = GNUNET_new (struct GNUNET_PeerIdentity);
- else
- peer = wc->peer;
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
- hm->rid = get_op_id (h);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Hash key we watch for %s\n",
- GNUNET_h2s_full (&hm->keyhash));
- wc->keyhash = hm->keyhash;
- if (NULL == h->watches)
- h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
- GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
- h->watches,
- &wc->keyhash,
- wc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
- wc->sub_system,
- GNUNET_i2s (peer),
- wc->key);
- GNUNET_MQ_send (h->mq, ev);
- wc->ic = NULL;
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls, record, NULL);
- if (NULL == wc->peer)
- GNUNET_free_nz ((void *) peer);
- return;
-
-}
-
-
-/**
- * Request watching a given key
- * User will be notified with any new values added to key,
- * all existing entries are supplied beforehand.
- *
- * @param h handle to the PEERSTORE service
- * @param sub_system name of sub system
- * @param peer Peer identity
- * @param key entry key string
- * @param callback function called with each new value
- * @param callback_cls closure for @a callback
- * @return Handle to watch request
- */
-struct GNUNET_PEERSTORE_WatchContext *
-GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
-{
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
- wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
- wc->callback = callback;
- wc->callback_cls = callback_cls;
- wc->h = h;
- wc->key = key;
- wc->peer = peer;
- wc->sub_system = sub_system;
-
- wc->ic = GNUNET_PEERSTORE_iterate (h,
- sub_system,
- peer,
- key,
- &watch_iterate,
- wc);
-
- return wc;
-}
-
-
-/******************************************************************************/
-/******************* HELLO FUNCTIONS
*********************/
-/******************************************************************************/
-
-
-static void
-hello_updated (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
-{
- struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
- const struct GNUNET_MessageHeader *hello;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated\n");
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- nc->callback (nc->callback_cls, NULL, NULL, emsg);
- return;
- }
- if (NULL == record)
- return;
- hello = record->value;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated with expired %s and size %u for peer %s\n",
- GNUNET_STRINGS_absolute_time_to_string (
- GNUNET_HELLO_builder_get_expiration_time (hello)),
- ntohs (hello->size),
- GNUNET_i2s (&record->peer));
- if ((0 == record->value_size))
- {
- GNUNET_break (0);
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated call callback\n");
- nc->callback (nc->callback_cls, &record->peer, hello, NULL);
-}
-
-
-struct GNUNET_PEERSTORE_NotifyContext *
-GNUNET_PEERSTORE_hello_changed_notify (struct GNUNET_PEERSTORE_Handle *h,
- int include_friend_only,
- GNUNET_PEERSTORE_hello_notify_cb
callback,
- void *callback_cls)
-{
- struct GNUNET_PEERSTORE_NotifyContext *nc;
-
- nc = GNUNET_new (struct GNUNET_PEERSTORE_NotifyContext);
- nc->callback = callback;
- nc->callback_cls = callback_cls;
- nc->h = h;
-
- nc->wc = GNUNET_PEERSTORE_watch (h,
- "peerstore",
- NULL,
- GNUNET_PEERSTORE_HELLO_KEY,
- &hello_updated,
- nc);
-
- return nc;
-}
-
-
-/**
- * Stop notifying about changes.
- *
- * @param nc context to stop notifying
- */
-void
-GNUNET_PEERSTORE_hello_changed_notify_cancel (struct
- GNUNET_PEERSTORE_NotifyContext
*nc)
-{
- if (NULL != nc->wc)
- {
- GNUNET_PEERSTORE_watch_cancel (nc->wc);
- nc->wc = NULL;
- }
- GNUNET_free (nc);
-}
-
-
-static void
-merge_success (void *cls, int success)
-{
- struct StoreHelloCls *shu_cls = cls;
- struct GNUNET_PEERSTORE_StoreHelloContext *huc = shu_cls->huc;
+ struct GNUNET_PEERSTORE_StoreHelloContext *huc = cls;
- if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove
(huc->store_context_map,
- huc->pid,
shu_cls->sc))
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "There was no store context to be removed after storing hello for
peer %s\n",
- GNUNET_i2s (huc->pid));
+ huc->sc = NULL;
if (GNUNET_OK != success)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -1127,112 +808,12 @@ merge_success (void *cls, int success)
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
GNUNET_free (huc);
- GNUNET_free (shu_cls);
- return;
- }
- if (0 == GNUNET_CONTAINER_multipeermap_size (huc->store_context_map))
- {
- GNUNET_PEERSTORE_watch_cancel (huc->wc);
- huc->wc = NULL;
- huc->cont (huc->cont_cls, GNUNET_OK);
- huc->success = GNUNET_OK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Storing hello uri succeeded for peer %s!\n",
- GNUNET_i2s (huc->pid));
- GNUNET_free (huc->hello);
- GNUNET_free (huc->pid);
- GNUNET_free (huc);
- GNUNET_free (shu_cls);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got notified during storing hello uri for peer %s!\n",
- GNUNET_i2s (huc->pid));
- GNUNET_free (shu_cls);
-}
-
-
-static void
-store_hello (struct GNUNET_PEERSTORE_StoreHelloContext *huc,
- const struct GNUNET_MessageHeader *hello)
-{
- struct GNUNET_PEERSTORE_Handle *h = huc->h;
- struct GNUNET_PEERSTORE_StoreContext *sc;
- struct StoreHelloCls *shu_cls = GNUNET_new (struct StoreHelloCls);
- struct GNUNET_TIME_Absolute hello_exp;
-
- shu_cls->huc = huc;
- hello_exp = GNUNET_HELLO_builder_get_expiration_time (hello);
- sc = GNUNET_PEERSTORE_store (h,
- "peerstore",
- huc->pid,
- GNUNET_PEERSTORE_HELLO_KEY,
- hello,
- ntohs (hello->size),
- hello_exp,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- merge_success,
- shu_cls);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "store_hello with expiration %s\n",
- GNUNET_STRINGS_absolute_time_to_string (hello_exp));
- GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put (
- huc->store_context_map,
- huc->pid,
- sc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- shu_cls->sc = sc;
-}
-
-
-// TODO Find a better name for the function. We do not merge, but replace, if
there is a storing process
-// during another store process with a newer hello.
-static void
-merge_uri (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
-{
- struct GNUNET_PEERSTORE_StoreHelloContext *huc = cls;
- struct GNUNET_MessageHeader *hello;
- struct GNUNET_TIME_Absolute huc_hello_exp_time;
- struct GNUNET_TIME_Absolute record_hello_exp_time;
-
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- return;
- }
-
- if (NULL == record && GNUNET_NO == huc->success)
- {
- huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri just store for peer %s with expiration %s\n",
- GNUNET_i2s (huc->pid),
- GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
- store_hello (huc, huc->hello);
- }
- else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid,
- &record->peer))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri record for peer %s\n",
- GNUNET_i2s (&record->peer));
- hello = record->value;
- if ((0 == record->value_size))
- {
- GNUNET_break (0);
- return;
- }
-
- huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
- record_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (hello);
-
- if (GNUNET_TIME_absolute_cmp (huc_hello_exp_time, >,
record_hello_exp_time))
- store_hello (huc, huc->hello);
- }
+ huc->cont (huc->cont_cls, GNUNET_OK);
+ GNUNET_free (huc->hello);
+ GNUNET_free (huc->pid);
+ GNUNET_free (huc);
}
@@ -1258,7 +839,6 @@ GNUNET_PEERSTORE_hello_add (struct GNUNET_PEERSTORE_Handle
*h,
return NULL;
huc = GNUNET_new (struct GNUNET_PEERSTORE_StoreHelloContext);
- huc->store_context_map = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
huc->h = h;
huc->cont = cont;
huc->cont_cls = cont_cls;
@@ -1275,39 +855,26 @@ GNUNET_PEERSTORE_hello_add (struct
GNUNET_PEERSTORE_Handle *h,
GNUNET_i2s (huc->pid),
GNUNET_STRINGS_absolute_time_to_string (huc_exp),
size_msg);
- huc->wc = GNUNET_PEERSTORE_watch (h,
+ huc->sc = GNUNET_PEERSTORE_store (h,
"peerstore",
- NULL,
+ huc->pid,
GNUNET_PEERSTORE_HELLO_KEY,
- &merge_uri,
+ huc->hello,
+ ntohs (huc->hello->size),
+ hello_exp,
+
GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY,
+ &hello_store_success,
huc);
GNUNET_HELLO_builder_free (builder);
-
return huc;
}
-static enum GNUNET_GenericReturnValue
-free_store_context (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- (void) cls;
-
- GNUNET_PEERSTORE_store_cancel ((struct
- GNUNET_PEERSTORE_StoreContext *) value);
- return GNUNET_YES; // FIXME why is this a map anyway
-}
-
-
void
GNUNET_PEERSTORE_hello_add_cancel (struct
GNUNET_PEERSTORE_StoreHelloContext *huc)
{
- GNUNET_PEERSTORE_watch_cancel (huc->wc);
- GNUNET_CONTAINER_multipeermap_iterate (huc->store_context_map,
- free_store_context,
- NULL);
+ GNUNET_PEERSTORE_store_cancel (huc->sc);
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
GNUNET_free (huc);
diff --git a/src/service/peerstore/peerstore_api_monitor.c
b/src/service/peerstore/peerstore_api_monitor.c
new file mode 100644
index 000000000..8badb43eb
--- /dev/null
+++ b/src/service/peerstore/peerstore_api_monitor.c
@@ -0,0 +1,297 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2013-2024, 2019 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file service/peerstore/peerstore_api_monitor.c
+ * @brief API for monitoring peerstore records
+ * @author Martin Schanzenbach
+ */
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "peerstore.h"
+#include "peerstore_common.h"
+#include "gnunet_peerstore_service.h"
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-monitor-api", \
+ __VA_ARGS__)
+/**
+ * Context for a monitor: the connection to the service plus the
+ * callbacks and filter values given to GNUNET_PEERSTORE_monitor_start().
+ */
+struct GNUNET_PEERSTORE_Monitor
+{
+ /**
+ * Kept in a DLL.
+ * NOTE(review): no list operations on this field appear in this file.
+ */
+ struct GNUNET_PEERSTORE_Monitor *next;
+
+ /**
+ * Kept in a DLL.
+ * NOTE(review): no list operations on this field appear in this file.
+ */
+ struct GNUNET_PEERSTORE_Monitor *prev;
+
+ /**
+ * Function to call on errors; reconnect() invokes it after it has
+ * destroyed an existing connection.
+ */
+ GNUNET_SCHEDULER_TaskCallback error_cb;
+
+ /**
+ * Closure for @e error_cb.
+ */
+ void *error_cb_cls;
+
+ /**
+ * Callback with each record received; also called with a NULL record
+ * and an error message when a record message cannot be parsed.
+ */
+ GNUNET_PEERSTORE_Processor callback;
+
+ /**
+ * Closure for @e callback
+ */
+ void *callback_cls;
+
+ /**
+ * Hash of the combined key
+ * NOTE(review): never written or read in this file.
+ */
+ struct GNUNET_HashCode keyhash;
+
+ /**
+ * The peer we are watching for values, or NULL.
+ * The pointer is stored as passed in (not copied) and is read again
+ * on every reconnect.
+ */
+ const struct GNUNET_PeerIdentity *peer;
+
+ /**
+ * The key we like to watch for values, or NULL.
+ * The pointer is stored as passed in (not copied) and is read again
+ * on every reconnect.
+ */
+ const char *key;
+
+ /**
+ * The sub system requested the watch.
+ * The pointer is stored as passed in (not copied).
+ * NOTE(review): not read anywhere in this file.
+ */
+ const char *sub_system;
+
+ /**
+ * Request ID
+ * NOTE(review): never written or read in this file.
+ */
+ uint32_t rid;
+
+ /**
+ * Configuration handle used to connect to the "peerstore" service.
+ */
+ const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Callback invoked when a MONITOR_SYNC message arrives; may be NULL.
+ */
+ GNUNET_SCHEDULER_TaskCallback sync_cb;
+
+ /**
+ * Closure for @e sync_cb.
+ */
+ void *sync_cb_cls;
+
+ /**
+ * Message queue to the service; NULL while not connected.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Iterate first flag, as given to GNUNET_PEERSTORE_monitor_start();
+ * sent to the service in the MONITOR_START message.
+ */
+ int iterate_first;
+};
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC messages:
+ * forwards the notification to the client's sync callback, if one is set.
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *`
+ * @param msg message received (not inspected)
+ */
+static void
+handle_sync (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_PEERSTORE_Monitor *monitor = cls;
+
+  if (NULL == monitor->sync_cb)
+    return;
+  monitor->sync_cb (monitor->sync_cb_cls);
+}
+
+
+/**
+ * When a record message is received by the monitor, check the
+ * message is well-formed.  No check is performed here; every message
+ * is accepted and handed to handle_result().
+ *
+ * @param cls a `struct GNUNET_PEERSTORE_Monitor *`
+ * @param msg message received
+ * @return always #GNUNET_OK
+ */
+static int
+check_result (void *cls, const struct PeerstoreRecordMessage *msg)
+{
+ /* we defer validation to #handle_result, which parses the record */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a record message received by the monitor.
+ *
+ * The message is parsed into a record and passed to the client's
+ * callback; the record is destroyed once the callback returns.  If
+ * parsing fails, the callback is called with a NULL record and an
+ * error message instead.
+ *
+ * @param cls a `struct GNUNET_PEERSTORE_Monitor *`
+ * @param msg message received
+ */
+static void
+handle_result (void *cls, const struct PeerstoreRecordMessage *msg)
+{
+  struct GNUNET_PEERSTORE_Monitor *monitor = cls;
+  struct GNUNET_PEERSTORE_Record *rec;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Monitor received RecordMessage\n");
+  rec = PEERSTORE_parse_record_message (msg);
+  if (NULL != rec)
+  {
+    monitor->callback (monitor->callback_cls, rec, NULL);
+    PEERSTORE_destroy_record (rec);
+    return;
+  }
+  monitor->callback (monitor->callback_cls,
+                     NULL,
+                     _ ("Received a malformed response from service."));
+}
+
+
+/* Defined further down; declared here because the error handler calls it. */
+static void reconnect (struct GNUNET_PEERSTORE_Monitor *mc);
+
+/**
+ * Message queue error handler: re-establishes the monitor's connection.
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *`
+ * @param err the error reported by the message queue (not inspected)
+ */
+static void
+mq_error_handler (void *cls, enum GNUNET_MQ_Error err)
+{
+  reconnect (cls);
+}
+
+
+/**
+ * (Re)connect the monitor to the PEERSTORE service and request monitoring.
+ *
+ * An existing connection is destroyed first and the client's error
+ * callback is notified.  After a successful connect, a MONITOR_START
+ * message carrying the iterate-first flag, the optional peer and the
+ * optional key is sent.  If connecting fails, `mc->mq` is NULL on return
+ * and nothing is sent.
+ *
+ * @param mc the monitor to (re)connect
+ */
+static void
+reconnect (struct GNUNET_PEERSTORE_Monitor *mc)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (sync,
+                             GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC,
+                             struct GNUNET_MessageHeader,
+                             mc),
+    GNUNET_MQ_hd_var_size (result,
+                           GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD,
+                           struct PeerstoreRecordMessage,
+                           mc),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *env;
+  struct PeerstoreMonitorStartMessage *sm;
+  size_t key_len = 0;
+
+  if (NULL != mc->mq)
+  {
+    GNUNET_MQ_destroy (mc->mq);
+    /* do not expose a dangling queue pointer to the error callback */
+    mc->mq = NULL;
+    mc->error_cb (mc->error_cb_cls);
+  }
+  mc->mq = GNUNET_CLIENT_connect (mc->cfg,
+                                  "peerstore",
+                                  handlers,
+                                  &mq_error_handler,
+                                  mc);
+  if (NULL == mc->mq)
+    return;
+  if (NULL != mc->key)
+    key_len = strlen (mc->key) + 1;   /* include the 0-terminator */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending MONITOR_START\n");
+  /* The extra-payload size is a local allocation size and must be in host
+     byte order; only the fields written into the message are converted. */
+  env = GNUNET_MQ_msg_extra (sm,
+                             key_len,
+                             GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START);
+  sm->iterate_first = htons (mc->iterate_first);
+  if (NULL != mc->peer)
+  {
+    sm->peer = *mc->peer;
+    sm->peer_set = htons (GNUNET_YES);
+  }
+  if (NULL != mc->key)
+    GNUNET_memcpy (&sm[1], mc->key, key_len);
+  sm->key_size = htons (key_len);
+  /* NOTE(review): mc->sub_system is not part of this request; confirm
+     whether the service expects the sub system in MONITOR_START. */
+  GNUNET_MQ_send (mc->mq, env);
+}
+
+
+/**
+ * Start monitoring: allocate a monitor, connect it to the PEERSTORE
+ * service and send the initial monitoring request.
+ *
+ * The @a sub_system, @a peer and @a key pointers are stored as given,
+ * not copied; @a peer and @a key are read again on every reconnect.
+ *
+ * @param cfg configuration used to connect to the service
+ * @param iterate_first flag forwarded to the service in the request
+ * @param sub_system name of the sub system
+ * @param peer peer to monitor, may be NULL
+ * @param key key to monitor, may be NULL
+ * @param error_cb function called after an existing connection was torn down
+ * @param error_cb_cls closure for @a error_cb
+ * @param sync_cb function called when the service sends a sync message,
+ *        may be NULL
+ * @param sync_cb_cls closure for @a sync_cb
+ * @param callback function called with each record received
+ * @param callback_cls closure for @a callback
+ * @return handle to the monitor, or NULL if connecting to the service failed
+ */
+struct GNUNET_PEERSTORE_Monitor *
+GNUNET_PEERSTORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                                int iterate_first,
+                                const char *sub_system,
+                                const struct GNUNET_PeerIdentity *peer,
+                                const char *key,
+                                GNUNET_SCHEDULER_TaskCallback error_cb,
+                                void *error_cb_cls,
+                                GNUNET_SCHEDULER_TaskCallback sync_cb,
+                                void *sync_cb_cls,
+                                GNUNET_PEERSTORE_Processor callback,
+                                void *callback_cls)
+{
+  struct GNUNET_PEERSTORE_Monitor *monitor;
+
+  monitor = GNUNET_new (struct GNUNET_PEERSTORE_Monitor);
+  /* what to monitor */
+  monitor->cfg = cfg;
+  monitor->iterate_first = iterate_first;
+  monitor->sub_system = sub_system;
+  monitor->peer = peer;
+  monitor->key = key;
+  /* whom to notify */
+  monitor->error_cb = error_cb;
+  monitor->error_cb_cls = error_cb_cls;
+  monitor->sync_cb = sync_cb;
+  monitor->sync_cb_cls = sync_cb_cls;
+  monitor->callback = callback;
+  monitor->callback_cls = callback_cls;
+  reconnect (monitor);
+  if (NULL != monitor->mq)
+    return monitor;
+  /* the connection attempt failed, there is nothing to monitor */
+  GNUNET_free (monitor);
+  return NULL;
+}
+
+
+/**
+ * Stop monitoring: destroy the connection to the service (if there is
+ * one) and free the monitor.  The handle must not be used afterwards.
+ *
+ * @param zm handle to the monitor activity to stop
+ */
+void
+GNUNET_PEERSTORE_monitor_stop (struct GNUNET_PEERSTORE_Monitor *zm)
+{
+ if (NULL != zm->mq)
+ {
+ GNUNET_MQ_destroy (zm->mq);
+ zm->mq = NULL;
+ }
+ GNUNET_free (zm);
+}
+
+
+/**
+ * Send a MONITOR_NEXT message carrying @a limit to the service.
+ *
+ * @param zm the monitor; its message queue is used for sending
+ * @param limit value transmitted (in network byte order) to the service;
+ *        presumably the number of further records the client is ready
+ *        to receive -- confirm against the service API documentation
+ */
+void
+GNUNET_PEERSTORE_monitor_next (struct GNUNET_PEERSTORE_Monitor *zm,
+                               uint64_t limit)
+{
+  struct PeerstoreMonitorNextMessage *next_msg;
+  struct GNUNET_MQ_Envelope *envelope;
+
+  envelope = GNUNET_MQ_msg (next_msg,
+                            GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT);
+  next_msg->limit = GNUNET_htonll (limit);
+  GNUNET_MQ_send (zm->mq, envelope);
+}
diff --git a/src/service/peerstore/peerstore_common.c
b/src/service/peerstore/peerstore_common.c
index 5d4d06c0c..435444917 100644
--- a/src/service/peerstore/peerstore_common.c
+++ b/src/service/peerstore/peerstore_common.c
@@ -70,7 +70,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
enum GNUNET_PEERSTORE_StoreOption options,
uint16_t msg_type)
{
- struct StoreRecordMessage *srm;
+ struct PeerstoreRecordMessage *srm;
struct GNUNET_MQ_Envelope *ev;
size_t ss_size;
size_t key_size;
@@ -87,14 +87,9 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
ev = GNUNET_MQ_msg_extra (srm, msg_size, msg_type);
srm->key_size = htons (key_size);
srm->expiry = GNUNET_TIME_absolute_hton (expiry);
- if (NULL == peer)
- srm->peer_set = htons (GNUNET_NO);
- else
- {
- srm->peer_set = htons (GNUNET_YES);
- srm->peer = *peer;
- }
- srm->rid = rid;
+ GNUNET_assert (NULL != peer);
+ srm->peer = *peer;
+ srm->rid = htons (rid);
srm->sub_system_size = htons (ss_size);
srm->value_size = htons (value_size);
srm->options = htonl (options);
@@ -109,7 +104,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm)
+PEERSTORE_parse_record_message (const struct PeerstoreRecordMessage *srm)
{
struct GNUNET_PEERSTORE_Record *record;
uint16_t req_size;
@@ -128,10 +123,7 @@ PEERSTORE_parse_record_message (const struct
StoreRecordMessage *srm)
return NULL;
}
record = GNUNET_new (struct GNUNET_PEERSTORE_Record);
- if (GNUNET_YES == ntohs (srm->peer_set))
- {
- record->peer = srm->peer;
- }
+ record->peer = srm->peer;
record->expiry = GNUNET_TIME_absolute_ntoh (srm->expiry);
dummy = (char *) &srm[1];
if (ss_size > 0)
diff --git a/src/service/peerstore/peerstore_common.h
b/src/service/peerstore/peerstore_common.h
index 56f1f8b8b..b0239e66e 100644
--- a/src/service/peerstore/peerstore_common.h
+++ b/src/service/peerstore/peerstore_common.h
@@ -70,7 +70,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
* @return Pointer to record or NULL on error
*/
struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm);
+PEERSTORE_parse_record_message (const struct PeerstoreRecordMessage *srm);
/**
diff --git a/src/service/peerstore/perf_peerstore_store.c
b/src/service/peerstore/perf_peerstore_store.c
index e328be93e..3386ccebd 100644
--- a/src/service/peerstore/perf_peerstore_store.c
+++ b/src/service/peerstore/perf_peerstore_store.c
@@ -32,7 +32,7 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
-static struct GNUNET_PEERSTORE_WatchContext *wc;
+static struct GNUNET_PEERSTORE_Monitor *wc;
static char *ss = "test_peerstore_stress";
static struct GNUNET_PeerIdentity p;
@@ -45,7 +45,7 @@ static int count_fin = 0;
static void
disconnect (void *cls)
{
- GNUNET_PEERSTORE_watch_cancel (wc);
+ GNUNET_PEERSTORE_monitor_stop (wc);
if (NULL != h)
GNUNET_PEERSTORE_disconnect (h);
GNUNET_SCHEDULER_shutdown ();
@@ -56,7 +56,7 @@ static void
store_cont (void *cls, int ret)
{
count_fin++;
- if (count_fin == count)
+ if ((STORES == count) && (count_fin == count))
{
ok = 0;
GNUNET_SCHEDULER_add_now (&disconnect, NULL);
@@ -86,6 +86,22 @@ watch_cb (void *cls, const struct GNUNET_PEERSTORE_Record
*record,
store ();
}
+static void
+error_cb (void *cls)
+{
+ // Monitor error callback; this test never expects to reach it, so it does nothing
+}
+
+/**
+ * Monitor sync callback: calls store() on the first invocation only;
+ * later invocations do nothing.
+ *
+ * @param cls closure (unused)
+ */
+static void
+sync_cb (void *cls)
+{
+  static int synced_once = 0;
+
+  if (1 != synced_once)
+  {
+    store ();
+    synced_once = 1;
+  }
+}
static void
run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -94,7 +110,11 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle
*cfg,
memset (&p, 5, sizeof(p));
h = GNUNET_PEERSTORE_connect (cfg);
GNUNET_assert (NULL != h);
- wc = GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
+ wc = GNUNET_PEERSTORE_monitor_start (cfg, GNUNET_YES,
+ ss, &p, k,
+ error_cb, NULL,
+ sync_cb, NULL,
+ &watch_cb, NULL);
store ();
}
diff --git a/src/service/peerstore/test_peerstore_api_iterate.c
b/src/service/peerstore/test_peerstore_api_iterate.c
index 0fa58f3fd..8d072d418 100644
--- a/src/service/peerstore/test_peerstore_api_iterate.c
+++ b/src/service/peerstore/test_peerstore_api_iterate.c
@@ -56,12 +56,13 @@ iter3_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 3);
@@ -77,22 +78,23 @@ iter2_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 2);
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- NULL,
- NULL,
- &iter3_cb,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ NULL,
+ NULL,
+ &iter3_cb,
+ NULL);
}
@@ -103,23 +105,24 @@ iter1_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u is count\n", count);
GNUNET_assert (count == 1);
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- &p1,
- NULL,
- &iter2_cb,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ &p1,
+ NULL,
+ &iter2_cb,
+ NULL);
}
@@ -156,11 +159,11 @@ store_cont (void *cls, int success)
else
{
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- &p1,
- k1,
- &iter1_cb, NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ &p1,
+ k1,
+ &iter1_cb, NULL);
return;
}
count++;
diff --git a/src/service/peerstore/test_peerstore_api_store.c
b/src/service/peerstore/test_peerstore_api_store.c
index 8cf0e60a7..1883310b9 100644
--- a/src/service/peerstore/test_peerstore_api_store.c
+++ b/src/service/peerstore/test_peerstore_api_store.c
@@ -30,6 +30,7 @@
static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
+static struct GNUNET_PEERSTORE_IterateContext *ic;
static char *subsystem = "test_peerstore_api_store";
static struct GNUNET_PeerIdentity pid;
@@ -48,6 +49,7 @@ finish (void *cls)
GNUNET_SCHEDULER_shutdown ();
}
+
static void
test3_cont2 (void *cls,
const struct GNUNET_PEERSTORE_Record *record,
@@ -61,6 +63,7 @@ test3_cont2 (void *cls,
GNUNET_assert (0 == strcmp ((char *) val3,
(char *) record->value));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 1);
@@ -76,12 +79,12 @@ test3_cont (void *cls,
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid,
- key,
- &test3_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid,
+ key,
+ &test3_cont2,
+ NULL);
}
@@ -118,6 +121,7 @@ test2_cont2 (void *cls,
GNUNET_assert ((0 == strcmp ((char *) val1, (char *) record->value)) ||
(0 == strcmp ((char *) val2, (char *) record->value)));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 2);
@@ -132,11 +136,11 @@ test2_cont (void *cls, int success)
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid, key,
- &test2_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid, key,
+ &test2_cont2,
+ NULL);
}
@@ -174,6 +178,7 @@ test1_cont2 (void *cls,
GNUNET_assert ((strlen (val1) + 1) == record->value_size);
GNUNET_assert (0 == strcmp ((char *) val1, (char *) record->value));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 1);
@@ -189,12 +194,12 @@ test1_cont (void *cls, int success)
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid,
- key,
- &test1_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid,
+ key,
+ &test1_cont2,
+ NULL);
}
diff --git a/src/service/peerstore/test_peerstore_api_watch.c
b/src/service/peerstore/test_peerstore_api_watch.c
index 63b0e896b..e195bb814 100644
--- a/src/service/peerstore/test_peerstore_api_watch.c
+++ b/src/service/peerstore/test_peerstore_api_watch.c
@@ -33,7 +33,7 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
-static struct GNUNET_PEERSTORE_WatchContext *wc;
+static struct GNUNET_PEERSTORE_Monitor *wc;
static char *ss = "test_peerstore_api_watch";
@@ -46,8 +46,8 @@ static struct GNUNET_PeerIdentity p;
static void
finish (void *cls)
{
- GNUNET_PEERSTORE_watch_cancel (wc);
GNUNET_PEERSTORE_disconnect (h);
+ GNUNET_PEERSTORE_monitor_stop(wc);
GNUNET_SCHEDULER_shutdown ();
}
@@ -76,17 +76,7 @@ watch_cb (void *cls,
const char *emsg)
{
GNUNET_assert (NULL == emsg);
- if (GNUNET_YES == initial_iteration)
- {
- if (NULL != record)
- {
- GNUNET_break (0);
- return; // Ignore this record, FIXME: Test unclean
- }
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
- initial_iteration = GNUNET_NO;
- return;
- }
+ GNUNET_assert (GNUNET_YES != initial_iteration);
if (NULL == record)
{
GNUNET_break (0);
@@ -101,6 +91,20 @@ watch_cb (void *cls,
}
+static void
+sync_cb (void *cls)
+{
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
+ initial_iteration = GNUNET_NO;
+}
+
+static void
+error_cb (void *cls)
+{
+ // Never reach this
+ GNUNET_assert (0);
+}
+
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -112,12 +116,17 @@ run (void *cls,
memset (&p,
4,
sizeof(p));
- wc = GNUNET_PEERSTORE_watch (h,
- ss,
- &p,
- k,
- &watch_cb,
- NULL);
+ wc = GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ ss,
+ &p,
+ k,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &watch_cb,
+ NULL);
}
diff --git a/src/service/topology/gnunet-daemon-topology.c
b/src/service/topology/gnunet-daemon-topology.c
index 8b9360e86..588000358 100644
--- a/src/service/topology/gnunet-daemon-topology.c
+++ b/src/service/topology/gnunet-daemon-topology.c
@@ -36,6 +36,7 @@
#include "gnunet_peerstore_service.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_application_service.h"
+#include <assert.h>
/**
@@ -118,7 +119,7 @@ struct GNUNET_SCHEDULER_Task *peerstore_notify_task;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Our configuration.
@@ -725,6 +726,22 @@ consider_for_advertising (const struct
GNUNET_MessageHeader *hello)
}
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ _ ("Error in communication with PEERSTORE service to
monitor.\n"));
+ return;
+}
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ _ ("Finished initial PEERSTORE iteration in monitor.\n"));
+ return;
+}
+
/**
* PEERSTORE calls this function to let us know about a possible peer
* that we might want to connect to.
@@ -736,28 +753,38 @@ consider_for_advertising (const struct
GNUNET_MessageHeader *hello)
*/
static void
process_peer (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
struct Peer *pos;
+ struct GNUNET_MessageHeader *hello;
if (NULL != err_msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
_ ("Error in communication with PEERSTORE service: %s\n"),
err_msg);
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (ps, GNUNET_NO, &process_peer,
- NULL);
+ GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ error_cb,
+ NULL,
+ sync_cb,
+ NULL,
+ &process_peer,
+ NULL);
return;
}
- GNUNET_assert (NULL != peer);
+ GNUNET_assert (NULL != record);
+ hello = record->value;
if (NULL == hello)
{
/* free existing HELLO, if any */
- pos = GNUNET_CONTAINER_multipeermap_get (peers, peer);
+ pos = GNUNET_CONTAINER_multipeermap_get (peers, &record->peer);
if (NULL != pos)
{
GNUNET_free (pos->hello);
@@ -770,13 +797,15 @@ process_peer (void *cls,
if (NULL == pos->mq)
free_peer (NULL, &pos->pid, pos);
}
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
}
consider_for_advertising (hello);
- pos = GNUNET_CONTAINER_multipeermap_get (peers, peer);
+ pos = GNUNET_CONTAINER_multipeermap_get (peers, &record->peer);
if (NULL == pos)
- pos = make_peer (peer, hello);
+ pos = make_peer (&record->peer, hello);
attempt_connect (pos);
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
}
@@ -788,7 +817,17 @@ start_notify (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting to process new hellos for gossiping.\n");
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (ps, GNUNET_NO, &process_peer, NULL);
+ GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &process_peer,
+ NULL);
}
@@ -916,7 +955,7 @@ cleaning_task (void *cls)
}
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
else if (NULL != peerstore_notify_task)
diff --git a/src/service/transport/gnunet-communicator-tcp.c
b/src/service/transport/gnunet-communicator-tcp.c
index a79d72331..189febfda 100644
--- a/src/service/transport/gnunet-communicator-tcp.c
+++ b/src/service/transport/gnunet-communicator-tcp.c
@@ -972,17 +972,17 @@ queue_destroy (struct Queue *queue)
}
if (NULL != queue->rekey_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->rekey_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->rekey_monotime_get);
queue->rekey_monotime_get = NULL;
}
if (NULL != queue->handshake_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->handshake_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->handshake_monotime_get);
queue->handshake_monotime_get = NULL;
}
if (NULL != queue->handshake_ack_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->handshake_ack_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->handshake_ack_monotime_get);
queue->handshake_ack_monotime_get = NULL;
}
if (NULL != queue->qh)
@@ -1305,6 +1305,7 @@ rekey_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->rekey_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -1331,6 +1332,7 @@ rekey_monotime_cb (void *cls,
GNUNET_PEERSTORE_STOREOPTION_REPLACE,
&rekey_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->rekey_monotime_get, 1);
}
@@ -1402,12 +1404,12 @@ do_rekey (struct Queue *queue, const struct TCPRekey
*rekey)
return;
}
queue->rekey_monotonic_time = rekey->monotonic_time;
- queue->rekey_monotime_get = GNUNET_PEERSTORE_iterate (peerstore,
-
"transport_tcp_communicator",
- &queue->target,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY,
- &rekey_monotime_cb,
- queue);
+ queue->rekey_monotime_get = GNUNET_PEERSTORE_iteration_start (peerstore,
+
"transport_tcp_communicator",
+ &queue->target,
+
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY,
+
&rekey_monotime_cb,
+ queue);
gcry_cipher_close (queue->in_cipher);
queue->rekeyed = GNUNET_YES;
setup_in_cipher (&rekey->ephemeral, queue);
@@ -1462,6 +1464,7 @@ handshake_ack_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -1489,6 +1492,7 @@ handshake_ack_monotime_cb (void *cls,
&
handshake_ack_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
}
@@ -1942,14 +1946,16 @@ try_handle_plaintext (struct Queue *queue)
return 0;
}
- queue->handshake_ack_monotime_get = GNUNET_PEERSTORE_iterate (peerstore,
-
"transport_tcp_communicator",
-
&queue->target
- ,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK,
- &
-
handshake_ack_monotime_cb,
- queue);
+ queue->handshake_ack_monotime_get = GNUNET_PEERSTORE_iteration_start (
+ peerstore,
+ "transport_tcp_communicator",
+ &queue->
+ target
+ ,
+ GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK,
+ &
+ handshake_ack_monotime_cb,
+ queue);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Handling plaintext, ack processed!\n");
@@ -2777,6 +2783,7 @@ handshake_monotime_cb (void *cls,
GNUNET_i2s (pid));
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -2804,6 +2811,7 @@ handshake_monotime_cb (void *cls,
&
handshake_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
}
@@ -2848,12 +2856,12 @@ decrypt_and_check_tc (struct Queue *queue,
&tc->sender.public_key);
if (GNUNET_YES == ret)
queue->handshake_monotime_get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport_tcp_communicator",
- &queue->target,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE,
- &handshake_monotime_cb,
- queue);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport_tcp_communicator",
+ &queue->target,
+
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE,
+ &handshake_monotime_cb,
+ queue);
return ret;
}
diff --git a/src/service/transport/gnunet-service-transport.c
b/src/service/transport/gnunet-service-transport.c
index 523ef2120..6d2ba4139 100644
--- a/src/service/transport/gnunet-service-transport.c
+++ b/src/service/transport/gnunet-service-transport.c
@@ -2062,7 +2062,7 @@ struct IncomingRequest
/**
* Notify context for new HELLOs.
*/
- struct GNUNET_PEERSTORE_NotifyContext *nc;
+ struct GNUNET_PEERSTORE_Monitor *nc;
/**
* Which peer is this about?
@@ -2089,7 +2089,7 @@ struct PeerRequest
/**
* Notify context for new HELLOs.
*/
- struct GNUNET_PEERSTORE_NotifyContext *nc;
+ struct GNUNET_PEERSTORE_Monitor *nc;
/**
* What kind of performance preference does this @e tc have?
@@ -2980,7 +2980,8 @@ free_incoming_request (struct IncomingRequest *ir)
GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
GNUNET_assert (ir_total > 0);
ir_total--;
- GNUNET_PEERSTORE_hello_changed_notify_cancel (ir->nc);
+ if (NULL != ir->nc)
+ GNUNET_PEERSTORE_monitor_stop (ir->nc);
ir->nc = NULL;
GNUNET_free (ir);
}
@@ -3618,7 +3619,7 @@ free_neighbour (struct Neighbour *neighbour)
}
if (NULL != neighbour->get)
{
- GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
+ GNUNET_PEERSTORE_iteration_stop (neighbour->get);
neighbour->get = NULL;
}
if (NULL != neighbour->sc)
@@ -3996,7 +3997,8 @@ stop_peer_request (void *cls,
struct TransportClient *tc = cls;
struct PeerRequest *pr = value;
- GNUNET_PEERSTORE_hello_changed_notify_cancel (pr->nc);
+ if (NULL != pr->nc)
+ GNUNET_PEERSTORE_monitor_stop (pr->nc);
pr->nc = NULL;
GNUNET_assert (
GNUNET_YES ==
@@ -4072,7 +4074,8 @@ client_disconnect_cb (void *cls,
struct AddressListEntry *ale;
if (NULL != tc->details.communicator.free_queue_entry_task)
- GNUNET_SCHEDULER_cancel
(tc->details.communicator.free_queue_entry_task);
+ GNUNET_SCHEDULER_cancel (
+ tc->details.communicator.free_queue_entry_task);
while (NULL != (q = tc->details.communicator.queue_head))
free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
@@ -4141,9 +4144,9 @@ finish_cmc_handling_with_continue (struct
CommunicatorMessageContext *cmc,
int free_cmc);
static enum GNUNET_GenericReturnValue
-resume_communicators(void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+resume_communicators (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct VirtualLink *vl = value;
struct CommunicatorMessageContext *cmc;
@@ -4589,21 +4592,22 @@ free_timedout_queue_entry (void *cls)
while (NULL != qep)
{
struct QueueEntry *pos = qep;
-
+
qep = qep->next;
- struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference
(pos->creation_timestamp, now);
+ struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference (
+ pos->creation_timestamp, now);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"diff to now %s \n",
GNUNET_TIME_relative2s (diff, GNUNET_NO));
- if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff))
+ if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, <, diff))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Freeing timed out QueueEntry with MID %" PRIu64
- " and QID %u\n",
- pos->mid,
- queue->qid);
- free_queue_entry(pos, tc);
+ "Freeing timed out QueueEntry with MID %" PRIu64
+ " and QID %u\n",
+ pos->mid,
+ queue->qid);
+ free_queue_entry (pos, tc);
}
}
}
@@ -4716,9 +4720,11 @@ queue_send_msg (struct Queue *queue,
struct TransportClient *tc = queue->tc;
if (NULL == tc->details.communicator.free_queue_entry_task)
- tc->details.communicator.free_queue_entry_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-
&free_timedout_queue_entry,
-
tc);
+ tc->details.communicator.free_queue_entry_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &
+ free_timedout_queue_entry,
+ tc);
}
if (NULL != pm && NULL != (pa = pm->pa_head))
{
@@ -4729,7 +4735,8 @@ queue_send_msg (struct Queue *queue,
// GNUNET_CONTAINER_multiuuidmap_get (pending_acks,
&ack[i].ack_uuid.value);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message MID %" PRIu64
- " of type %u (%u) and size %lu with MQ %p queue %s (QID %u)
pending %" PRIu64 "\n",
+ " of type %u (%u) and size %lu with MQ %p queue %s (QID %u)
pending %"
+ PRIu64 "\n",
GNUNET_ntohll (smt->mid),
ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
ntohs (smt->header.size),
@@ -5699,7 +5706,7 @@ shc_cont (void *cls, int success)
ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&store_pi,
ale);
- }
+ }
}
@@ -6057,7 +6064,7 @@ handle_raw_message (void *cls, const struct
GNUNET_MessageHeader *mh)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%u items stored in ring buffer\n",
GNUNET_YES == is_ring_buffer_full ? RING_BUFFER_SIZE :
- ring_buffer_head);
+ ring_buffer_head);
/*GNUNET_break_op (0);
GNUNET_STATISTICS_update (GST_stats,
@@ -6369,7 +6376,8 @@ handle_fragment_box (void *cls, const struct
TransportFragmentBoxMessage *fb)
else
rc->msg_missing = 0;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received fragment with size %u at offset %u/%u %u bytes
missing from %s for NEW message %" PRIu64 "\n",
+ "Received fragment with size %u at offset %u/%u %u bytes
missing from %s for NEW message %"
+ PRIu64 "\n",
fsize,
ntohs (fb->frag_off),
msize,
@@ -6644,8 +6652,8 @@ completed_pending_message (struct PendingMessage *pm)
free_pending_message (pm);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
- pos->frag_off,
- pos->bytes_msg,
+ (unsigned long) pos->frag_off,
+ (unsigned long) pos->bytes_msg,
pos->pmt,
NULL == pos->frag_parent ? 1 : 0);
/* check if subtree is done */
@@ -6672,7 +6680,8 @@ completed_pending_message (struct PendingMessage *pm)
}
/* Was this the last applicable fragment? */
- if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
pos->pmt) &&
+ if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
+ pos->pmt) &&
(pos->frag_off == pos->bytes_msg))
client_send_response (pos);
return;
@@ -8186,7 +8195,7 @@ forward_dv_box (struct Neighbour *next_hop,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%u items stored in DV ring buffer\n",
GNUNET_YES == is_ring_buffer_dv_full ? RING_BUFFER_SIZE :
- ring_buffer_dv_head);
+ ring_buffer_dv_head);
}
}
}
@@ -8202,7 +8211,7 @@ free_backtalker (struct Backtalker *b)
{
if (NULL != b->get)
{
- GNUNET_PEERSTORE_iterate_cancel (b->get);
+ GNUNET_PEERSTORE_iteration_stop (b->get);
b->get = NULL;
GNUNET_assert (NULL != b->cmc);
finish_cmc_handling (b->cmc);
@@ -8309,6 +8318,7 @@ backtalker_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (b->get, 1);
GNUNET_break (0);
return;
}
@@ -8328,8 +8338,8 @@ backtalker_monotime_cb (void *cls,
/* Setting body_size to 0 prevents call to #forward_backchannel_payload()
*/
b->body_size = 0;
- return;
}
+ GNUNET_PEERSTORE_iteration_next (b->get, 1);
}
@@ -8652,12 +8662,12 @@ handle_dv_box (void *cls, const struct
TransportDVBoxMessage *dvb)
GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
b->get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport",
- &b->pid,
-
GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
- &backtalker_monotime_cb,
- b);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport",
+ &b->pid,
+
GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+ &backtalker_monotime_cb,
+ b);
} /* end actual decryption */
}
@@ -8819,7 +8829,7 @@ start_address_validation (const struct
GNUNET_PeerIdentity *pid,
&vs->challenge,
sizeof(vs->challenge));
vs->address = GNUNET_strdup (address);
- GNUNET_CRYPTO_hash (vs->address, strlen(vs->address), &vs->hc);
+ GNUNET_CRYPTO_hash (vs->address, strlen (vs->address), &vs->hc);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting address validation `%s' of peer %s using challenge
%s\n",
address,
@@ -8887,11 +8897,12 @@ hello_for_incoming_cb (void *cls,
*/
static void
handle_hello_for_incoming (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
+ struct IncomingRequest *ir = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
if (NULL != emsg)
{
@@ -8900,8 +8911,12 @@ handle_hello_for_incoming (void *cls,
emsg);
return;
}
- if (0 == GNUNET_memcmp (peer, &GST_my_identity))
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer, &GST_my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (ir->nc, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
hello_for_incoming_cb,
@@ -8910,6 +8925,22 @@ handle_hello_for_incoming (void *cls,
}
+static void
+hello_for_incoming_error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+hello_for_incoming_sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
/**
* Communicator gave us a transport address validation challenge. Process the
* request.
@@ -9008,10 +9039,17 @@ handle_validation_challenge (
ir->pid = sender;
GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
- ir->nc = GNUNET_PEERSTORE_hello_changed_notify (peerstore,
- GNUNET_NO,
- &handle_hello_for_incoming,
- NULL);
+ ir->nc = GNUNET_PEERSTORE_monitor_start (GST_cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &hello_for_incoming_error_cb,
+ NULL,
+ &hello_for_incoming_sync_cb,
+ NULL,
+ &handle_hello_for_incoming,
+ ir);
ir_total++;
/* Bound attempts we do in parallel here, might otherwise get excessive */
while (ir_total > MAX_INCOMING_REQUEST)
@@ -9107,6 +9145,7 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const
char *address)
return NULL;
}
+
static void
validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs);
@@ -9139,8 +9178,8 @@ revalidate_map_it (
{
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Key in revalidate map %s \n",
- GNUNET_h2s (key));
+ "Key in revalidate map %s \n",
+ GNUNET_h2s (key));
return GNUNET_YES;
}
@@ -9287,7 +9326,7 @@ handle_validation_response (
vs->revalidation_task =
GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_subtract (vs->next_challenge,
GNUNET_TIME_UNIT_MINUTES),
-
&revalidation_start_cb, vs);
+ &revalidation_start_cb, vs);
vs->sc = GNUNET_PEERSTORE_store (peerstore,
"transport",
&cmc->im.sender,
@@ -10022,7 +10061,7 @@ update_pm_next_attempt (struct PendingMessage *pm,
if (PMT_DV_BOX == root->pmt)
root = root->frag_parent;
reorder_root_pm (root, root->next_attempt);
- //root->next_attempt = GNUNET_TIME_UNIT_ZERO_ABS;
+ // root->next_attempt = GNUNET_TIME_UNIT_ZERO_ABS;
}
else
{
@@ -10036,10 +10075,10 @@ update_pm_next_attempt (struct PendingMessage *pm,
next_attempt);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "frag_count %u after factor\n",
- root->frag_count);
+ "frag_count %u after factor\n",
+ root->frag_count);
s1 = GNUNET_TIME_relative_multiply_double (plus_mean,
- factor);
+ factor);
s2 = GNUNET_TIME_relative_divide (plus,
root->frag_count);
plus_mean = GNUNET_TIME_relative_add (s1, s2);
@@ -10966,7 +11005,8 @@ validation_transmit_on_queue (struct Queue *q, struct
ValidationState *vs)
GNUNET_CONTAINER_multihashmap_remove (revalidation_map, &vs->hc, vs);
monotonic_time = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
- if (GNUNET_TIME_UNIT_ZERO_ABS.abs_value_us ==
vs->last_challenge_use.abs_value_us)
+ if (GNUNET_TIME_UNIT_ZERO_ABS.abs_value_us ==
+ vs->last_challenge_use.abs_value_us)
{
vs->first_challenge_use = monotonic_time;
}
@@ -11325,6 +11365,7 @@ neighbour_dv_monotime_cb (void *cls,
}
if (0 == record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (n->get, 1);
GNUNET_break (0);
return;
}
@@ -11332,6 +11373,7 @@ neighbour_dv_monotime_cb (void *cls,
n->last_dv_learn_monotime =
GNUNET_TIME_absolute_max (n->last_dv_learn_monotime,
GNUNET_TIME_absolute_ntoh (*mtbe));
+ GNUNET_PEERSTORE_iteration_next (n->get, 1);
}
@@ -11389,12 +11431,12 @@ handle_add_queue_message (void *cls,
neighbour,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
neighbour->get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport",
- &neighbour->pid,
- GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
- &neighbour_dv_monotime_cb,
- neighbour);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport",
+ &neighbour->pid,
+
GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+ &neighbour_dv_monotime_cb,
+ neighbour);
}
addr_len = ntohs (aqm->header.size) - sizeof(*aqm);
addr = (const char *) &aqm[1];
@@ -11445,18 +11487,21 @@ handle_add_queue_message (void *cls,
queue->idle = GNUNET_YES;
/* check if valdiations are waiting for the queue */
if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (validation_map,
- &aqm->receiver))
- {
- if (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_get_multiple
(validation_map,
- &aqm->receiver,
-
&check_validation_request_pending,
- queue))
+ &aqm->receiver))
+ {
+ if (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_get_multiple (
+ validation_map,
+ &aqm->
+ receiver,
+ &
+ check_validation_request_pending,
+ queue))
start_address_validation (&aqm->receiver, queue->address);
}
else
start_address_validation (&aqm->receiver, queue->address);
/* look for traffic for this queue */
- //TODO Check whether this makes any sense at all.
+ // TODO Check whether this makes any sense at all.
/*schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);*/
/* might be our first queue, try launching DV learning */
@@ -11658,12 +11703,12 @@ hello_for_client_cb (void *cls,
*/
static void
handle_hello_for_client (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
- (void) cls;
+ struct PeerRequest *pr = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
if (NULL != emsg)
{
@@ -11672,8 +11717,12 @@ handle_hello_for_client (void *cls,
emsg);
return;
}
- if (0 == GNUNET_memcmp (peer, &GST_my_identity))
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer, &GST_my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (pr->nc, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
hello_for_client_cb,
@@ -11682,6 +11731,22 @@ handle_hello_for_client (void *cls,
}
+static void
+hello_for_client_error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+hello_for_client_sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
/**
* We have received a `struct ExpressPreferenceMessage` from an application
* client.
@@ -11728,10 +11793,18 @@ handle_suggest (void *cls, const struct
ExpressPreferenceMessage *msg)
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- pr->nc = GNUNET_PEERSTORE_hello_changed_notify (peerstore,
- GNUNET_NO,
- &handle_hello_for_client,
- NULL);
+ pr->nc =
+ GNUNET_PEERSTORE_monitor_start (GST_cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &hello_for_client_error_cb,
+ NULL,
+ &hello_for_client_sync_cb,
+ NULL,
+ &handle_hello_for_client,
+ pr);
GNUNET_SERVICE_client_continue (tc->client);
}
diff --git a/src/service/transport/transport-testing2.c
b/src/service/transport/transport-testing2.c
index 4081e5111..36a57b2b6 100644
--- a/src/service/transport/transport-testing2.c
+++ b/src/service/transport/transport-testing2.c
@@ -341,7 +341,7 @@ hello_iter_cb (void *cb_cls,
memcpy (p->hello, record->value, p->hello_size);
p->hello[p->hello_size - 1] = '\0';
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
if (NULL != p->start_cb)
{
@@ -352,6 +352,7 @@ hello_iter_cb (void *cb_cls,
p->start_cb (p->start_cb_cls);
p->start_cb = NULL;
}
+ GNUNET_PEERSTORE_iteration_next (p->pic, 1);
}
@@ -360,12 +361,12 @@ retrieve_hello (void *cls)
{
struct GNUNET_TRANSPORT_TESTING_PeerContext *p = cls;
p->rh_task = NULL;
- p->pic = GNUNET_PEERSTORE_iterate (p->ph,
- "transport",
- &p->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- p);
+ p->pic = GNUNET_PEERSTORE_iteration_start (p->ph,
+ "transport",
+ &p->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ p);
}
@@ -536,7 +537,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct
GNUNET_i2s (&p->id));
if (NULL != p->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
}
if (NULL != p->th)
@@ -594,12 +595,12 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct
&notify_disconnect);
GNUNET_assert (NULL != p->th);
p->ah = GNUNET_TRANSPORT_application_init (p->cfg);
- p->pic = GNUNET_PEERSTORE_iterate (p->ph,
- "transport",
- &p->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- p);
+ p->pic = GNUNET_PEERSTORE_iteration_start (p->ph,
+ "transport",
+ &p->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ p);
GNUNET_assert (NULL != p->pic);
return GNUNET_OK;
}
@@ -632,7 +633,7 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct
}
if (NULL != p->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
}
if (NULL != p->th)
diff --git a/src/service/transport/transport_api_cmd_start_peer.c
b/src/service/transport/transport_api_cmd_start_peer.c
index 24d731d77..311289f92 100644
--- a/src/service/transport/transport_api_cmd_start_peer.c
+++ b/src/service/transport/transport_api_cmd_start_peer.c
@@ -67,9 +67,10 @@ hello_iter_cb (void *cb_cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Our hello %s\n",
sps->hello);
- GNUNET_PEERSTORE_iterate_cancel (sps->pic);
+ GNUNET_PEERSTORE_iteration_stop (sps->pic);
sps->pic = NULL;
GNUNET_TESTING_async_finish (&sps->ac);
+ GNUNET_PEERSTORE_iteration_next (sps->pic, 1);
}
@@ -83,12 +84,12 @@ retrieve_hello (void *cls)
{
struct GNUNET_TESTING_StartPeerState *sps = cls;
sps->rh_task = NULL;
- sps->pic = GNUNET_PEERSTORE_iterate (sps->ph,
- "transport",
- &sps->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- sps);
+ sps->pic = GNUNET_PEERSTORE_iteration_start (sps->ph,
+ "transport",
+ &sps->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ sps);
}
@@ -423,12 +424,13 @@ start_peer_traits (void *cls,
GNUNET_TRANSPORT_TESTING_make_trait_application_handle ((const void *) ah),
GNUNET_TRANSPORT_TESTING_make_trait_peer_id ((const void *) id),
GNUNET_TRANSPORT_TESTING_make_trait_connected_peers_map ((const
- void *)
- connected_peers_map),
+ void *)
+
connected_peers_map),
GNUNET_TRANSPORT_TESTING_make_trait_hello ((const void *) hello),
GNUNET_TRANSPORT_TESTING_make_trait_hello_size ((const void *) hello_size),
GNUNET_TRANSPORT_TESTING_make_trait_state ((const void *) sps),
- GNUNET_TRANSPORT_TESTING_make_trait_broadcast ((const void *)
&sps->broadcast),
+ GNUNET_TRANSPORT_TESTING_make_trait_broadcast ((const
+ void *) &sps->broadcast),
GNUNET_TESTING_trait_end ()
};
diff --git a/src/service/transport/transport_api_cmd_stop_peer.c
b/src/service/transport/transport_api_cmd_stop_peer.c
index 333a3dae7..fbd2b1df3 100644
--- a/src/service/transport/transport_api_cmd_stop_peer.c
+++ b/src/service/transport/transport_api_cmd_stop_peer.c
@@ -68,7 +68,7 @@ stop_peer_run (void *cls,
if (NULL != sps->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (sps->pic);
+ GNUNET_PEERSTORE_iteration_stop (sps->pic);
}
if (NULL != sps->th)
{
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet] branch master updated (72bc94764 -> 83dda01ce),
gnunet <=