[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] 01/02: PEERSTORE: Align API with NAMESTORE iterator and monitor
From: |
gnunet |
Subject: |
[gnunet] 01/02: PEERSTORE: Align API with NAMESTORE iterator and monitor. |
Date: |
Fri, 23 Feb 2024 15:42:49 +0100 |
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
commit d8a247e343a85d0e184aeebdab9c26f85afb320c
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Fri Feb 23 15:38:49 2024 +0100
PEERSTORE: Align API with NAMESTORE iterator and monitor.
This includes renaming of PEERSTORE API calls and changing the
intended usage/contract.
It also deprecates some rather wonky API calls (e.g. *hello_notify and
hello_add and friends)
NEWS: Reworked PEERSTORE API
---
src/include/gnunet_peerstore_plugin.h | 18 +-
src/include/gnunet_peerstore_service.h | 185 ++--
src/include/gnunet_protocols.h | 29 +-
src/plugin/peerstore/plugin_peerstore_sqlite.c | 183 +++-
src/service/cadet/gnunet-service-cadet_hello.c | 57 +-
src/service/dhtu/plugin_dhtu_gnunet.c | 63 +-
src/service/fs/gnunet-service-fs_cp.c | 27 +-
.../hostlist/gnunet-daemon-hostlist_server.c | 53 +-
src/service/namestore/gnunet-service-namestore.c | 17 +-
src/service/peerstore/gnunet-service-peerstore.c | 1048 +++++++++++++++-----
src/service/peerstore/meson.build | 3 +-
src/service/peerstore/peerstore.h | 174 +++-
src/service/peerstore/peerstore_api.c | 671 +++----------
src/service/peerstore/peerstore_api_monitor.c | 297 ++++++
src/service/peerstore/peerstore_common.c | 20 +-
src/service/peerstore/peerstore_common.h | 2 +-
src/service/peerstore/perf_peerstore_store.c | 28 +-
src/service/peerstore/test_peerstore_api_iterate.c | 43 +-
src/service/peerstore/test_peerstore_api_store.c | 39 +-
src/service/peerstore/test_peerstore_api_watch.c | 47 +-
src/service/topology/gnunet-daemon-topology.c | 63 +-
src/service/transport/gnunet-communicator-tcp.c | 54 +-
src/service/transport/gnunet-service-transport.c | 219 ++--
src/service/transport/transport-testing2.c | 31 +-
.../transport/transport_api_cmd_start_peer.c | 22 +-
.../transport/transport_api_cmd_stop_peer.c | 2 +-
26 files changed, 2143 insertions(+), 1252 deletions(-)
diff --git a/src/include/gnunet_peerstore_plugin.h
b/src/include/gnunet_peerstore_plugin.h
index 2636c3009..6ecb9135a 100644
--- a/src/include/gnunet_peerstore_plugin.h
+++ b/src/include/gnunet_peerstore_plugin.h
@@ -46,6 +46,19 @@ extern "C"
#endif
#endif
+/**
+ * Function called by PEERSTORE for each matching record.
+ *
+ * @param cls closure
+ * @param seq sequence in iteration
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
+ */
+typedef void (*GNUNET_PEERSTORE_PluginProcessor) (
+ void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg);
/**
* @brief struct returned by the initialization function of the plugin
@@ -93,6 +106,7 @@ struct GNUNET_PEERSTORE_PluginFunctions
* @param sub_system name of sub system
* @param peer Peer identity (can be NULL)
* @param key entry key string (can be NULL)
+ * @param limit max number of results to give
* @param iter function to call asynchronously with the results, terminated
* by a NULL result
* @param iter_cls closure for @a iter
@@ -104,7 +118,9 @@ struct GNUNET_PEERSTORE_PluginFunctions
const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
- GNUNET_PEERSTORE_Processor iter,
+ uint64_t serial,
+ uint64_t limit,
+ GNUNET_PEERSTORE_PluginProcessor iter,
void *iter_cls);
/**
diff --git a/src/include/gnunet_peerstore_service.h
b/src/include/gnunet_peerstore_service.h
index 966abb7a6..0284d46dd 100644
--- a/src/include/gnunet_peerstore_service.h
+++ b/src/include/gnunet_peerstore_service.h
@@ -66,32 +66,32 @@ extern "C" {
* messages.
*/
#define GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME \
- "transport-backchannel-monotonic-time"
+ "transport-backchannel-monotonic-time"
/**
* Key used to store sender's monotonic time from DV learn
* messages.
*/
#define GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME \
- "transport-dv-learn-monotonic-time"
+ "transport-dv-learn-monotonic-time"
/**
* Key used to store sender's monotonic time from handshake message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE \
- "transport-tcp-communicator-handshake"
+ "transport-tcp-communicator-handshake"
/**
* Key used to store sender's monotonic time from handshake ack message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK \
- "transport-tcp-communicator-handshake-ack"
+ "transport-tcp-communicator-handshake-ack"
/**
* Key used to store sender's monotonic time from rekey message.
*/
#define GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY \
- "transport-tcp-communicator-rekey"
+ "transport-tcp-communicator-rekey"
/**
@@ -108,7 +108,12 @@ enum GNUNET_PEERSTORE_StoreOption
* Delete any previous values for the given key before
* storing the given value.
*/
- GNUNET_PEERSTORE_STOREOPTION_REPLACE = 1
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE = 1,
+
+ /**
+ * Upserts values. Replaces if expiry is later than existing.
+ */
+ GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY = 2
};
/**
@@ -161,11 +166,6 @@ struct GNUNET_PEERSTORE_Record
*/
struct GNUNET_TIME_Absolute expiry;
- /**
- * Client from which this record originated.
- * NOTE: This is internal to the service.
- */
- struct GNUNET_SERVICE_Client *client;
};
@@ -208,16 +208,6 @@ struct GNUNET_PEERSTORE_StoreHelloContext
*/
void *cont_cls;
- /**
- * Map with all store contexts started during adding hello.
- */
- struct GNUNET_CONTAINER_MultiPeerMap *store_context_map;
-
- /**
- * Active watch to be notified about conflicting hello uri add requests.
- */
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
/**
* Hello uri which was request for storing.
*/
@@ -229,9 +219,10 @@ struct GNUNET_PEERSTORE_StoreHelloContext
struct GNUNET_PeerIdentity *pid;
/**
- * Was this request successful.
+ * The store operation for the merge
*/
- int success;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+
};
/**
@@ -249,6 +240,7 @@ struct GNUNET_PEERSTORE_StoreHelloContextClosure
* Function called by PEERSTORE for each matching record.
*
* @param cls closure
+ * @param seq sequence in iteration
* @param record peerstore record information
* @param emsg error message, or NULL if no errors
*/
@@ -269,38 +261,6 @@ typedef void (*GNUNET_PEERSTORE_hello_notify_cb) (
const struct GNUNET_MessageHeader *hello,
const char *err_msg);
-/**
- * Call a method whenever our known information about peers
- * changes. Initially calls the given function for all known
- * peers and then only signals changes.
- *
- * If @a include_friend_only is set to #GNUNET_YES peerinfo will include HELLO
- * messages which are intended for friend to friend mode and which do not
- * have to be gossiped. Otherwise these messages are skipped. //FIXME Not
implemented atm!
- *
- * @param h Handle to the PEERSTORE service
- * @param include_friend_only include HELLO messages for friends only (not
used at the moment)
- * @param callback the method to call for getting the hello.
- * @param callback_cls closure for @a callback
- * @return NULL on error
- */
-struct GNUNET_PEERSTORE_NotifyContext *
-GNUNET_PEERSTORE_hello_changed_notify (struct GNUNET_PEERSTORE_Handle *h,
- int include_friend_only,
- GNUNET_PEERSTORE_hello_notify_cb
callback,
- void *callback_cls);
-
-
-/**
- * Stop notifying about changes.
- *
- * @param nc context to stop notifying
- */
-void
-GNUNET_PEERSTORE_hello_changed_notify_cancel (struct
- GNUNET_PEERSTORE_NotifyContext
*nc);
-
-
/**
* Add hello to peerstore.
*
@@ -385,65 +345,130 @@ GNUNET_PEERSTORE_store_cancel (struct
GNUNET_PEERSTORE_StoreContext *sc);
/**
- * Iterate over records matching supplied key information
+ * Iterate over peerstore entries.
+ * The iteration can be filtered to contain only records
+ * matching @a peer and/or @a key.
+ * The @a sub_system to match must be provided.
+ * @a callback will be called with (each) matching record.
+ * #GNUNET_PEERSTORE_iteration_next() must be invoked
+ * to continue processing until the end of the iteration is
+ * reached.
*
* @param h handle to the PEERSTORE service
* @param sub_system name of sub system
* @param peer Peer identity (can be NULL)
* @param key entry key string (can be NULL)
- * @param callback function called with each matching record, all NULL's on end
+ * @param callback function called with each matching record. The record will
be NULL to indicate end.
* @param callback_cls closure for @a callback
* @return Handle to iteration request
*/
struct GNUNET_PEERSTORE_IterateContext *
-GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls);
+GNUNET_PEERSTORE_iteration_start (struct GNUNET_PEERSTORE_Handle *h,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls);
/**
- * Cancel an iterate request
- * Please do not call after the iterate request is done
+ * Continue an iteration.
+ * Do NOT call after the iterate request is done.
*
- * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
+ * @param ic Iterate request context as returned by
#GNUNET_PEERSTORE_iteration_start()
+ * @param limit how many records to return max until
#GNUNET_PEERSTORE_iteration_next() needs to be called again.
*/
void
-GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic);
+GNUNET_PEERSTORE_iteration_next (struct GNUNET_PEERSTORE_IterateContext *ic,
+ uint64_t limit);
+
+/**
+ * Cancel an iteration.
+ * Do NOT call after the iterate request is done
+ *
+ * @param ic Iterate request context as returned by
#GNUNET_PEERSTORE_iteration_start()
+ */
+void
+GNUNET_PEERSTORE_iteration_stop (struct GNUNET_PEERSTORE_IterateContext *ic);
/**
* Request watching a given key
- * User will be notified with any new values added to key,
- * all existing entries are supplied beforehand.
+ * The monitoring can be filtered to contain only records
+ * matching @a peer and/or @a key.
+ * The @a sub_system to match must be provided.
+ * @a callback will be called with (each) matching new record.
+ * #GNUNET_PEERSTORE_monitor_next() must be invoked
+ * to continue processing until @a sync_cb is
+ * called, indicating that the caller should be up-to-date.
+ * The caller will be notified with any new values added to key
+ * through @a callback.
+ * If @a iterate_first is set to GNUNET_YES, the monitor will first
+ * iterate over all existing, matching records. In any case,
+ * after @a sync_cb is called the first time monitoring starts.
*
* @param h handle to the PEERSTORE service
+ * @param iterate_first first iterate over all existing results if #GNUNET_YES
* @param sub_system name of sub system
* @param peer Peer identity
* @param key entry key string
+ * @param error_cb function to call on error (i.e. disconnect); note that
+ * unlike the other error callbacks in this API, a call to this
+ * function does NOT destroy the monitor handle, it merely signals
+ * that monitoring is down. You need to still explicitly call
+ * #GNUNET_PEERSTORE_monitor_stop().
+ * @param error_cb_cls closure for @a error_cb
+ * @param sync_cb function called when we're in sync with the peerstore
+ * @param sync_cb_cls closure for @a sync_cb
* @param callback function called with each new value
* @param callback_cls closure for @a callback
* @return Handle to watch request
*/
-struct GNUNET_PEERSTORE_WatchContext *
-GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls);
-
+struct GNUNET_PEERSTORE_Monitor *
+GNUNET_PEERSTORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ int iterate_first,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_SCHEDULER_TaskCallback error_cb,
+ void *error_cb_cls,
+ GNUNET_SCHEDULER_TaskCallback sync_cb,
+ void *sync_cb_cls,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls);
/**
- * Cancel a watch request
+ * Calls the monitor processor specified in #GNUNET_PEERSTORE_monitor_start
+ * for the next record(s). This function is used to allow clients that merely
+ * monitor the PEERSTORE to still throttle peerstore operations, so we can be
+ * sure that the monitors can keep up.
*
- * @param wc handle to the watch request
+ * Note that #GNUNET_PEERSTORE_store() only waits for this
+ * call if the previous limit set by the client was already reached.
+ * Thus, by using a @a limit greater than 1, monitors basically enable
+ * a queue of notifications to be processed asynchronously with some
+ * delay. Note that even with a limit of 1 the
+ * #GNUNET_PEERSTORE_store() function will run asynchronously
+ * and the continuation may be invoked before the monitors completed
+ * (or even started) processing the notification. Thus, monitors will
+ * only closely track the current state of the peerstore, but not
+ * be involved in the transactions.
+ *
+ * @param zm the monitor
+ * @param limit number of records to return to the iterator in one shot
+ * (before #GNUNET_PEERSTORE_monitor_next is to be called again)
*/
void
-GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc);
+GNUNET_PEERSTORE_monitor_next (struct GNUNET_PEERSTORE_Monitor *zm,
+ uint64_t limit);
+/**
+ * Stop monitoring.
+ *
+ * @param zm handle to the monitor activity to stop
+ */
+void
+GNUNET_PEERSTORE_monitor_stop (struct GNUNET_PEERSTORE_Monitor *zm);
#if 0 /* keep Emacsens' auto-indent happy */
{
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 42524d490..8638703db 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2599,14 +2599,14 @@ extern "C" {
#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE 820
/**
- * Iteration request
+ * Iteration request (see also 828, 829)
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE 821
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START 821
/**
- * Iteration record message
+ * Record result message
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD 822
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD 822
/**
* Iteration end message
@@ -2614,25 +2614,34 @@ extern "C" {
#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END 823
/**
- * Watch request
+ * Monitor request
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH 824
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START 824
/**
- * Watch response
+ * Monitor sync
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD 825
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC 825
/**
- * Watch cancel request
+ * Monitor next request
*/
-#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL 826
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT 826
/**
* Store result message
*/
#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT 827
+/**
+ * Iteration next request (see also 821, 829)
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT 828
+
+/**
+ * Iteration stop request (see also 821, 828)
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_STOP 829
/*******************************************************************************
* SOCIAL message types
diff --git a/src/plugin/peerstore/plugin_peerstore_sqlite.c
b/src/plugin/peerstore/plugin_peerstore_sqlite.c
index 7d06d2c63..373315bab 100644
--- a/src/plugin/peerstore/plugin_peerstore_sqlite.c
+++ b/src/plugin/peerstore/plugin_peerstore_sqlite.c
@@ -105,6 +105,11 @@ struct Plugin
*/
sqlite3_stmt *select_peerstoredata_by_all;
+ /**
+ * Precompiled SQL for updating a peerstoredata record if the new expiry is later
+ */
+ sqlite3_stmt *upsert_peerstoredata_later_expiry;
+
/**
* Precompiled SQL for deleting expired
* records from peerstoredata
@@ -243,13 +248,17 @@ peerstore_sqlite_iterate_records (void *cls,
const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
- GNUNET_PEERSTORE_Processor iter,
+ uint64_t serial,
+ uint64_t limit,
+ GNUNET_PEERSTORE_PluginProcessor iter,
void *iter_cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt;
int err = 0;
int sret;
+ int ret;
+ uint64_t seq;
struct GNUNET_PEERSTORE_Record rec;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -260,6 +269,8 @@ peerstore_sqlite_iterate_records (void *cls,
{
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -272,6 +283,8 @@ peerstore_sqlite_iterate_records (void *cls,
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -287,6 +300,8 @@ peerstore_sqlite_iterate_records (void *cls,
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -300,6 +315,8 @@ peerstore_sqlite_iterate_records (void *cls,
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_uint64 (&serial),
+ GNUNET_SQ_query_param_uint64 (&limit),
GNUNET_SQ_query_param_end
};
@@ -320,46 +337,56 @@ peerstore_sqlite_iterate_records (void *cls,
}
err = 0;
- while (SQLITE_ROW == (sret = sqlite3_step (stmt)))
+ ret = GNUNET_OK;
+ for (uint64_t i = 0; i < limit; i++)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Returning a matched record.\n");
- struct GNUNET_SQ_ResultSpec rs[] = {
- GNUNET_SQ_result_spec_string (&rec.sub_system),
- GNUNET_SQ_result_spec_auto_from_type (&rec.peer),
- GNUNET_SQ_result_spec_string (&rec.key),
- GNUNET_SQ_result_spec_variable_size (&rec.value, &rec.value_size),
- GNUNET_SQ_result_spec_absolute_time (&rec.expiry),
- GNUNET_SQ_result_spec_end
- };
-
- if (GNUNET_OK !=
- GNUNET_SQ_extract_result (stmt,
- rs))
+ sret = sqlite3_step (stmt);
+ if (SQLITE_DONE == sret)
{
- GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iteration done (no results)\n");
+ ret = GNUNET_NO;
break;
}
- if (NULL != iter)
- iter (iter_cls,
- &rec,
- NULL);
- GNUNET_SQ_cleanup_result (rs);
- }
- if (SQLITE_DONE != sret)
- {
- LOG_SQLITE (plugin,
- GNUNET_ERROR_TYPE_ERROR,
- "sqlite_step");
- err = 1;
+ if (SQLITE_ROW != sret)
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR,
+ "sqlite_step");
+ ret = GNUNET_SYSERR;
+ break;
+ }
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Returning a matched record.\n");
+ struct GNUNET_SQ_ResultSpec rs[] = {
+ GNUNET_SQ_result_spec_uint64 (&seq),
+ GNUNET_SQ_result_spec_string (&rec.sub_system),
+ GNUNET_SQ_result_spec_auto_from_type (&rec.peer),
+ GNUNET_SQ_result_spec_string (&rec.key),
+ GNUNET_SQ_result_spec_variable_size (&rec.value, &rec.value_size),
+ GNUNET_SQ_result_spec_absolute_time (&rec.expiry),
+ GNUNET_SQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_SQ_extract_result (stmt,
+ rs))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ if (NULL != iter)
+ iter (iter_cls,
+ seq,
+ &rec,
+ NULL);
+ GNUNET_SQ_cleanup_result (rs);
+ }
}
GNUNET_SQ_reset (plugin->dbh,
stmt);
- if (NULL != iter)
- iter (iter_cls,
- NULL,
- err ? "sqlite error" : NULL);
- return GNUNET_OK;
+ return ret;
}
@@ -393,7 +420,7 @@ peerstore_sqlite_store_record (void *cls,
void *cont_cls)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->insert_peerstoredata;
+ sqlite3_stmt *stmt;
struct GNUNET_SQ_QueryParam params[] = {
GNUNET_SQ_query_param_string (sub_system),
GNUNET_SQ_query_param_auto_from_type (peer),
@@ -410,13 +437,43 @@ peerstore_sqlite_store_record (void *cls,
peer,
key);
}
- if (GNUNET_OK !=
- GNUNET_SQ_bind (stmt,
- params))
- LOG_SQLITE (plugin,
- GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_bind");
- else if (SQLITE_DONE != sqlite3_step (stmt))
+
+ if (GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY == options)
+ {
+ struct GNUNET_SQ_QueryParam params_upsert[] = {
+ GNUNET_SQ_query_param_fixed_size (value, size),
+ GNUNET_SQ_query_param_absolute_time (&expiry),
+ GNUNET_SQ_query_param_string (sub_system),
+ GNUNET_SQ_query_param_auto_from_type (peer),
+ GNUNET_SQ_query_param_string (key),
+ GNUNET_SQ_query_param_absolute_time (&expiry),
+ GNUNET_SQ_query_param_end
+ };
+ stmt = plugin->upsert_peerstoredata_later_expiry;
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (stmt,
+ params_upsert))
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ GNUNET_assert (0);
+ }
+ }
+ else
+ {
+ stmt = plugin->insert_peerstoredata;
+ if (GNUNET_OK !=
+ GNUNET_SQ_bind (stmt,
+ params))
+ {
+ LOG_SQLITE (plugin,
+ GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ GNUNET_assert (0);
+ }
+ }
+ if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin,
GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
@@ -558,6 +615,7 @@ database_setup (struct Plugin *plugin)
/* Create tables */
sql_exec (plugin->dbh,
"CREATE TABLE IF NOT EXISTS peerstoredata (\n"
+ " uid INTEGER PRIMARY KEY,"
" sub_system TEXT NOT NULL,\n"
" peer_id BLOB NOT NULL,\n"
" key TEXT NOT NULL,\n"
@@ -566,7 +624,7 @@ database_setup (struct Plugin *plugin)
/* Create Indices */
if (SQLITE_OK !=
sqlite3_exec (plugin->dbh,
- "CREATE INDEX IF NOT EXISTS peerstoredata_key_index ON
peerstoredata (sub_system, peer_id, key)",
+ "CREATE INDEX IF NOT EXISTS peerstoredata_key_index ON
peerstoredata (sub_system, peer_id, key, uid)",
NULL,
NULL,
NULL))
@@ -583,23 +641,46 @@ database_setup (struct Plugin *plugin)
" VALUES (?,?,?,?,?);",
&plugin->insert_peerstoredata);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
- " WHERE sub_system = ?",
+ "UPDATE peerstoredata"
+ " SET"
+ " value = ?,"
+ " expiry = ?"
+ " WHERE sub_system = ?"
+ " AND peer_id = ?"
+ " AND key = ?"
+ " AND expiry < ?",
+ &plugin->upsert_peerstoredata_later_expiry);
+ sql_prepare (plugin->dbh,
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
+ " WHERE sub_system = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND peer_id = ?",
+ " AND peer_id = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_pid);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND key = ?",
+ " AND key = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_key);
sql_prepare (plugin->dbh,
- "SELECT sub_system,peer_id,key,value,expiry FROM peerstoredata"
+ "SELECT uid,sub_system,peer_id,key,value,expiry FROM
peerstoredata"
" WHERE sub_system = ?"
- " AND peer_id = ?" " AND key = ?",
+ " AND peer_id = ?"
+ " AND key = ?"
+ " AND uid > ?"
+ " ORDER BY uid ASC"
+ " LIMIT ?",
&plugin->select_peerstoredata_by_all);
sql_prepare (plugin->dbh,
"DELETE FROM peerstoredata"
diff --git a/src/service/cadet/gnunet-service-cadet_hello.c
b/src/service/cadet/gnunet-service-cadet_hello.c
index ac84fab1e..cc28f0fde 100644
--- a/src/service/cadet/gnunet-service-cadet_hello.c
+++ b/src/service/cadet/gnunet-service-cadet_hello.c
@@ -24,6 +24,7 @@
* @author Bartlomiej Polot
* @author Christian Grothoff
*/
+#include "gnunet_common.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
@@ -50,7 +51,7 @@ static struct GNUNET_PEERSTORE_Handle *peerstore;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
@@ -63,17 +64,20 @@ static struct GNUNET_PEERSTORE_NotifyContext
*peerstore_notify;
*/
static void
got_hello (void *cls,
- const struct GNUNET_PeerIdentity *id,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
struct CadetPeer *peer;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
- if ((NULL == id) ||
- (NULL == hello))
+ if (NULL == record->value)
+ {
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
- if (0 == GNUNET_memcmp (id,
+ }
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer,
&my_full_id))
{
GNUNET_free (mine);
@@ -83,19 +87,37 @@ got_hello (void *cls,
GNUNET_TIME_UNIT_ZERO);
GNUNET_HELLO_builder_free (builder);
GCD_hello_update ();
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Hello for %s (%d bytes), expires on %s\n",
- GNUNET_i2s (id),
+ GNUNET_i2s (&record->peer),
ntohs (hello->size),
GNUNET_STRINGS_absolute_time_to_string (
GNUNET_HELLO_builder_get_expiration_time (hello)));
- peer = GCP_get (id,
+ peer = GCP_get (&record->peer,
GNUNET_YES);
GCP_set_hello (peer,
hello);
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
+}
+
+
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
}
@@ -110,8 +132,17 @@ GCH_init (const struct GNUNET_CONFIGURATION_Handle *c)
GNUNET_assert (NULL == peerstore_notify);
peerstore = GNUNET_PEERSTORE_connect (c);
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (peerstore, GNUNET_NO, &got_hello,
- NULL);
+ GNUNET_PEERSTORE_monitor_start (c,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &got_hello,
+ NULL);
}
@@ -123,7 +154,7 @@ GCH_shutdown ()
{
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
if (NULL != peerstore)
@@ -153,8 +184,8 @@ GCH_get_mine (void)
{
builder = GNUNET_HELLO_builder_new (&my_full_id);
mine = GNUNET_HELLO_builder_to_dht_hello_msg (builder,
- my_private_key,
- GNUNET_TIME_UNIT_ZERO);
+ my_private_key,
+ GNUNET_TIME_UNIT_ZERO);
GNUNET_HELLO_builder_free (builder);
}
return mine;
diff --git a/src/service/dhtu/plugin_dhtu_gnunet.c
b/src/service/dhtu/plugin_dhtu_gnunet.c
index 7c02fdd15..826c33527 100644
--- a/src/service/dhtu/plugin_dhtu_gnunet.c
+++ b/src/service/dhtu/plugin_dhtu_gnunet.c
@@ -161,7 +161,7 @@ struct Plugin
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
- struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+ struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Identity of this peer.
@@ -176,7 +176,7 @@ struct Plugin
};
-//#include "../peerinfo-tool/gnunet-peerinfo_plugins.c"
+// #include "../peerinfo-tool/gnunet-peerinfo_plugins.c"
/**
@@ -198,7 +198,7 @@ gnunet_try_connect (void *cls,
int pfx_len;
eou = strstr (address,
- "://");
+ "://");
if (NULL == eou)
{
GNUNET_break (0);
@@ -379,7 +379,7 @@ add_addr (void *cls,
{
struct Plugin *plugin = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG ,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"peerinfo_cb addr %s\n",
addr);
plugin->env->address_add_cb (plugin->env->cls,
@@ -403,26 +403,44 @@ add_addr (void *cls,
*/
static void
peerinfo_cb (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
struct Plugin *plugin = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
+ hello = record->value;
if (NULL == hello)
return;
- if (NULL == peer)
- return;
if (0 !=
- GNUNET_memcmp (peer,
+ GNUNET_memcmp (&record->peer,
&plugin->my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (plugin->peerstore_notify, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
add_addr,
plugin);
GNUNET_HELLO_builder_free (builder);
+ GNUNET_PEERSTORE_monitor_next (plugin->peerstore_notify, 1);
+}
+
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
}
@@ -439,17 +457,23 @@ peerinfo_cb (void *cls,
* @param my_identity ID of this peer, NULL if we failed
*/
static void
- core_init_cb (void *cls,
+core_init_cb (void *cls,
const struct GNUNET_PeerIdentity *my_identity)
{
struct Plugin *plugin = cls;
plugin->my_identity = *my_identity;
- plugin->peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (plugin->peerstore,
- GNUNET_NO,
- &peerinfo_cb,
- plugin);
+ plugin->peerstore_notify = GNUNET_PEERSTORE_monitor_start (plugin->env->cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+
GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &peerinfo_cb,
+ plugin);
}
@@ -538,10 +562,10 @@ libgnunet_plugin_dhtu_gnunet_done (void *cls)
if (NULL != plugin->transport)
GNUNET_TRANSPORT_application_done (plugin->transport);
if (NULL != plugin->peerstore_notify)
- GNUNET_PEERSTORE_hello_changed_notify_cancel (plugin->peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (plugin->peerstore_notify);
if (NULL != plugin->peerstore)
GNUNET_PEERSTORE_disconnect (plugin->peerstore);
- //GPI_plugins_unload ();
+ // GPI_plugins_unload ();
GNUNET_free (plugin->my_priv);
GNUNET_free (plugin);
GNUNET_free (api);
@@ -570,7 +594,8 @@ libgnunet_plugin_dhtu_gnunet_init (void *cls)
};
plugin = GNUNET_new (struct Plugin);
- plugin->my_priv = GNUNET_CRYPTO_eddsa_key_create_from_configuration
(env->cfg);
+ plugin->my_priv = GNUNET_CRYPTO_eddsa_key_create_from_configuration (
+ env->cfg);
plugin->env = env;
api = GNUNET_new (struct GNUNET_DHTU_PluginFunctions);
api->cls = plugin;
@@ -598,6 +623,6 @@ libgnunet_plugin_dhtu_gnunet_init (void *cls)
libgnunet_plugin_dhtu_gnunet_done (plugin);
return NULL;
}
- //GPI_plugins_load (env->cfg);
+ // GPI_plugins_load (env->cfg);
return api;
}
diff --git a/src/service/fs/gnunet-service-fs_cp.c
b/src/service/fs/gnunet-service-fs_cp.c
index b2b73d0a5..18a25a030 100644
--- a/src/service/fs/gnunet-service-fs_cp.c
+++ b/src/service/fs/gnunet-service-fs_cp.c
@@ -44,7 +44,7 @@
* How often do we flush respect values to disk?
*/
#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
- GNUNET_TIME_UNIT_MINUTES, 5)
+ GNUNET_TIME_UNIT_MINUTES, 5)
/**
* After how long do we discard a reply?
@@ -394,6 +394,11 @@ peer_respect_cb (void *cls,
struct GSF_ConnectedPeer *cp = cls;
GNUNET_assert (NULL != cp->respect_iterate_req);
+ if ((NULL == record) && (NULL == emsg))
+ {
+ cp->respect_iterate_req = NULL;
+ return;
+ }
if ((NULL != record) &&
(sizeof(cp->disk_respect) == record->value_size))
{
@@ -402,8 +407,12 @@ peer_respect_cb (void *cls,
}
GSF_push_start_ (cp);
if (NULL != record)
- GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
+ {
+ GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
+ return;
+ }
cp->respect_iterate_req = NULL;
+ GNUNET_PEERSTORE_iteration_next (cp->respect_iterate_req, 1);
}
@@ -476,12 +485,12 @@ GSF_peer_connect_handler (void *cls,
GNUNET_CONTAINER_multipeermap_size (cp_map),
GNUNET_NO);
cp->respect_iterate_req
- = GNUNET_PEERSTORE_iterate (peerstore,
- "fs",
- peer,
- "respect",
- &peer_respect_cb,
- cp);
+ = GNUNET_PEERSTORE_iteration_start (peerstore,
+ "fs",
+ peer,
+ "respect",
+ &peer_respect_cb,
+ cp);
GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
cp);
return cp;
@@ -1370,7 +1379,7 @@ GSF_peer_disconnect_handler (void *cls,
GNUNET_NO);
if (NULL != cp->respect_iterate_req)
{
- GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
+ GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
cp->respect_iterate_req = NULL;
}
GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
diff --git a/src/service/hostlist/gnunet-daemon-hostlist_server.c
b/src/service/hostlist/gnunet-daemon-hostlist_server.c
index d2ef8dd7a..f243deb00 100644
--- a/src/service/hostlist/gnunet-daemon-hostlist_server.c
+++ b/src/service/hostlist/gnunet-daemon-hostlist_server.c
@@ -25,6 +25,7 @@
* @author David Barksdale
* @brief application to provide an integrated hostlist HTTP server
*/
+#include "gnunet_common.h"
#include "platform.h"
#include <microhttpd.h>
#include "gnunet-daemon-hostlist_server.h"
@@ -40,7 +41,7 @@
* time out?
*/
#define GNUNET_ADV_TIMEOUT \
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
/**
* Map with hellos we build the hostlist with.
@@ -82,7 +83,7 @@ struct GNUNET_SCHEDULER_Task *peerstore_notify_task;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Our primary task for IPv4.
@@ -464,13 +465,13 @@ connect_handler (void *cls,
*/
static void
process_notify (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
unsigned int map_size;
struct GNUNET_MessageHeader *hello_cpy;
struct GNUNET_PeerIdentity *peer_cpy;
+ struct GNUNET_MessageHeader *hello;
map_size = GNUNET_CONTAINER_multipeermap_size (hellos);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -483,6 +484,7 @@ process_notify (void *cls,
err_msg);
return;
}
+ hello = record->value;
if (NULL != builder)
{
GNUNET_free (builder->data);
@@ -495,7 +497,7 @@ process_notify (void *cls,
}
peer_cpy = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
- GNUNET_memcpy (peer_cpy, peer, sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (peer_cpy, &record->peer, sizeof (struct GNUNET_PeerIdentity));
hello_cpy = GNUNET_malloc (ntohs (hello->size));
GNUNET_memcpy (hello_cpy, hello, ntohs (hello->size));
GNUNET_assert (GNUNET_YES ==
@@ -513,7 +515,8 @@ process_notify (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"1 Peerstore is notifying us to rebuild our hostlist map size %u
peer %s\n",
map_size,
- GNUNET_i2s (peer));
+ GNUNET_i2s (&record->peer));
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
}
@@ -594,6 +597,22 @@ prepare_daemon (struct MHD_Daemon *daemon_handle)
}
+/**
+ * Error callback handed to GNUNET_PEERSTORE_monitor_start() in
+ * #start_notify(). It only logs a warning.
+ *
+ * @param cls closure, registered as NULL and not used
+ */
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+/**
+ * Sync callback handed to GNUNET_PEERSTORE_monitor_start() in
+ * #start_notify(). Reaching it is the normal course of events, so it is
+ * logged at debug level rather than as a warning.
+ *
+ * @param cls closure, registered as NULL and not used
+ */
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
static void
start_notify (void *cls)
{
@@ -601,9 +620,16 @@ start_notify (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting to process new hellos to add to hostlist.\n");
- peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (peerstore, GNUNET_NO,
- &process_notify, NULL);
+ peerstore_notify = GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+
GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &process_notify, NULL);
}
@@ -820,9 +846,10 @@ GNUNET_HOSTLIST_server_start (const struct
GNUNET_CONFIGURATION_Handle *c,
hostlist_task_v4 = prepare_daemon (daemon_handle_v4);
if (NULL != daemon_handle_v6)
hostlist_task_v6 = prepare_daemon (daemon_handle_v6);
- peerstore_notify_task = GNUNET_SCHEDULER_add_delayed
(GNUNET_TIME_UNIT_MINUTES,
- start_notify,
- NULL);
+ peerstore_notify_task = GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_UNIT_MINUTES,
+ start_notify,
+ NULL);
return GNUNET_OK;
}
@@ -861,7 +888,7 @@ GNUNET_HOSTLIST_server_stop ()
}
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
else if (NULL != peerstore_notify_task)
diff --git a/src/service/namestore/gnunet-service-namestore.c
b/src/service/namestore/gnunet-service-namestore.c
index b159ba769..f29058d0e 100644
--- a/src/service/namestore/gnunet-service-namestore.c
+++ b/src/service/namestore/gnunet-service-namestore.c
@@ -109,16 +109,10 @@ struct ZoneIteration
*/
uint32_t offset;
- /**
- * Number of pending cache operations triggered by this zone iteration which
we
- * need to wait for before allowing the client to continue.
- */
- unsigned int cache_ops;
-
/**
* Set to #GNUNET_YES if the last iteration exhausted the limit set by the
* client and we should send the
#GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT_END
- * message and free the data structure once @e cache_ops is zero.
+ * message and free the data structure.
*/
int send_end;
};
@@ -1360,7 +1354,8 @@ handle_edit_record_set (void *cls, const struct
EditRecordSetMessage *er_msg)
env =
GNUNET_MQ_msg_extra (rer_msg,
rlc.rd_ser_len + old_editor_hint_len,
-
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_RESPONSE);
+
GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_SET_EDIT_RESPONSE)
+ ;
rer_msg->editor_hint_len = htons (old_editor_hint_len);
rer_msg->gns_header.r_id = er_msg->gns_header.r_id;
rer_msg->rd_count = htons (rlc.res_rd_count);
@@ -1859,7 +1854,8 @@ store_record_set (struct NamestoreClient *nc,
lctx.exp.abs_value_us;
rd_nf[rd_nf_count].data = NULL;
rd_nf[rd_nf_count].data_size = 0;
- rd_nf[rd_nf_count].flags = GNUNET_GNSRECORD_RF_PRIVATE |
GNUNET_GNSRECORD_RF_MAINTENANCE;
+ rd_nf[rd_nf_count].flags = GNUNET_GNSRECORD_RF_PRIVATE
+ | GNUNET_GNSRECORD_RF_MAINTENANCE;
rd_nf_count++;
}
if ((0 == strcmp (GNUNET_GNS_EMPTY_LABEL_AT, conv_name)) &&
@@ -2285,8 +2281,7 @@ run_zone_iteration_round (struct ZoneIteration *zi,
uint64_t limit)
"Returned %llu results, more results available\n",
(unsigned long long) limit);
zi->send_end = (0 != proc.limit);
- if (0 == zi->cache_ops)
- zone_iteration_done_client_continue (zi);
+ zone_iteration_done_client_continue (zi);
}
diff --git a/src/service/peerstore/gnunet-service-peerstore.c
b/src/service/peerstore/gnunet-service-peerstore.c
index 90b4e8d88..ee9847164 100644
--- a/src/service/peerstore/gnunet-service-peerstore.c
+++ b/src/service/peerstore/gnunet-service-peerstore.c
@@ -23,6 +23,8 @@
* @brief peerstore service implementation
* @author Omar Tarabai
*/
+#include "gnunet_peerstore_service.h"
+#include "gnunet_protocols.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "peerstore.h"
@@ -36,6 +38,214 @@
*/
#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
+
+/**
+ * A peerstore client
+ */
+struct PeerstoreClient;
+
+/**
+ * A peerstore monitor: one per client that sent a monitor start request.
+ * All monitors are kept in the global monitors DLL.
+ */
+struct Monitor
+{
+ /**
+ * Next element in the DLL
+ */
+ struct Monitor *next;
+
+ /**
+ * Previous element in the DLL
+ */
+ struct Monitor *prev;
+
+ /**
+ * Peerstore client which initiated this monitor
+ */
+ struct PeerstoreClient *pc;
+
+ /**
+ * Task active during initial iteration.
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * Task to warn about slow monitors.
+ */
+ struct GNUNET_SCHEDULER_Task *sa_wait_warning;
+
+ /**
+ * Since when are we blocked on this monitor?
+ */
+ struct GNUNET_TIME_Absolute sa_waiting_start;
+
+ /**
+ * Last sequence number seen in the initial iteration, used to address
+ * the next result of the iteration in the store
+ *
+ * Initially set to 0.
+ * Updated in #monitor_iterate_cb()
+ */
+ uint64_t seq;
+
+ /**
+ * Current limit of how many more messages we are allowed
+ * to queue to this monitor.
+ */
+ uint64_t limit;
+
+ /**
+ * How many more requests may we receive from the iterator
+ * before it is at the limit we gave it? Will be below or
+ * equal to @e limit. The effective limit for monitor
+ * events is thus @e limit - @e iteration_cnt!
+ */
+ uint64_t iteration_cnt;
+
+ /**
+ * Are we (still) in the initial iteration pass?
+ */
+ int in_first_iteration;
+
+ /**
+ * Is the peer set? If #GNUNET_YES, only records of @e peer match.
+ */
+ int peer_set;
+
+ /**
+ * Responsible sub system string, NULL to match any sub system
+ */
+ char *sub_system;
+
+ /**
+ * Peer Identity, only meaningful if @e peer_set is #GNUNET_YES
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Record key string, NULL to match any key
+ */
+ char *key;
+};
+
+/**
+ * A peerstore iteration operation, kept in the DLL of the client
+ * that started it.
+ */
+struct Iteration
+{
+ /**
+ * Next element in the DLL
+ */
+ struct Iteration *next;
+
+ /**
+ * Previous element in the DLL
+ */
+ struct Iteration *prev;
+
+ /**
+ * Peerstore client which initiated this iteration
+ */
+ struct PeerstoreClient *pc;
+
+ /**
+ * Last sequence number in the iteration used to address next
+ * result of the iteration in the store
+ *
+ * Initially set to 0.
+ * Updated in #iterate_proc()
+ */
+ uint64_t seq;
+
+ /**
+ * The operation id for the iteration in the response for the client
+ */
+ uint32_t request_id;
+
+ /**
+ * Offset of the iteration used to address next result of the
+ * iteration in the store
+ *
+ * Set to 0 in #handle_iterate_start().
+ * NOTE(review): no other write to this field is visible in this part
+ * of the patch -- confirm whether it is still needed.
+ */
+ uint32_t offset;
+
+ /**
+ * Set to #GNUNET_YES if the last iteration round did not exhaust the
+ * limit set by the client (no more records) and we should send the
+ * #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END
+ * message and free the data structure.
+ */
+ int send_end;
+
+ /**
+ * Responsible sub system string
+ */
+ char *sub_system;
+
+ /**
+ * Peer Identity, only meaningful if @e peer_set is #GNUNET_YES
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Record key string, NULL if the request carried no key
+ */
+ char *key;
+
+ /**
+ * Peer is set?
+ */
+ int peer_set;
+};
+
+/**
+ * A peerstore client
+ */
+struct PeerstoreClient
+{
+ /**
+ * The client
+ */
+ struct GNUNET_SERVICE_Client *client;
+
+ /**
+ * Message queue for transmission to @e client
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Head of the DLL of
+ * iteration operations in progress initiated by this client
+ */
+ struct Iteration *op_head;
+
+ /**
+ * Tail of the DLL of
+ * iteration operations in progress initiated by this client
+ */
+ struct Iteration *op_tail;
+};
+
+
+/**
+ * Context of a pending store request; allocated in #handle_store() and
+ * released in #store_record_continuation().
+ */
+struct StoreRecordContext
+{
+ /**
+ * The record that was stored.
+ */
+ struct GNUNET_PEERSTORE_Record *record;
+
+ /**
+ * The request ID, copied as-is from the store message
+ */
+ uint32_t rid;
+
+ /**
+ * The client
+ */
+ struct PeerstoreClient *pc;
+};
+
/**
* Our configuration.
*/
@@ -51,26 +261,25 @@ static char *db_lib_name;
*/
static struct GNUNET_PEERSTORE_PluginFunctions *db;
-/**
- * Hashmap with all watch requests
- */
-static struct GNUNET_CONTAINER_MultiHashMap *watchers;
-
/**
* Task run to clean up expired records.
*/
static struct GNUNET_SCHEDULER_Task *expire_task;
/**
- * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO
+ * Monitor DLL
*/
-static int in_shutdown;
+static struct Monitor *monitors_head;
/**
- * Number of connected clients.
+ * Monitor DLL
*/
-static unsigned int num_clients;
+static struct Monitor *monitors_tail;
+/**
+ * Notification context shared by all monitors.
+ */
+static struct GNUNET_NotificationContext *monitor_nc;
/**
* Perform the actual shutdown operations
@@ -86,53 +295,20 @@ do_shutdown ()
GNUNET_free (db_lib_name);
db_lib_name = NULL;
}
- if (NULL != watchers)
- {
- GNUNET_CONTAINER_multihashmap_destroy (watchers);
- watchers = NULL;
- }
if (NULL != expire_task)
{
GNUNET_SCHEDULER_cancel (expire_task);
expire_task = NULL;
}
+ if (NULL != monitor_nc)
+ {
+ GNUNET_notification_context_destroy (monitor_nc);
+ monitor_nc = NULL;
+ }
GNUNET_SCHEDULER_shutdown ();
}
-struct IterationContext
-{
- /**
- * The record that was stored.
- */
- struct GNUNET_PEERSTORE_Record *record;
-
- /**
- * The request ID
- */
- uint32_t rid;
-
-};
-
-struct StoreRecordContext
-{
- /**
- * The record that was stored.
- */
- struct GNUNET_PEERSTORE_Record *record;
-
- /**
- * The request ID
- */
- uint32_t rid;
-
- /**
- * The client
- */
- struct GNUNET_SERVICE_Client *client;
-};
-
-
/**
* Task run during shutdown.
*
@@ -143,9 +319,7 @@ shutdown_task (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Priming PEERSTORE for shutdown.\n");
- in_shutdown = GNUNET_YES;
- if (0 == num_clients) /* Only when no connected clients. */
- do_shutdown ();
+ do_shutdown ();
}
@@ -201,114 +375,123 @@ expire_records_continuation (void *cls, int success)
/**
- * A client disconnected. Remove all of its data structure entries.
+ * Send 'sync' message to the monitor, we're now in sync.
*
- * @param cls closure, NULL
- * @param client identification of the client
- * @param mq the message queue
- * @return
+ * @param mc monitor that is now in sync
*/
-static void *
-client_connect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- struct GNUNET_MQ_Handle *mq)
+static void
+monitor_sync (struct Monitor *mc)
{
- num_clients++;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *sync;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A client connected (now %u)\n", num_clients);
- return client;
+ "Synching zone monitor %p\n", mc);
+
+ env = GNUNET_MQ_msg (sync, GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC);
+ GNUNET_MQ_send (mc->pc->mq, env);
+ /* mark iteration done */
+ mc->in_first_iteration = GNUNET_NO;
+ mc->iteration_cnt = 0;
}
/**
- * Search for a disconnected client and remove it
+ * Given a new record, notifies watchers
*
- * @param cls closuer, a `struct GNUNET_SERVICE_Client`
- * @param key hash of record key
- * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
- * @return #GNUNET_OK to continue iterating
+ * @param record changed record to update watchers with
*/
-static int
-client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void
*value)
+static void
+watch_notifier (struct GNUNET_PEERSTORE_Record *record)
{
- if (value == cls)
+ struct GNUNET_MQ_Envelope *env;
+ struct Monitor *mc;
+
+ // FIXME this is very inefficient, we may want to use a hash
+ // map again.
+ for (mc = monitors_head; NULL != mc; mc = mc->next)
{
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (watchers, key,
value));
- num_clients++; /* Watchers do not count */
+ if ((GNUNET_YES == mc->peer_set) &&
+ (0 != memcmp (&mc->peer, &record->peer, sizeof (record->peer))))
+ continue;
+ if ((NULL != mc->sub_system) &&
+ (0 != strcmp (mc->sub_system, record->sub_system)))
+ continue;
+ if ((NULL != mc->key) &&
+ (0 != strcmp (mc->key, record->key)))
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found watcher %p to update.\n", mc);
+ env = PEERSTORE_create_record_mq_envelope (
+ 0,
+ record->sub_system,
+ &record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ 0,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (mc->pc->mq, env);
}
- return GNUNET_OK;
}
/**
- * A client disconnected. Remove all of its data structure entries.
- *
- * @param cls closure, NULL
- * @param client identification of the client
+ * Context for iteration operations passed from
+ * #run_iteration_round to #iterate_proc as closure
*/
-static void
-client_disconnect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- void *app_cls)
+struct IterationProcResult
{
- num_clients--;
- if (NULL != watchers)
- GNUNET_CONTAINER_multihashmap_iterate (watchers,
- &client_disconnect_it,
- client);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A client disconnected (%u remaining).\n",
- num_clients);
- if ((0 == num_clients) && in_shutdown)
- do_shutdown ();
-}
+ /**
+ * The zone iteration handle
+ */
+ struct Iteration *ic;
+ /**
+ * Number of results left to be returned in this iteration.
+ */
+ uint64_t limit;
+
+};
/**
- * Function called by for each matching record.
+ * Process results for zone iteration from database
*
- * @param cls closure
- * @param record peerstore record found
- * @param emsg error message or NULL if no errors
- * @return #GNUNET_YES to continue iteration
+ * @param cls a `struct IterationProcResult` with the iteration and its remaining limit
+ * @param seq sequence number of the record, MUST NOT BE ZERO
+ * @param record the record found, NULL once the iteration is done
+ * @param emsg error message, NULL if there was no error
*/
static void
-record_iterator (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+iterate_proc (void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- struct IterationContext *ic = cls;
+ struct IterationProcResult *proc = cls;
struct GNUNET_MQ_Envelope *env;
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error iterating over peerstore: `%s'", emsg);
+ return;
+ }
if (NULL == record)
{
- /* No more records */
- struct PeerstoreResultMessage *endmsg;
-
- env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
- endmsg->rid = ic->rid;
- if (NULL == emsg)
- {
- endmsg->result = htonl (GNUNET_OK);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
- GNUNET_SERVICE_client_continue (ic->record->client);
- }
- else
- {
- endmsg->result = htonl (GNUNET_SYSERR);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
- GNUNET_SERVICE_client_drop (ic->record->client);
- }
- PEERSTORE_destroy_record (ic->record);
- GNUNET_free (ic);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
return;
}
-
+ if (0 == proc->limit)
+ {
+ /* what is this!? should never happen */
+ GNUNET_break (0);
+ return;
+ }
+ proc->ic->seq = seq;
env = PEERSTORE_create_record_mq_envelope (
- ic->rid,
+ proc->ic->request_id,
record->sub_system,
&record->peer,
record->key,
@@ -316,132 +499,361 @@ record_iterator (void *cls,
record->value_size,
record->expiry,
0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (proc->ic->pc->mq, env);
+ proc->limit--;
}
-
/**
- * Iterator over all watcher clients
- * to notify them of a new record
+ * Function called once we are done with the iteration and
+ * allow the zone iteration client to send us more messages.
*
- * @param cls closure, a `struct GNUNET_PEERSTORE_Record *`
- * @param key hash of record key
- * @param value the watcher client, a `struct GNUNET_SERVICE_Client *`
- * @return #GNUNET_YES to continue iterating
+ * @param ic iteration we are processing
*/
-static int
-watch_notifier_it (void *cls, const struct GNUNET_HashCode *key, void *value)
+static void
+iteration_done_client_continue (struct Iteration *ic)
{
- struct GNUNET_PEERSTORE_Record *record = cls;
- struct GNUNET_SERVICE_Client *client = value;
struct GNUNET_MQ_Envelope *env;
+ struct PeerstoreResultMessage *endmsg;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
- env = PEERSTORE_create_record_mq_envelope (
- 0,
- record->sub_system,
- &record->peer,
- record->key,
- record->value,
- record->value_size,
- record->expiry,
- 0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
- return GNUNET_YES;
+ GNUNET_SERVICE_client_continue (ic->pc->client);
+ if (! ic->send_end)
+ return;
+ /* No more records */
+
+ env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+ endmsg->rid = htons (ic->request_id);
+ endmsg->result = htonl (GNUNET_OK);
+ GNUNET_MQ_send (ic->pc->mq, env);
+ GNUNET_free (ic->key);
+ GNUNET_free (ic->sub_system);
+ GNUNET_CONTAINER_DLL_remove (ic->pc->op_head, ic->pc->op_tail, ic);
+ GNUNET_free (ic);
+ return;
}
+
/**
- * Given a new record, notifies watchers
+ * Perform the next round of the zone iteration.
*
- * @param record changed record to update watchers with
+ * @param ic iterator to process
+ * @param limit number of results to return in one pass
*/
static void
-watch_notifier (struct GNUNET_PEERSTORE_Record *record)
+run_iteration_round (struct Iteration *ic, uint64_t limit)
{
- struct GNUNET_HashCode keyhash;
+ struct IterationProcResult proc;
+ struct GNUNET_TIME_Absolute start;
+ struct GNUNET_TIME_Relative duration;
- PEERSTORE_hash_key (record->sub_system, &record->peer, record->key,
&keyhash);
- GNUNET_CONTAINER_multihashmap_get_multiple (watchers,
- &keyhash,
- &watch_notifier_it,
- record);
+ memset (&proc, 0, sizeof(proc));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to return up to %llu records at position %llu\n",
+ (unsigned long long) limit,
+ (unsigned long long) ic->seq);
+ proc.ic = ic;
+ proc.limit = limit;
+ start = GNUNET_TIME_absolute_get ();
+ GNUNET_break (GNUNET_SYSERR !=
+ db->iterate_records (db->cls,
+ ic->sub_system,
+ (GNUNET_YES == ic->peer_set) ? &ic->peer :
+ NULL,
+ ic->key,
+ ic->seq,
+ proc.limit,
+ &iterate_proc,
+ &proc));
+ duration = GNUNET_TIME_absolute_get_duration (start);
+ duration = GNUNET_TIME_relative_divide (duration, limit - proc.limit);
+ if (0 == proc.limit)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Returned %llu results, more results available\n",
+ (unsigned long long) limit);
+ ic->send_end = (0 != proc.limit);
+ iteration_done_client_continue (ic);
}
/**
- * Handle a watch cancel request from client
+ * Check an iterate request from client
+ *
+ * Besides the overall length, this verifies that the strings following
+ * the header are usable by #handle_iterate_start(), which duplicates them
+ * with GNUNET_strdup() and therefore needs them 0-terminated inside the
+ * message.
+ *
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
+ */
+static int
+check_iterate_start (void *cls, const struct
+ PeerstoreIterationStartMessage *srm)
+{
+ const char *payload = (const char *) &srm[1];
+ uint16_t ss_size;
+ uint16_t key_size;
+ uint16_t size;
+
+ ss_size = ntohs (srm->sub_system_size);
+ key_size = ntohs (srm->key_size);
+ size = ntohs (srm->header.size);
+
+ if (size < key_size + ss_size + sizeof(*srm))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ /* The sub system is read unconditionally by the handler, so it is
+ mandatory and must be terminated within its announced size. */
+ if ((0 == ss_size) || ('\0' != payload[ss_size - 1]))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ /* The key is optional, but if present it must be terminated as well. */
+ if ((0 != key_size) && ('\0' != payload[ss_size + key_size - 1]))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle an iterate request from client
*
* @param cls identification of the client
- * @param hm the actual message
+ * @param srm the actual message
*/
static void
-handle_watch_cancel (void *cls, const struct StoreKeyHashMessage *hm)
+handle_iterate_start (void *cls, const struct PeerstoreIterationStartMessage *
+ srm)
{
- struct GNUNET_SERVICE_Client *client = cls;
+ struct Iteration *ic = GNUNET_new (struct Iteration);
+ uint16_t ss_size;
+ char *ptr;
+
+ ss_size = ntohs (srm->sub_system_size);
+
+ ic->pc = cls;
+ ic->request_id = ntohs (srm->rid);
+ ic->offset = 0;
+ ic->peer_set = (ntohs (srm->peer_set)) ? GNUNET_YES : GNUNET_NO;
+ if (GNUNET_YES == ic->peer_set)
+ ic->peer = srm->peer;
+ ptr = (char*) &srm[1];
+ ic->sub_system = GNUNET_strdup (ptr);
+ ptr += ss_size;
+ if (0 < ntohs (srm->key_size))
+ ic->key = GNUNET_strdup (ptr);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Iterate request: ss `%s', peer `%s', key `%s'\n",
+ ic->sub_system,
+ GNUNET_i2s (&ic->peer),
+ (NULL == ic->key) ? "NULL" : ic->key);
+ GNUNET_CONTAINER_DLL_insert (ic->pc->op_head,
+ ic->pc->op_tail,
+ ic);
+ run_iteration_round (ic, 1);
+}
+
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n");
- if (GNUNET_OK !=
- GNUNET_CONTAINER_multihashmap_remove (watchers, &hm->keyhash, client))
+/**
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATION_STOP message
+ *
+ * Looks up the iteration with the given request id among the client's
+ * pending iterations, unlinks it and releases it together with the
+ * strings it owns. The client is dropped if no such iteration exists.
+ *
+ * @param cls the client sending the message
+ * @param zis_msg message from the client
+ */
+static void
+handle_iterate_stop (void *cls,
+ const struct PeerstoreIterationStopMessage *zis_msg)
+{
+ struct PeerstoreClient *pc = cls;
+ struct Iteration *ic;
+ uint32_t rid;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received ITERATION_STOP message\n");
+ /* NOTE(review): start/next decode the request id with ntohs() while
+ this uses ntohl() -- confirm against the message definitions. */
+ rid = ntohl (zis_msg->rid);
+ for (ic = pc->op_head; NULL != ic; ic = ic->next)
+ if (ic->request_id == rid)
+ break;
+ if (NULL == ic)
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (client);
+ GNUNET_SERVICE_client_drop (pc->client);
return;
}
- num_clients++;
- GNUNET_SERVICE_client_continue (client);
+ GNUNET_CONTAINER_DLL_remove (pc->op_head, pc->op_tail, ic);
+ /* The iteration owns copies of its filter strings; release them too. */
+ GNUNET_free (ic->key);
+ GNUNET_free (ic->sub_system);
+ GNUNET_free (ic);
+ GNUNET_SERVICE_client_continue (pc->client);
}
/**
- * Handle a watch request from client
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATION_NEXT message
*
- * @param cls identification of the client
- * @param hm the actual message
+ * @param cls the client sending the message
+ * @param is_msg message from the client
*/
static void
-handle_watch (void *cls, const struct StoreKeyHashMessage *hm)
+handle_iterate_next (void *cls,
+ const struct PeerstoreIterationNextMessage *is_msg)
{
- struct GNUNET_SERVICE_Client *client = cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n");
- num_clients--; /* do not count watchers */
- GNUNET_SERVICE_client_mark_monitor (client);
- GNUNET_CONTAINER_multihashmap_put (watchers,
- &hm->keyhash,
- client,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- GNUNET_SERVICE_client_continue (client);
+ struct PeerstoreClient *pc = cls;
+ struct Iteration *ic;
+ uint32_t rid;
+ uint64_t limit;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received ITERATION_NEXT message\n");
+ rid = ntohs (is_msg->rid);
+ limit = GNUNET_ntohll (is_msg->limit);
+ for (ic = pc->op_head; NULL != ic; ic = ic->next)
+ if (ic->request_id == rid)
+ break;
+ if (NULL == ic)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Not in iteration...\n");
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found iteration...\n");
+ run_iteration_round (ic, limit);
}
/**
- * Check an iterate request from client
+ * Obtain the next datum during the monitor's initial iteration.
*
- * @param cls client identification of the client
- * @param srm the actual message
- * @return #GNUNET_OK if @a srm is well-formed
+ * @param cls monitor that does its initial iteration
*/
-static int
-check_iterate (void *cls, const struct StoreRecordMessage *srm)
+static void
+monitor_iteration_next (void *cls);
+
+
+/**
+ * Record iterator callback used for a monitor's initial iteration.
+ *
+ * @param cls a `struct Monitor *` with information about the monitor
+ * @param seq sequence number of the record, MUST NOT BE ZERO
+ * @param record the record found, NULL once the iteration is done
+ * @param emsg error message, NULL if there was no error
+ */
+static void
+monitor_iterate_cb (void *cls,
+ uint64_t seq,
+ const struct GNUNET_PEERSTORE_Record *record,
+ const char *emsg)
{
- struct GNUNET_PEERSTORE_Record *record;
+ struct Monitor *mc = cls;
+ struct GNUNET_MQ_Envelope *env;
- record = PEERSTORE_parse_record_message (srm);
+ GNUNET_assert (0 != seq);
+ mc->seq = seq;
+
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error iterating over peerstore: `%s'", emsg);
+ return;
+ }
if (NULL == record)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration done\n");
+ return;
+ }
+ if (0 == mc->limit)
+ {
+ /* what is this!? should never happen */
GNUNET_break (0);
- return GNUNET_SYSERR;
+ return;
}
- if (NULL == record->sub_system)
+ env = PEERSTORE_create_record_mq_envelope (
+ 0,
+ record->sub_system,
+ &record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ 0,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD);
+ GNUNET_MQ_send (mc->pc->mq, env);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sent records.\n");
+ mc->limit--;
+ mc->iteration_cnt--;
+ if ((0 == mc->iteration_cnt) && (0 != mc->limit))
+ {
+ /* We are done with the current iteration batch, AND the
+ client would right now accept more, so go again! */
+ GNUNET_assert (NULL == mc->task);
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ }
+}
+
+
+/**
+ * Obtain the next batch of records during a monitor's initial iteration.
+ *
+ * Derives the batch size from the monitor's current limit and asks the
+ * database plugin for that many records after the last seen sequence
+ * number. If the plugin returns #GNUNET_NO the monitor is put in sync;
+ * if it returns #GNUNET_SYSERR the monitor is released and the client
+ * is dropped.
+ *
+ * @param cls monitor that does its initial iteration
+ */
+static void
+monitor_iteration_next (void *cls)
+{
+ struct Monitor *mc = cls;
+ int ret;
+
+ mc->task = NULL;
+ GNUNET_assert (0 == mc->iteration_cnt);
+ if (mc->limit > 16)
+ mc->iteration_cnt = mc->limit / 2; /* leave half for monitor events */
+ else
+ mc->iteration_cnt = mc->limit; /* use it all */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Running iteration\n");
+ ret = db->iterate_records (db->cls,
+ mc->sub_system,
+ (GNUNET_YES == mc->peer_set) ? &mc->peer : NULL,
+ mc->key,
+ mc->seq,
+ mc->iteration_cnt,
+ &monitor_iterate_cb,
+ mc);
+ if (GNUNET_SYSERR == ret)
+ {
+ /* Fetch the client first: it is only reachable through mc. */
+ struct GNUNET_SERVICE_Client *client = mc->pc->client;
+
+ /* Unlink before freeing so that nobody walking the monitor DLL
+ (e.g. watch_notifier) ever sees released memory. */
+ GNUNET_CONTAINER_DLL_remove (monitors_head, monitors_tail, mc);
+ GNUNET_free (mc->key);
+ GNUNET_free (mc->sub_system);
+ GNUNET_free (mc);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Zone empty... syncing\n");
+ /* nothing (more) to iterate over */
+ monitor_sync (mc);
+ return;
+ }
+}
+
+
+/**
+ * Check a monitor request from client
+ *
+ * Besides the overall length, this verifies that the optional strings
+ * following the header are 0-terminated inside the message, because
+ * #handle_monitor_start() duplicates them with GNUNET_strdup().
+ *
+ * @param cls client identification of the client
+ * @param srm the actual message
+ * @return #GNUNET_OK if @a srm is well-formed
+ */
+static int
+check_monitor_start (void *cls, const struct PeerstoreMonitorStartMessage *srm)
+{
+ const char *payload = (const char *) &srm[1];
+ uint16_t ss_size;
+ uint16_t key_size;
+ uint16_t size;
+
+ ss_size = ntohs (srm->sub_system_size);
+ key_size = ntohs (srm->key_size);
+ size = ntohs (srm->header.size);
+
+ if (size < key_size + ss_size + sizeof(*srm))
{
GNUNET_break (0);
- PEERSTORE_destroy_record (record);
return GNUNET_SYSERR;
}
- PEERSTORE_destroy_record (record);
+ /* Both strings are optional for monitors, but whatever is present
+ must be terminated within its announced size. */
+ if ((0 != ss_size) && ('\0' != payload[ss_size - 1]))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ if ((0 != key_size) && ('\0' != payload[ss_size + key_size - 1]))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
@@ -453,31 +865,87 @@ check_iterate (void *cls, const struct StoreRecordMessage
*srm)
* @param srm the actual message
*/
static void
-handle_iterate (void *cls, const struct StoreRecordMessage *srm)
+handle_monitor_start (void *cls, const struct PeerstoreMonitorStartMessage *msm)
+{
+ struct Monitor *mc = GNUNET_new (struct Monitor);
+
+ uint16_t ss_size;
+ char *ptr;
+
+ ss_size = ntohs (msm->sub_system_size);
+
+ mc->pc = cls;
+ mc->peer_set = (ntohs (msm->peer_set)) ? GNUNET_YES : GNUNET_NO;
+ if (GNUNET_YES == mc->peer_set)
+ mc->peer = msm->peer;
+ /* The optional sub system and key strings follow the fixed header. */
+ ptr = (char*) &msm[1];
+ if (0 < ss_size)
+ mc->sub_system = GNUNET_strdup (ptr);
+ ptr += ss_size;
+ if (0 < ntohs (msm->key_size))
+ mc->key = GNUNET_strdup (ptr);
+ /* sub_system may be NULL here, so never hand it to `%s' directly. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Monitor request: ss `%s', peer `%s', key `%s'\n",
+ (NULL == mc->sub_system) ? "NULL" : mc->sub_system,
+ GNUNET_i2s (&mc->peer),
+ (NULL == mc->key) ? "NULL" : mc->key);
+ mc->in_first_iteration = (GNUNET_YES == ntohs (msm->iterate_first));
+ mc->limit = 1;
+ GNUNET_CONTAINER_DLL_insert (monitors_head,
+ monitors_tail,
+ mc);
+ GNUNET_SERVICE_client_mark_monitor (mc->pc->client);
+ /* Like every other handler, let the client send further messages;
+ otherwise its MONITOR_NEXT requests would never be processed. */
+ GNUNET_SERVICE_client_continue (mc->pc->client);
+ GNUNET_notification_context_add (monitor_nc, mc->pc->mq);
+ if (mc->in_first_iteration)
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ else
+ monitor_sync (mc);
+}
+
+
+/**
+ * Handles a #GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT message
+ *
+ * @param cls the client sending the message
+ * @param nm message from the client
+ */
+static void
+handle_monitor_next (void *cls, const struct PeerstoreMonitorNextMessage *nm)
{
- struct IterationContext *ic = GNUNET_new (struct IterationContext);
+ struct PeerstoreClient *pc = cls;
+ struct Monitor *mc;
+ uint64_t inc;
- ic->record = PEERSTORE_parse_record_message (srm);
+ inc = GNUNET_ntohll (nm->limit);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iterate request: ss `%s', peer `%s', key `%s'\n",
- ic->record->sub_system,
- GNUNET_i2s (&ic->record->peer),
- (NULL == ic->record->key) ? "NULL" : ic->record->key);
- ic->record->client = cls;
- ic->rid = srm->rid;
- if (GNUNET_OK !=
- db->iterate_records (db->cls,
- ic->record->sub_system,
- (ntohs (srm->peer_set)) ? &ic->record->peer : NULL,
- ic->record->key,
- &record_iterator,
- ic))
+ "Received MONITOR_NEXT message with limit %llu\n",
+ (unsigned long long) inc);
+ for (mc = monitors_head; NULL != mc; mc = mc->next)
+ if (mc->pc == pc)
+ break;
+ if (NULL == mc)
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (ic->record->client);
- PEERSTORE_destroy_record (ic->record);
- GNUNET_free (ic);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
}
+ GNUNET_SERVICE_client_continue (pc->client);
+ if (mc->limit + inc < mc->limit)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (pc->client);
+ return;
+ }
+ mc->limit += inc;
+ if ((mc->in_first_iteration) && (mc->limit == inc))
+ {
+ /* We are still iterating, and the previous iteration must
+ have stopped due to the client's limit, so continue it! */
+ GNUNET_assert (NULL == mc->task);
+ mc->task = GNUNET_SCHEDULER_add_now (&monitor_iteration_next, mc);
+ }
+ GNUNET_assert (mc->iteration_cnt <= mc->limit);
}
@@ -494,13 +962,12 @@ store_record_continuation (void *cls, int success)
struct PeerstoreResultMessage *msg;
struct GNUNET_MQ_Envelope *env;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT);
msg->rid = src->rid;
msg->result = htonl (success);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (src->client), env);
+ GNUNET_MQ_send (src->pc->mq, env);
watch_notifier (src->record);
- GNUNET_SERVICE_client_continue (src->client);
+ GNUNET_SERVICE_client_continue (src->pc->client);
PEERSTORE_destroy_record (src->record);
GNUNET_free (src);
}
@@ -514,7 +981,7 @@ store_record_continuation (void *cls, int success)
* @return #GNUNET_OK if @a srm is well-formed
*/
static int
-check_store (void *cls, const struct StoreRecordMessage *srm)
+check_store (void *cls, const struct PeerstoreRecordMessage *srm)
{
struct GNUNET_PEERSTORE_Record *record;
@@ -542,9 +1009,9 @@ check_store (void *cls, const struct StoreRecordMessage
*srm)
* @param srm the actual message
*/
static void
-handle_store (void *cls, const struct StoreRecordMessage *srm)
+handle_store (void *cls, const struct PeerstoreRecordMessage *srm)
{
- struct GNUNET_SERVICE_Client *client = cls;
+ struct PeerstoreClient *pc = cls;
struct StoreRecordContext *src = GNUNET_new (struct StoreRecordContext);
src->record = PEERSTORE_parse_record_message (srm);
GNUNET_log (
@@ -554,9 +1021,8 @@ handle_store (void *cls, const struct StoreRecordMessage
*srm)
GNUNET_i2s (&src->record->peer),
src->record->key,
(uint32_t) ntohl (srm->options));
- src->record->client = client;
src->rid = srm->rid;
- src->client = client;
+ src->pc = pc;
if (GNUNET_OK != db->store_record (db->cls,
src->record->sub_system,
&src->record->peer,
@@ -571,12 +1037,82 @@ handle_store (void *cls, const struct StoreRecordMessage
*srm)
GNUNET_break (0);
PEERSTORE_destroy_record (src->record);
GNUNET_free (src);
- GNUNET_SERVICE_client_drop (client);
+ GNUNET_SERVICE_client_drop (pc->client);
+ GNUNET_free (pc);
return;
}
}
+/**
+ * A client connected. Allocate a `struct PeerstoreClient` that records the
+ * client's service handle and its message queue.
+ *
+ * @param cls closure, unused
+ * @param client identification of the client
+ * @param mq the message queue of the client
+ * @return the newly allocated `struct PeerstoreClient`; client_disconnect_cb()
+ *         treats its app_cls argument as this structure and frees it
+ */
+static void *
+client_connect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
+{
+ struct PeerstoreClient *pc;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "A client %p connected\n", client);
+ pc = GNUNET_new (struct PeerstoreClient);
+ pc->client = client;
+ pc->mq = mq;
+ return pc;
+}
+
+
+/**
+ * A client disconnected. Remove all of its data structure entries.
+ *
+ * Cancels the pending tasks of the first monitor that belongs to the client
+ * and frees that monitor, then removes and frees every iteration still queued
+ * in the client's operation list, and finally frees the client context.
+ *
+ * NOTE(review): the monitor is released with GNUNET_free() while no removal
+ * from the list headed by monitors_head is visible in this function. Confirm
+ * that the entry gets unlinked; otherwise later walks of that list (for
+ * example in handle_monitor_next()) would touch freed memory.
+ *
+ * @param cls closure, NULL
+ * @param client identification of the client
+ * @param app_cls the `struct PeerstoreClient` of the disconnecting client
+ */
+static void
+client_disconnect_cb (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *app_cls)
+{
+ struct PeerstoreClient *pc = app_cls;
+ struct Iteration *iter;
+ struct Monitor *mo;
+
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client %p disconnected.\n",
+ client);
+ for (mo = monitors_head; NULL != mo; mo = mo->next)
+ {
+ if (pc != mo->pc)
+ continue;
+ if (NULL != mo->task)
+ {
+ GNUNET_SCHEDULER_cancel (mo->task);
+ mo->task = NULL;
+ }
+ if (NULL != mo->sa_wait_warning)
+ {
+ GNUNET_SCHEDULER_cancel (mo->sa_wait_warning);
+ mo->sa_wait_warning = NULL;
+ }
+ GNUNET_free (mo);
+ break;
+ }
+ while (NULL != (iter = pc->op_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (pc->op_head, pc->op_tail, iter);
+ GNUNET_free (iter);
+ }
+ GNUNET_free (pc);
+}
+
+
static void
store_hello_continuation (void *cls, int success)
{
@@ -601,29 +1137,9 @@ hosts_directory_scan_callback (void *cls, const char
*fullname)
struct GNUNET_HELLO_Builder *builder;
const struct GNUNET_PeerIdentity *pid;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "1 hosts_directory_scan_callback filename %s\n",
- fullname);
-
if (GNUNET_YES != GNUNET_DISK_file_test (fullname))
return GNUNET_OK; /* ignore non-files */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "2 hosts_directory_scan_callback filename %s\n",
- fullname);
-
- /* filename = strrchr (fullname, DIR_SEPARATOR); */
- /* if ((NULL == filename) || (1 > strlen (filename))) */
- /* filename = fullname; */
- /* else */
- /* filename++; */
-
- /* GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, */
- /* "3 hosts_directory_scan_callback filename %s\n", */
- /* filename); */
-
- /* if (GNUNET_YES != GNUNET_DISK_file_test (filename)) */
- /* return GNUNET_OK; */
size_total = GNUNET_DISK_fn_read (fullname, buffer, sizeof(buffer));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Read %d bytes from `%s'\n",
@@ -687,8 +1203,6 @@ run (void *cls,
char *ip;
char *peerdir;
- in_shutdown = GNUNET_NO;
- num_clients = 0;
cfg = c;
if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -713,7 +1227,6 @@ run (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
}
- watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, NULL);
use_included = GNUNET_CONFIGURATION_get_value_yesno (cfg,
@@ -741,6 +1254,7 @@ run (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_ ("Skipping import of included HELLOs\n"));
}
+ monitor_nc = GNUNET_notification_context_create (1);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
}
@@ -750,27 +1264,27 @@ run (void *cls,
* Define "main" method using service macro.
*/
GNUNET_SERVICE_MAIN (
- "peerstore",
- GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
- &run,
- &client_connect_cb,
- &client_disconnect_cb,
- NULL,
- GNUNET_MQ_hd_var_size (store,
- GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
- struct StoreRecordMessage,
- NULL),
- GNUNET_MQ_hd_var_size (iterate,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
- struct StoreRecordMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (watch,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
- struct StoreKeyHashMessage,
+ "peerstore", GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run, &client_connect_cb,
+ &client_disconnect_cb, NULL,
+ GNUNET_MQ_hd_var_size (store, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
+ struct PeerstoreRecordMessage, NULL),
+ GNUNET_MQ_hd_var_size (iterate_start,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START,
+ struct PeerstoreIterationStartMessage, NULL),
+ GNUNET_MQ_hd_fixed_size (iterate_stop,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_STOP,
+ struct PeerstoreIterationStopMessage, NULL),
+ GNUNET_MQ_hd_fixed_size (iterate_next,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT,
+ struct PeerstoreIterationNextMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (watch_cancel,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
- struct StoreKeyHashMessage,
+ GNUNET_MQ_hd_var_size (monitor_start,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START,
+ struct PeerstoreMonitorStartMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (monitor_next,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT,
+ struct PeerstoreMonitorNextMessage,
NULL),
GNUNET_MQ_handler_end ());
diff --git a/src/service/peerstore/meson.build
b/src/service/peerstore/meson.build
index 2c6f7eba8..8f0950b93 100644
--- a/src/service/peerstore/meson.build
+++ b/src/service/peerstore/meson.build
@@ -1,4 +1,5 @@
libgnunetpeerstore_src = ['peerstore_api.c',
+ 'peerstore_api_monitor.c',
'peerstore_common.c']
gnunetservicepeerstore_src = ['gnunet-service-peerstore.c']
@@ -86,7 +87,7 @@ test('test_peerstore_api_watch', testpeerstore_api_watch,
suite: 'peerstore', workdir: meson.current_build_dir())
test('test_peerstore_api_iterate', testpeerstore_api_iterate,
suite: 'peerstore', workdir: meson.current_build_dir())
-test('perf_peerstore_store', testpeerstore_api_perf,
+test('perf_peerstore_api_store', testpeerstore_api_perf,
suite: 'peerstore', workdir: meson.current_build_dir())
diff --git a/src/service/peerstore/peerstore.h
b/src/service/peerstore/peerstore.h
index 26c656f00..c005d329b 100644
--- a/src/service/peerstore/peerstore.h
+++ b/src/service/peerstore/peerstore.h
@@ -34,7 +34,7 @@ GNUNET_NETWORK_STRUCT_BEGIN
/**
* Message carrying a PEERSTORE record message
*/
-struct StoreRecordMessage
+struct PeerstoreRecordMessage
{
/**
* GNUnet message header
@@ -42,25 +42,25 @@ struct StoreRecordMessage
struct GNUNET_MessageHeader header;
/**
- * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ * Peer Identity
*/
- uint16_t peer_set GNUNET_PACKED;
+ struct GNUNET_PeerIdentity peer;
/**
- * Size of the sub_system string
- * Allocated at position 0 after this struct
+ * Expiry time of entry
*/
- uint16_t sub_system_size GNUNET_PACKED;
+ struct GNUNET_TIME_AbsoluteNBO expiry;
- /**
- * Peer Identity
+/**
+ * Request id.
*/
- struct GNUNET_PeerIdentity peer;
+ uint32_t rid GNUNET_PACKED;
/**
- * Expiry time of entry
+ * Options, needed only in case of a
+ * store operation
*/
- struct GNUNET_TIME_AbsoluteNBO expiry;
+ uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options GNUNET_PACKED;
/**
* Size of the key string
@@ -74,16 +74,17 @@ struct StoreRecordMessage
*/
uint16_t value_size GNUNET_PACKED;
+
/**
- * Request id.
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
*/
- uint32_t rid GNUNET_PACKED;
+ uint16_t sub_system_size GNUNET_PACKED;
/**
- * Options, needed only in case of a
- * store operation
+ * reserved
*/
- uint32_t /* enum GNUNET_PEERSTORE_StoreOption */ options GNUNET_PACKED;
+ uint16_t reserved GNUNET_PACKED;
/* Followed by key and value */
};
@@ -130,7 +131,148 @@ struct StoreKeyHashMessage
* Hash of a record key
*/
struct GNUNET_HashCode keyhash;
+};
+
+/**
+ * Monitor start message
+ * (sent as #GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START)
+ */
+struct PeerstoreMonitorStartMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+ /**
+ * Peer Identity; only meaningful if @e peer_set is #GNUNET_YES
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+
+ /**
+ * Size of the key string
+ * Allocated at position 1 after this struct
+ */
+ uint16_t key_size GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ */
+ uint16_t peer_set GNUNET_PACKED;
+
+ /**
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
+ */
+ uint16_t sub_system_size GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if iterate first, #GNUNET_NO otherwise
+ * (presumably requests an initial pass over existing records before
+ * changes are reported -- confirm against the monitor start handler)
+ */
+ uint16_t iterate_first GNUNET_PACKED;
+
+ /* Followed by sub_system (position 0) and key (position 1) */
+};
+
+/**
+ * Monitor next message
+ * (sent as #GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT)
+ */
+struct PeerstoreMonitorNextMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of additional records the client is willing to receive, in
+ * network byte order. The service adds this value to the monitor's
+ * current limit.
+ */
+ uint64_t limit GNUNET_PACKED;
+
+};
+
+/**
+ * Iteration start message
+ * (sent as #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START)
+ */
+struct PeerstoreIterationStartMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Peer Identity; only meaningful if @e peer_set is #GNUNET_YES
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Request id.
+ * NOTE(review): declared as 32 bits, but GNUNET_PEERSTORE_iteration_start()
+ * fills it using htons() -- confirm the intended width and conversion.
+ */
+ uint32_t rid GNUNET_PACKED;
+
+ /**
+ * #GNUNET_YES if peer id value set, #GNUNET_NO otherwise
+ */
+ uint16_t peer_set GNUNET_PACKED;
+
+ /**
+ * Size of the sub_system string
+ * Allocated at position 0 after this struct
+ */
+ uint16_t sub_system_size GNUNET_PACKED;
+
+ /**
+ * reserved
+ */
+ uint16_t reserved GNUNET_PACKED;
+
+ /**
+ * Size of the key string (0 if no key is given)
+ * Allocated at position 1 after this struct
+ */
+ uint16_t key_size GNUNET_PACKED;
+
+ /* Followed by subsystem and key */
+};
+
+
+/**
+ * Iteration next message
+ * (sent as #GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT)
+ */
+struct PeerstoreIterationNextMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of records to return, in network byte order.
+ */
+ uint64_t limit GNUNET_PACKED;
+
+ /**
+ * Request id of the iteration this message refers to.
+ * NOTE(review): declared as 32 bits, but GNUNET_PEERSTORE_iteration_next()
+ * fills it using htons() -- confirm the intended width and conversion.
+ */
+ uint32_t rid GNUNET_PACKED;
+};
+
+struct PeerstoreIterationStopMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Request id of the iteration to stop.
+ * NOTE(review): declared as 32 bits, but GNUNET_PEERSTORE_iteration_stop()
+ * fills it using htons() -- confirm the intended width and conversion.
+ */
+ uint32_t rid GNUNET_PACKED;
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/service/peerstore/peerstore_api.c
b/src/service/peerstore/peerstore_api.c
index feb5aeb3c..f52879b5c 100644
--- a/src/service/peerstore/peerstore_api.c
+++ b/src/service/peerstore/peerstore_api.c
@@ -23,6 +23,8 @@
* @author Omar Tarabai
* @author Christian Grothoff
*/
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_hello_uri_lib.h"
@@ -162,6 +164,11 @@ struct GNUNET_PEERSTORE_StoreContext
* Options for the store operation.
*/
enum GNUNET_PEERSTORE_StoreOption options;
+
+ /**
+ * Temporary envelope
+ */
+ struct GNUNET_MQ_Envelope *env;
};
/**
@@ -230,70 +237,13 @@ struct GNUNET_PEERSTORE_IterateContext
*/
uint32_t rid;
-};
-
-/**
- * Context for a watch request
- */
-struct GNUNET_PEERSTORE_WatchContext
-{
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *next;
-
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *prev;
-
- /**
- * Handle to the PEERSTORE service.
- */
- struct GNUNET_PEERSTORE_Handle *h;
-
/**
- * Callback with each record received
- */
- GNUNET_PEERSTORE_Processor callback;
-
- /**
- * Closure for @e callback
+ * Temporary envelope
*/
- void *callback_cls;
-
- /**
- * Hash of the combined key
- */
- struct GNUNET_HashCode keyhash;
-
- /**
- * The iteration context to deliver the actual values for the key.
- */
- struct GNUNET_PEERSTORE_IterateContext *ic;
-
- /**
- * The peer we are watching for values.
- */
- const struct GNUNET_PeerIdentity *peer;
-
- /**
- * The key we like to watch for values.
- */
- const char *key;
-
- /**
- * The sub system requested the watch.
- */
- const char *sub_system;
-
- /**
- * Request ID
- */
- uint32_t rid;
-
+ struct GNUNET_MQ_Envelope *env;
};
+
/**
* Context for the info handler.
*/
@@ -317,7 +267,7 @@ struct GNUNET_PEERSTORE_NotifyContext
/**
* The watch for this context.
*/
- struct GNUNET_PEERSTORE_WatchContext *wc;
+ struct GNUNET_PEERSTORE_Monitor *wc;
/**
* Is this request canceled.
@@ -419,30 +369,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
}
-/**
- * Iterator over previous watches to resend them
- *
- * @param cls the `struct GNUNET_PEERSTORE_Handle`
- * @param key key for the watch
- * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
-{
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_WatchContext *wc = value;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_MQ_Envelope *ev;
-
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- hm->keyhash = wc->keyhash;
- hm->rid = get_op_id (h);
- GNUNET_MQ_send (h->mq, ev);
- return GNUNET_YES;
-}
-
-
/**
* Connect to the PEERSTORE service.
*
@@ -548,6 +474,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
sc->rid = get_op_id (h);
sc->sub_system = GNUNET_strdup (sub_system);
+ GNUNET_assert (NULL != peer);
sc->peer = *peer;
sc->key = GNUNET_strdup (key);
sc->value = GNUNET_memdup (value, size);
@@ -569,7 +496,14 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
- GNUNET_MQ_send (h->mq, ev);
+ if (NULL == h->mq)
+ {
+ sc->env = ev;
+ }
+ else
+ {
+ GNUNET_MQ_send (h->mq, ev);
+ }
return sc;
}
@@ -589,8 +523,7 @@ handle_store_result (void *cls, const struct
PeerstoreResultMessage *msg)
LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerstoreResultMessage\n");
for (sc = h->store_head; NULL != sc; sc = sc->next)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying %u ?= %u\n", sc->rid, msg->rid);
- if (sc->rid == msg->rid)
+ if (sc->rid == ntohs (msg->rid))
break;
}
if (NULL == sc)
@@ -623,7 +556,7 @@ handle_iterate_end (void *cls, const struct
PeerstoreResultMessage *msg)
struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
for (ic = h->iterate_head; NULL != ic; ic = ic->next)
- if (ic->rid == msg->rid)
+ if (ic->rid == ntohs (msg->rid))
break;
if (NULL == ic)
{
@@ -646,7 +579,7 @@ handle_iterate_end (void *cls, const struct
PeerstoreResultMessage *msg)
* @param msg message received
*/
static int
-check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
+check_iterate_result (void *cls, const struct PeerstoreRecordMessage *msg)
{
/* we defer validation to #handle_iterate_result */
return GNUNET_OK;
@@ -660,15 +593,15 @@ check_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
* @param msg message received
*/
static void
-handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
+handle_iterate_result (void *cls, const struct PeerstoreRecordMessage *msg)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
struct GNUNET_PEERSTORE_IterateContext *ic;
struct GNUNET_PEERSTORE_Record *record;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received StoreRecordMessage\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received RecordMessage\n");
for (ic = h->iterate_head; NULL != ic; ic = ic->next)
- if (ic->rid == msg->rid)
+ if (ic->rid == ntohs (msg->rid))
break;
if (NULL == ic)
{
@@ -701,8 +634,49 @@ handle_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
* @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
*/
void
-GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
+GNUNET_PEERSTORE_iteration_next (struct GNUNET_PEERSTORE_IterateContext *ic,
+ uint64_t limit)
{
+ struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationNextMessage *inm;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending PEERSTORE_ITERATION_NEXT message\n");
+ ev = GNUNET_MQ_msg (inm, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_NEXT);
+ inm->rid = htons (ic->rid);
+ inm->limit = GNUNET_htonll (limit);
+ if (NULL == ic->h->mq)
+ {
+ ic->env = ev;
+ }
+ else
+ {
+ GNUNET_MQ_send (ic->h->mq, ev);
+ }
+}
+
+
+/**
+ * Cancel an iterate request
+ * Please do not call after the iterate request is done
+ *
+ * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iteration_start()
+ */
+void
+GNUNET_PEERSTORE_iteration_stop (struct GNUNET_PEERSTORE_IterateContext *ic)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationStopMessage *ism;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending PEERSTORE_ITERATION_STOP message\n");
+ if (NULL != ic->h->mq)
+ {
+ ev = GNUNET_MQ_msg (ism, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+ ism->rid = htons (ic->rid);
+ if (NULL != ic->h->mq)
+ GNUNET_MQ_send (ic->h->mq, ev);
+ }
GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
GNUNET_free (ic->sub_system);
GNUNET_free (ic->key);
@@ -711,34 +685,50 @@ GNUNET_PEERSTORE_iterate_cancel (struct
GNUNET_PEERSTORE_IterateContext *ic)
struct GNUNET_PEERSTORE_IterateContext *
-GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
+GNUNET_PEERSTORE_iteration_start (struct GNUNET_PEERSTORE_Handle *h,
+ const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ GNUNET_PEERSTORE_Processor callback,
+ void *callback_cls)
{
struct GNUNET_MQ_Envelope *ev;
+ struct PeerstoreIterationStartMessage *srm;
struct GNUNET_PEERSTORE_IterateContext *ic;
+ size_t ss_size;
+ size_t key_size;
+ size_t msg_size;
+ void *dummy;
ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
ic->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (ic->rid,
- sub_system,
- peer,
- key,
- NULL,
- 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
+
+ GNUNET_assert (NULL != sub_system);
+ ss_size = strlen (sub_system) + 1;
+ if (NULL == key)
+ key_size = 0;
+ else
+ key_size = strlen (key) + 1;
+ msg_size = ss_size + key_size;
+ ev = GNUNET_MQ_msg_extra (srm, msg_size,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_START);
+ srm->key_size = htons (key_size);
+ srm->rid = htons (ic->rid);
+ srm->sub_system_size = htons (ss_size);
+ dummy = &srm[1];
+ GNUNET_memcpy (dummy, sub_system, ss_size);
+ dummy += ss_size;
+ GNUNET_memcpy (dummy, key, key_size);
ic->callback = callback;
ic->callback_cls = callback_cls;
ic->h = h;
ic->sub_system = GNUNET_strdup (sub_system);
if (NULL != peer)
+ {
ic->peer = *peer;
+ srm->peer_set = htons (GNUNET_YES);
+ srm->peer = *peer;
+ }
if (NULL != key)
ic->key = GNUNET_strdup (key);
GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
@@ -750,63 +740,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle
*h,
}
-/******************************************************************************/
-/******************* WATCH FUNCTIONS
*********************/
-/******************************************************************************/
-
-/**
- * When a watch record is received, validate it is well-formed.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
-static int
-check_watch_record (void *cls, const struct StoreRecordMessage *msg)
-{
- /* we defer validation to #handle_watch_result */
- return GNUNET_OK;
-}
-
-
-/**
- * When a watch record is received, process it.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
-static void
-handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
-{
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_Record *record;
- struct GNUNET_HashCode keyhash;
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
- record = PEERSTORE_parse_record_message (msg);
- if (NULL == record)
- {
- disconnect_and_schedule_reconnect (h);
- return;
- }
- PEERSTORE_hash_key (record->sub_system, &record->peer, record->key,
&keyhash);
- // FIXME: what if there are multiple watches for the same key?
- wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
- if (NULL == wc)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _ ("Received a watch result for a non existing watch.\n"));
- PEERSTORE_destroy_record (record);
- disconnect_and_schedule_reconnect (h);
- return;
- }
- h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls, record, NULL);
- PEERSTORE_destroy_record (record);
-}
-
-
/**
* Close the existing connection to PEERSTORE and reconnect.
*
@@ -826,16 +759,11 @@ reconnect (void *cls)
struct PeerstoreResultMessage,
h),
GNUNET_MQ_hd_var_size (iterate_result,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
- struct StoreRecordMessage,
- h),
- GNUNET_MQ_hd_var_size (watch_record,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
- struct StoreRecordMessage,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD,
+ struct PeerstoreRecordMessage,
h),
GNUNET_MQ_handler_end ()
};
- struct GNUNET_MQ_Envelope *ev;
h->reconnect_task = NULL;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
@@ -853,272 +781,25 @@ reconnect (void *cls)
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Resending pending requests after reconnect.\n");
- if (NULL != h->watches)
- GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL !=
ic;
ic = ic->next)
{
- ic->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (ic->rid,
- ic->sub_system,
- &ic->peer,
- ic->key,
- NULL,
- 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
- GNUNET_MQ_send (h->mq, ev);
+ GNUNET_MQ_send (h->mq, ic->env);
}
for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
sc = sc->next)
{
- sc->rid = get_op_id (h);
- ev =
- PEERSTORE_create_record_mq_envelope (sc->rid,
- sc->sub_system,
- &sc->peer,
- sc->key,
- sc->value,
- sc->size,
- sc->expiry,
- sc->options,
-
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_MQ_send (h->mq, ev);
- }
-}
-
-
-/**
- * Cancel a watch request
- *
- * @param wc handle to the watch request
- */
-void
-GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
-{
- struct GNUNET_PEERSTORE_Handle *h = wc->h;
- struct GNUNET_MQ_Envelope *ev;
- struct StoreKeyHashMessage *hm;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancelling watch.\n");
- if (NULL != wc->ic)
- {
- GNUNET_PEERSTORE_iterate_cancel (wc->ic);
- GNUNET_free (wc);
- return;
+ GNUNET_MQ_send (h->mq, sc->env);
}
-
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
- hm->keyhash = wc->keyhash;
- hm->rid = get_op_id (h);
- GNUNET_MQ_send (h->mq, ev);
- GNUNET_assert (
- GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
- GNUNET_free (wc);
}
static void
-watch_iterate (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
+hello_store_success (void *cls, int success)
{
- struct GNUNET_PEERSTORE_WatchContext *wc = cls;
- struct GNUNET_PEERSTORE_Handle *h = wc->h;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_MQ_Envelope *ev;
- const struct GNUNET_PeerIdentity *peer;
-
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- wc->callback (wc->callback_cls, NULL, emsg);
- return;
- }
- if ((NULL != record) &&
- (NULL != wc->callback))
- {
- wc->callback (wc->callback_cls, record, NULL);
- return;
- }
-
- if (NULL == wc->peer)
- peer = GNUNET_new (struct GNUNET_PeerIdentity);
- else
- peer = wc->peer;
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
- hm->rid = get_op_id (h);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Hash key we watch for %s\n",
- GNUNET_h2s_full (&hm->keyhash));
- wc->keyhash = hm->keyhash;
- if (NULL == h->watches)
- h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
- GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
- h->watches,
- &wc->keyhash,
- wc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
- wc->sub_system,
- GNUNET_i2s (peer),
- wc->key);
- GNUNET_MQ_send (h->mq, ev);
- wc->ic = NULL;
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls, record, NULL);
- if (NULL == wc->peer)
- GNUNET_free_nz ((void *) peer);
- return;
-
-}
-
-
-/**
- * Request watching a given key
- * User will be notified with any new values added to key,
- * all existing entries are supplied beforehand.
- *
- * @param h handle to the PEERSTORE service
- * @param sub_system name of sub system
- * @param peer Peer identity
- * @param key entry key string
- * @param callback function called with each new value
- * @param callback_cls closure for @a callback
- * @return Handle to watch request
- */
-struct GNUNET_PEERSTORE_WatchContext *
-GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
-{
- struct GNUNET_PEERSTORE_WatchContext *wc;
-
- wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
- wc->callback = callback;
- wc->callback_cls = callback_cls;
- wc->h = h;
- wc->key = key;
- wc->peer = peer;
- wc->sub_system = sub_system;
-
- wc->ic = GNUNET_PEERSTORE_iterate (h,
- sub_system,
- peer,
- key,
- &watch_iterate,
- wc);
-
- return wc;
-}
-
-
-/******************************************************************************/
-/******************* HELLO FUNCTIONS
*********************/
-/******************************************************************************/
-
-
-static void
-hello_updated (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
-{
- struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
- const struct GNUNET_MessageHeader *hello;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated\n");
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- nc->callback (nc->callback_cls, NULL, NULL, emsg);
- return;
- }
- if (NULL == record)
- return;
- hello = record->value;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated with expired %s and size %u for peer %s\n",
- GNUNET_STRINGS_absolute_time_to_string (
- GNUNET_HELLO_builder_get_expiration_time (hello)),
- ntohs (hello->size),
- GNUNET_i2s (&record->peer));
- if ((0 == record->value_size))
- {
- GNUNET_break (0);
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated call callback\n");
- nc->callback (nc->callback_cls, &record->peer, hello, NULL);
-}
-
-
-struct GNUNET_PEERSTORE_NotifyContext *
-GNUNET_PEERSTORE_hello_changed_notify (struct GNUNET_PEERSTORE_Handle *h,
- int include_friend_only,
- GNUNET_PEERSTORE_hello_notify_cb
callback,
- void *callback_cls)
-{
- struct GNUNET_PEERSTORE_NotifyContext *nc;
-
- nc = GNUNET_new (struct GNUNET_PEERSTORE_NotifyContext);
- nc->callback = callback;
- nc->callback_cls = callback_cls;
- nc->h = h;
-
- nc->wc = GNUNET_PEERSTORE_watch (h,
- "peerstore",
- NULL,
- GNUNET_PEERSTORE_HELLO_KEY,
- &hello_updated,
- nc);
-
- return nc;
-}
-
-
-/**
- * Stop notifying about changes.
- *
- * @param nc context to stop notifying
- */
-void
-GNUNET_PEERSTORE_hello_changed_notify_cancel (struct
- GNUNET_PEERSTORE_NotifyContext
*nc)
-{
- if (NULL != nc->wc)
- {
- GNUNET_PEERSTORE_watch_cancel (nc->wc);
- nc->wc = NULL;
- }
- GNUNET_free (nc);
-}
-
-
-static void
-merge_success (void *cls, int success)
-{
- struct StoreHelloCls *shu_cls = cls;
- struct GNUNET_PEERSTORE_StoreHelloContext *huc = shu_cls->huc;
+ struct GNUNET_PEERSTORE_StoreHelloContext *huc = cls;
- if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove
(huc->store_context_map,
- huc->pid,
shu_cls->sc))
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "There was no store context to be removed after storing hello for
peer %s\n",
- GNUNET_i2s (huc->pid));
+ huc->sc = NULL;
if (GNUNET_OK != success)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -1127,112 +808,12 @@ merge_success (void *cls, int success)
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
GNUNET_free (huc);
- GNUNET_free (shu_cls);
- return;
- }
- if (0 == GNUNET_CONTAINER_multipeermap_size (huc->store_context_map))
- {
- GNUNET_PEERSTORE_watch_cancel (huc->wc);
- huc->wc = NULL;
- huc->cont (huc->cont_cls, GNUNET_OK);
- huc->success = GNUNET_OK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Storing hello uri succeeded for peer %s!\n",
- GNUNET_i2s (huc->pid));
- GNUNET_free (huc->hello);
- GNUNET_free (huc->pid);
- GNUNET_free (huc);
- GNUNET_free (shu_cls);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got notified during storing hello uri for peer %s!\n",
- GNUNET_i2s (huc->pid));
- GNUNET_free (shu_cls);
-}
-
-
-static void
-store_hello (struct GNUNET_PEERSTORE_StoreHelloContext *huc,
- const struct GNUNET_MessageHeader *hello)
-{
- struct GNUNET_PEERSTORE_Handle *h = huc->h;
- struct GNUNET_PEERSTORE_StoreContext *sc;
- struct StoreHelloCls *shu_cls = GNUNET_new (struct StoreHelloCls);
- struct GNUNET_TIME_Absolute hello_exp;
-
- shu_cls->huc = huc;
- hello_exp = GNUNET_HELLO_builder_get_expiration_time (hello);
- sc = GNUNET_PEERSTORE_store (h,
- "peerstore",
- huc->pid,
- GNUNET_PEERSTORE_HELLO_KEY,
- hello,
- ntohs (hello->size),
- hello_exp,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- merge_success,
- shu_cls);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "store_hello with expiration %s\n",
- GNUNET_STRINGS_absolute_time_to_string (hello_exp));
- GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put (
- huc->store_context_map,
- huc->pid,
- sc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- shu_cls->sc = sc;
-}
-
-
-// TODO Find a better name for the function. We do not merge, but replace, if
there is a storing process
-// during another store process with a newer hello.
-static void
-merge_uri (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
-{
- struct GNUNET_PEERSTORE_StoreHelloContext *huc = cls;
- struct GNUNET_MessageHeader *hello;
- struct GNUNET_TIME_Absolute huc_hello_exp_time;
- struct GNUNET_TIME_Absolute record_hello_exp_time;
-
- if (NULL != emsg)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
- return;
- }
-
- if (NULL == record && GNUNET_NO == huc->success)
- {
- huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri just store for peer %s with expiration %s\n",
- GNUNET_i2s (huc->pid),
- GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
- store_hello (huc, huc->hello);
- }
- else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid,
- &record->peer))
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri record for peer %s\n",
- GNUNET_i2s (&record->peer));
- hello = record->value;
- if ((0 == record->value_size))
- {
- GNUNET_break (0);
- return;
- }
-
- huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
- record_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (hello);
-
- if (GNUNET_TIME_absolute_cmp (huc_hello_exp_time, >,
record_hello_exp_time))
- store_hello (huc, huc->hello);
- }
+ huc->cont (huc->cont_cls, GNUNET_OK);
+ GNUNET_free (huc->hello);
+ GNUNET_free (huc->pid);
+ GNUNET_free (huc);
}
@@ -1258,7 +839,6 @@ GNUNET_PEERSTORE_hello_add (struct GNUNET_PEERSTORE_Handle
*h,
return NULL;
huc = GNUNET_new (struct GNUNET_PEERSTORE_StoreHelloContext);
- huc->store_context_map = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
huc->h = h;
huc->cont = cont;
huc->cont_cls = cont_cls;
@@ -1275,39 +855,26 @@ GNUNET_PEERSTORE_hello_add (struct
GNUNET_PEERSTORE_Handle *h,
GNUNET_i2s (huc->pid),
GNUNET_STRINGS_absolute_time_to_string (huc_exp),
size_msg);
- huc->wc = GNUNET_PEERSTORE_watch (h,
+ huc->sc = GNUNET_PEERSTORE_store (h,
"peerstore",
- NULL,
+ huc->pid,
GNUNET_PEERSTORE_HELLO_KEY,
- &merge_uri,
+ huc->hello,
+ ntohs (huc->hello->size),
+ hello_exp,
+
GNUNET_PEERSTORE_STOREOPTION_UPSERT_LATER_EXPIRY,
+ &hello_store_success,
huc);
GNUNET_HELLO_builder_free (builder);
-
return huc;
}
-static enum GNUNET_GenericReturnValue
-free_store_context (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- (void) cls;
-
- GNUNET_PEERSTORE_store_cancel ((struct
- GNUNET_PEERSTORE_StoreContext *) value);
- return GNUNET_YES; // FIXME why is this a map anyway
-}
-
-
void
GNUNET_PEERSTORE_hello_add_cancel (struct
GNUNET_PEERSTORE_StoreHelloContext *huc)
{
- GNUNET_PEERSTORE_watch_cancel (huc->wc);
- GNUNET_CONTAINER_multipeermap_iterate (huc->store_context_map,
- free_store_context,
- NULL);
+ GNUNET_PEERSTORE_store_cancel (huc->sc);
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
GNUNET_free (huc);
diff --git a/src/service/peerstore/peerstore_api_monitor.c
b/src/service/peerstore/peerstore_api_monitor.c
new file mode 100644
index 000000000..8badb43eb
--- /dev/null
+++ b/src/service/peerstore/peerstore_api_monitor.c
@@ -0,0 +1,297 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2013-2024, 2019 GNUnet e.V.
+
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
+ */
+/**
+ * @file service/peerstore/peerstore_api_monitor.c
+ * @brief Monitor API for peerstore
+ * @author Martin Schanzenbach
+ */
+#include "gnunet_common.h"
+#include "gnunet_protocols.h"
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "peerstore.h"
+#include "peerstore_common.h"
+#include "gnunet_peerstore_service.h"
+
+#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-monitor-api", \
+ __VA_ARGS__)
+/**
+ * Context for a monitor.
+ *
+ * Allocated by #GNUNET_PEERSTORE_monitor_start and released by
+ * #GNUNET_PEERSTORE_monitor_stop.
+ */
+struct GNUNET_PEERSTORE_Monitor
+{
+  /**
+   * Kept in a DLL.
+   * NOTE(review): not referenced anywhere else in this file.
+   */
+  struct GNUNET_PEERSTORE_Monitor *next;
+
+  /**
+   * Kept in a DLL.
+   * NOTE(review): not referenced anywhere else in this file.
+   */
+  struct GNUNET_PEERSTORE_Monitor *prev;
+
+  /**
+   * Function to call on errors. Invoked from reconnect() after an
+   * existing message queue has been destroyed.
+   */
+  GNUNET_SCHEDULER_TaskCallback error_cb;
+
+  /**
+   * Closure for @e error_cb.
+   */
+  void *error_cb_cls;
+
+  /**
+   * Callback with each record received. Also called with a NULL record
+   * and an error message when a record message cannot be parsed.
+   */
+  GNUNET_PEERSTORE_Processor callback;
+
+  /**
+   * Closure for @e callback
+   */
+  void *callback_cls;
+
+  /**
+   * Hash of the combined key
+   * NOTE(review): not referenced anywhere else in this file.
+   */
+  struct GNUNET_HashCode keyhash;
+
+  /**
+   * The peer we are watching for values. May be NULL.
+   * The caller's pointer is stored as-is (not copied) and is dereferenced
+   * again on every (re)connect.
+   */
+  const struct GNUNET_PeerIdentity *peer;
+
+  /**
+   * The key we like to watch for values. May be NULL.
+   * The caller's pointer is stored as-is (not copied) and is read again
+   * on every (re)connect.
+   */
+  const char *key;
+
+  /**
+   * The sub system requested the watch.
+   * Stored as-is (not copied); the MONITOR_START message built in
+   * reconnect() does not include it.
+   */
+  const char *sub_system;
+
+  /**
+   * Request ID
+   * NOTE(review): not referenced anywhere else in this file.
+   */
+  uint32_t rid;
+
+  /**
+   * Configuration used to (re)connect to the service.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Callback invoked when a MONITOR_SYNC message is received. May be NULL.
+   */
+  GNUNET_SCHEDULER_TaskCallback sync_cb;
+
+  /**
+   * Closure for @e sync_cb.
+   */
+  void *sync_cb_cls;
+
+  /**
+   * Message queue to the service; NULL if connecting failed.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Iterate first flag, transmitted to the service in MONITOR_START.
+   */
+  int iterate_first;
+};
+
+/**
+ * Handler for MONITOR_SYNC messages: invokes the sync callback of the
+ * monitor, if one was given.
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *`
+ * @param msg the sync message (unused)
+ */
+static void
+handle_sync (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_PEERSTORE_Monitor *mc = cls;
+
+  if (NULL != mc->sync_cb)
+    mc->sync_cb (mc->sync_cb_cls);
+}
+
+
+/**
+ * When a record message for a monitor is received, check the
+ * message is well-formed. No checking is done here; parsing and
+ * validation happen in #handle_result.
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *` registered with the
+ *        handler (unused)
+ * @param msg message received (unused)
+ * @return always #GNUNET_OK
+ */
+static int
+check_result (void *cls, const struct PeerstoreRecordMessage *msg)
+{
+  /* we defer validation to #handle_result */
+  return GNUNET_OK;
+}
+
+
+/**
+ * When a response to monitor is received: parse the record and hand it
+ * to the monitor's callback. If parsing fails, the callback is invoked
+ * with a NULL record and an error message instead.
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *`
+ * @param msg message received
+ */
+static void
+handle_result (void *cls, const struct PeerstoreRecordMessage *msg)
+{
+  struct GNUNET_PEERSTORE_Monitor *mc = cls;
+  struct GNUNET_PEERSTORE_Record *record;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Monitor received RecordMessage\n");
+  record = PEERSTORE_parse_record_message (msg);
+  if (NULL == record)
+  {
+    mc->callback (mc->callback_cls,
+                  NULL,
+                  _ ("Received a malformed response from service."));
+  }
+  else
+  {
+    mc->callback (mc->callback_cls, record, NULL);
+    /* the record is only valid for the duration of the callback */
+    PEERSTORE_destroy_record (record);
+  }
+}
+
+
+static void reconnect (struct GNUNET_PEERSTORE_Monitor *mc);
+
+/**
+ * Error handler for the monitor's message queue: tears down the current
+ * connection and tries to re-establish it via reconnect().
+ *
+ * @param cls the `struct GNUNET_PEERSTORE_Monitor *`
+ * @param err the error reported by the message queue (unused)
+ */
+static void
+mq_error_handler (void *cls, enum GNUNET_MQ_Error err)
+{
+  struct GNUNET_PEERSTORE_Monitor *mc = cls;
+
+  reconnect (mc);
+}
+
+
+/**
+ * (Re)establish the connection to the PEERSTORE service and (re)send the
+ * MONITOR_START request.
+ *
+ * If a message queue already exists it is destroyed and the error
+ * callback (if any) is invoked before reconnecting. If connecting fails,
+ * `mc->mq` is left NULL and nothing is sent.
+ *
+ * @param mc the monitor to (re)connect
+ */
+static void
+reconnect (struct GNUNET_PEERSTORE_Monitor *mc)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (sync,
+                             GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_SYNC,
+                             struct GNUNET_MessageHeader,
+                             mc),
+    GNUNET_MQ_hd_var_size (result,
+                           GNUNET_MESSAGE_TYPE_PEERSTORE_RECORD,
+                           struct PeerstoreRecordMessage, mc),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *env;
+  struct PeerstoreMonitorStartMessage *sm;
+  size_t key_len = 0;
+
+  if (NULL != mc->mq)
+  {
+    GNUNET_MQ_destroy (mc->mq);
+    /* do not leave a dangling queue pointer while user code runs */
+    mc->mq = NULL;
+    if (NULL != mc->error_cb)
+      mc->error_cb (mc->error_cb_cls);
+  }
+  mc->mq = GNUNET_CLIENT_connect (mc->cfg,
+                                  "peerstore",
+                                  handlers,
+                                  &mq_error_handler,
+                                  mc);
+  if (NULL == mc->mq)
+    return;
+  if (NULL != mc->key)
+    key_len = strlen (mc->key) + 1;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending MONITOR_START\n");
+  /* The extra size is a host-order byte count; only the fields inside
+     the message are converted to network byte order. */
+  env = GNUNET_MQ_msg_extra (sm,
+                             key_len,
+                             GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_START);
+  sm->iterate_first = htons (mc->iterate_first);
+  if (NULL != mc->peer)
+  {
+    sm->peer = *mc->peer;
+    sm->peer_set = htons (GNUNET_YES);
+  }
+  if (NULL != mc->key)
+    GNUNET_memcpy (&sm[1], mc->key, key_len);
+  sm->key_size = htons (key_len);
+  GNUNET_MQ_send (mc->mq, env);
+}
+
+
+/**
+ * Start monitoring the PEERSTORE. Opens a dedicated connection to the
+ * service and sends a MONITOR_START request.
+ *
+ * The @a sub_system, @a peer and @a key pointers are stored without
+ * copying; @a peer and @a key are read again whenever the connection is
+ * re-established, so they must stay valid until the monitor is stopped.
+ *
+ * @param cfg configuration used to connect to the service
+ * @param iterate_first flag transmitted to the service in MONITOR_START
+ * @param sub_system sub system to monitor (stored; not sent by reconnect())
+ * @param peer peer to monitor, may be NULL
+ * @param key key to monitor, may be NULL
+ * @param error_cb function called when the connection had to be torn down
+ * @param error_cb_cls closure for @a error_cb
+ * @param sync_cb function called on MONITOR_SYNC, may be NULL
+ * @param sync_cb_cls closure for @a sync_cb
+ * @param callback function called with each record received
+ * @param callback_cls closure for @a callback
+ * @return handle to the monitor, NULL if connecting to the service failed
+ */
+struct GNUNET_PEERSTORE_Monitor *
+GNUNET_PEERSTORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                                int iterate_first,
+                                const char *sub_system,
+                                const struct GNUNET_PeerIdentity *peer,
+                                const char *key,
+                                GNUNET_SCHEDULER_TaskCallback error_cb,
+                                void *error_cb_cls,
+                                GNUNET_SCHEDULER_TaskCallback sync_cb,
+                                void *sync_cb_cls,
+                                GNUNET_PEERSTORE_Processor callback,
+                                void *callback_cls)
+{
+  struct GNUNET_PEERSTORE_Monitor *mc;
+
+  mc = GNUNET_new (struct GNUNET_PEERSTORE_Monitor);
+  mc->callback = callback;
+  mc->callback_cls = callback_cls;
+  mc->sync_cb = sync_cb;
+  mc->sync_cb_cls = sync_cb_cls;
+  mc->error_cb = error_cb;
+  mc->error_cb_cls = error_cb_cls;
+  mc->key = key;
+  mc->peer = peer;
+  mc->iterate_first = iterate_first;
+  mc->sub_system = sub_system;
+  mc->cfg = cfg;
+  reconnect (mc); /* mc->mq is still NULL here, so this is the initial connect */
+  if (NULL == mc->mq)
+  {
+    GNUNET_free (mc);
+    return NULL;
+  }
+  return mc;
+}
+
+
+/**
+ * Stop monitoring. Destroys the message queue (if any) and frees the
+ * handle; @a zm must not be used afterwards.
+ *
+ * @param zm handle to the monitor activity to stop
+ */
+void
+GNUNET_PEERSTORE_monitor_stop (struct GNUNET_PEERSTORE_Monitor *zm)
+{
+  if (NULL != zm->mq)
+  {
+    GNUNET_MQ_destroy (zm->mq);
+    zm->mq = NULL;
+  }
+  GNUNET_free (zm);
+}
+
+
+/**
+ * Send a MONITOR_NEXT message with the given limit to the service.
+ *
+ * NOTE(review): @a zm->mq is used unchecked here, but reconnect() leaves
+ * it NULL when reconnecting fails -- confirm callers cannot reach this
+ * function in that state.
+ *
+ * @param zm the monitor
+ * @param limit value transmitted to the service (presumably the number of
+ *        further records it may send -- confirm against the service)
+ */
+void
+GNUNET_PEERSTORE_monitor_next (struct GNUNET_PEERSTORE_Monitor *zm,
+                               uint64_t limit)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct PeerstoreMonitorNextMessage *nm;
+
+  env = GNUNET_MQ_msg (nm, GNUNET_MESSAGE_TYPE_PEERSTORE_MONITOR_NEXT);
+  nm->limit = GNUNET_htonll (limit);
+  GNUNET_MQ_send (zm->mq, env);
+}
diff --git a/src/service/peerstore/peerstore_common.c
b/src/service/peerstore/peerstore_common.c
index 5d4d06c0c..435444917 100644
--- a/src/service/peerstore/peerstore_common.c
+++ b/src/service/peerstore/peerstore_common.c
@@ -70,7 +70,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
enum GNUNET_PEERSTORE_StoreOption options,
uint16_t msg_type)
{
- struct StoreRecordMessage *srm;
+ struct PeerstoreRecordMessage *srm;
struct GNUNET_MQ_Envelope *ev;
size_t ss_size;
size_t key_size;
@@ -87,14 +87,9 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
ev = GNUNET_MQ_msg_extra (srm, msg_size, msg_type);
srm->key_size = htons (key_size);
srm->expiry = GNUNET_TIME_absolute_hton (expiry);
- if (NULL == peer)
- srm->peer_set = htons (GNUNET_NO);
- else
- {
- srm->peer_set = htons (GNUNET_YES);
- srm->peer = *peer;
- }
- srm->rid = rid;
+ GNUNET_assert (NULL != peer);
+ srm->peer = *peer;
+ srm->rid = htons (rid);
srm->sub_system_size = htons (ss_size);
srm->value_size = htons (value_size);
srm->options = htonl (options);
@@ -109,7 +104,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm)
+PEERSTORE_parse_record_message (const struct PeerstoreRecordMessage *srm)
{
struct GNUNET_PEERSTORE_Record *record;
uint16_t req_size;
@@ -128,10 +123,7 @@ PEERSTORE_parse_record_message (const struct
StoreRecordMessage *srm)
return NULL;
}
record = GNUNET_new (struct GNUNET_PEERSTORE_Record);
- if (GNUNET_YES == ntohs (srm->peer_set))
- {
- record->peer = srm->peer;
- }
+ record->peer = srm->peer;
record->expiry = GNUNET_TIME_absolute_ntoh (srm->expiry);
dummy = (char *) &srm[1];
if (ss_size > 0)
diff --git a/src/service/peerstore/peerstore_common.h
b/src/service/peerstore/peerstore_common.h
index 56f1f8b8b..b0239e66e 100644
--- a/src/service/peerstore/peerstore_common.h
+++ b/src/service/peerstore/peerstore_common.h
@@ -70,7 +70,7 @@ PEERSTORE_create_record_mq_envelope (uint32_t rid,
* @return Pointer to record or NULL on error
*/
struct GNUNET_PEERSTORE_Record *
-PEERSTORE_parse_record_message (const struct StoreRecordMessage *srm);
+PEERSTORE_parse_record_message (const struct PeerstoreRecordMessage *srm);
/**
diff --git a/src/service/peerstore/perf_peerstore_store.c
b/src/service/peerstore/perf_peerstore_store.c
index e328be93e..3386ccebd 100644
--- a/src/service/peerstore/perf_peerstore_store.c
+++ b/src/service/peerstore/perf_peerstore_store.c
@@ -32,7 +32,7 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
-static struct GNUNET_PEERSTORE_WatchContext *wc;
+static struct GNUNET_PEERSTORE_Monitor *wc;
static char *ss = "test_peerstore_stress";
static struct GNUNET_PeerIdentity p;
@@ -45,7 +45,7 @@ static int count_fin = 0;
static void
disconnect (void *cls)
{
- GNUNET_PEERSTORE_watch_cancel (wc);
+ GNUNET_PEERSTORE_monitor_stop (wc);
if (NULL != h)
GNUNET_PEERSTORE_disconnect (h);
GNUNET_SCHEDULER_shutdown ();
@@ -56,7 +56,7 @@ static void
store_cont (void *cls, int ret)
{
count_fin++;
- if (count_fin == count)
+ if ((STORES == count) && (count_fin == count))
{
ok = 0;
GNUNET_SCHEDULER_add_now (&disconnect, NULL);
@@ -86,6 +86,22 @@ watch_cb (void *cls, const struct GNUNET_PEERSTORE_Record
*record,
store ();
}
+static void
+error_cb (void *cls)
+{
+ // Never reach this
+}
+
+static void
+sync_cb (void *cls)
+{
+ static int initial_sync = 0;
+ if (1 == initial_sync)
+ return;
+ store ();
+ initial_sync = 1;
+ return;
+}
static void
run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -94,7 +110,11 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle
*cfg,
memset (&p, 5, sizeof(p));
h = GNUNET_PEERSTORE_connect (cfg);
GNUNET_assert (NULL != h);
- wc = GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
+ wc = GNUNET_PEERSTORE_monitor_start (cfg, GNUNET_YES,
+ ss, &p, k,
+ error_cb, NULL,
+ sync_cb, NULL,
+ &watch_cb, NULL);
store ();
}
diff --git a/src/service/peerstore/test_peerstore_api_iterate.c
b/src/service/peerstore/test_peerstore_api_iterate.c
index 0fa58f3fd..8d072d418 100644
--- a/src/service/peerstore/test_peerstore_api_iterate.c
+++ b/src/service/peerstore/test_peerstore_api_iterate.c
@@ -56,12 +56,13 @@ iter3_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 3);
@@ -77,22 +78,23 @@ iter2_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 2);
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- NULL,
- NULL,
- &iter3_cb,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ NULL,
+ NULL,
+ &iter3_cb,
+ NULL);
}
@@ -103,23 +105,24 @@ iter1_cb (void *cls,
{
if (NULL != emsg)
{
- GNUNET_PEERSTORE_iterate_cancel (ic);
+ GNUNET_PEERSTORE_iteration_stop (ic);
return;
}
if (NULL != record)
{
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u is count\n", count);
GNUNET_assert (count == 1);
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- &p1,
- NULL,
- &iter2_cb,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ &p1,
+ NULL,
+ &iter2_cb,
+ NULL);
}
@@ -156,11 +159,11 @@ store_cont (void *cls, int success)
else
{
count = 0;
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- &p1,
- k1,
- &iter1_cb, NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ ss,
+ &p1,
+ k1,
+ &iter1_cb, NULL);
return;
}
count++;
diff --git a/src/service/peerstore/test_peerstore_api_store.c
b/src/service/peerstore/test_peerstore_api_store.c
index 8cf0e60a7..1883310b9 100644
--- a/src/service/peerstore/test_peerstore_api_store.c
+++ b/src/service/peerstore/test_peerstore_api_store.c
@@ -30,6 +30,7 @@
static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
+static struct GNUNET_PEERSTORE_IterateContext *ic;
static char *subsystem = "test_peerstore_api_store";
static struct GNUNET_PeerIdentity pid;
@@ -48,6 +49,7 @@ finish (void *cls)
GNUNET_SCHEDULER_shutdown ();
}
+
static void
test3_cont2 (void *cls,
const struct GNUNET_PEERSTORE_Record *record,
@@ -61,6 +63,7 @@ test3_cont2 (void *cls,
GNUNET_assert (0 == strcmp ((char *) val3,
(char *) record->value));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 1);
@@ -76,12 +79,12 @@ test3_cont (void *cls,
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid,
- key,
- &test3_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid,
+ key,
+ &test3_cont2,
+ NULL);
}
@@ -118,6 +121,7 @@ test2_cont2 (void *cls,
GNUNET_assert ((0 == strcmp ((char *) val1, (char *) record->value)) ||
(0 == strcmp ((char *) val2, (char *) record->value)));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 2);
@@ -132,11 +136,11 @@ test2_cont (void *cls, int success)
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid, key,
- &test2_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid, key,
+ &test2_cont2,
+ NULL);
}
@@ -174,6 +178,7 @@ test1_cont2 (void *cls,
GNUNET_assert ((strlen (val1) + 1) == record->value_size);
GNUNET_assert (0 == strcmp ((char *) val1, (char *) record->value));
count++;
+ GNUNET_PEERSTORE_iteration_next (ic, 1);
return;
}
GNUNET_assert (count == 1);
@@ -189,12 +194,12 @@ test1_cont (void *cls, int success)
if (GNUNET_YES != success)
return;
count = 0;
- GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid,
- key,
- &test1_cont2,
- NULL);
+ ic = GNUNET_PEERSTORE_iteration_start (h,
+ subsystem,
+ &pid,
+ key,
+ &test1_cont2,
+ NULL);
}
diff --git a/src/service/peerstore/test_peerstore_api_watch.c
b/src/service/peerstore/test_peerstore_api_watch.c
index 63b0e896b..e195bb814 100644
--- a/src/service/peerstore/test_peerstore_api_watch.c
+++ b/src/service/peerstore/test_peerstore_api_watch.c
@@ -33,7 +33,7 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
-static struct GNUNET_PEERSTORE_WatchContext *wc;
+static struct GNUNET_PEERSTORE_Monitor *wc;
static char *ss = "test_peerstore_api_watch";
@@ -46,8 +46,8 @@ static struct GNUNET_PeerIdentity p;
static void
finish (void *cls)
{
- GNUNET_PEERSTORE_watch_cancel (wc);
GNUNET_PEERSTORE_disconnect (h);
+ GNUNET_PEERSTORE_monitor_stop(wc);
GNUNET_SCHEDULER_shutdown ();
}
@@ -76,17 +76,7 @@ watch_cb (void *cls,
const char *emsg)
{
GNUNET_assert (NULL == emsg);
- if (GNUNET_YES == initial_iteration)
- {
- if (NULL != record)
- {
- GNUNET_break (0);
- return; // Ignore this record, FIXME: Test unclean
- }
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
- initial_iteration = GNUNET_NO;
- return;
- }
+ GNUNET_assert (GNUNET_YES != initial_iteration);
if (NULL == record)
{
GNUNET_break (0);
@@ -101,6 +91,20 @@ watch_cb (void *cls,
}
+static void
+sync_cb (void *cls)
+{
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
+ initial_iteration = GNUNET_NO;
+}
+
+static void
+error_cb (void *cls)
+{
+ // Never reach this
+ GNUNET_assert (0);
+}
+
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -112,12 +116,17 @@ run (void *cls,
memset (&p,
4,
sizeof(p));
- wc = GNUNET_PEERSTORE_watch (h,
- ss,
- &p,
- k,
- &watch_cb,
- NULL);
+ wc = GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ ss,
+ &p,
+ k,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &watch_cb,
+ NULL);
}
diff --git a/src/service/topology/gnunet-daemon-topology.c
b/src/service/topology/gnunet-daemon-topology.c
index 8b9360e86..588000358 100644
--- a/src/service/topology/gnunet-daemon-topology.c
+++ b/src/service/topology/gnunet-daemon-topology.c
@@ -36,6 +36,7 @@
#include "gnunet_peerstore_service.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_application_service.h"
+#include <assert.h>
/**
@@ -118,7 +119,7 @@ struct GNUNET_SCHEDULER_Task *peerstore_notify_task;
* Our peerstore notification context. We use notification
* to instantly learn about new peers as they are discovered.
*/
-static struct GNUNET_PEERSTORE_NotifyContext *peerstore_notify;
+static struct GNUNET_PEERSTORE_Monitor *peerstore_notify;
/**
* Our configuration.
@@ -725,6 +726,22 @@ consider_for_advertising (const struct
GNUNET_MessageHeader *hello)
}
+static void
+error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ _ ("Error in communication with PEERSTORE service to
monitor.\n"));
+ return;
+}
+
+static void
+sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ _ ("Finished initial PEERSTORE iteration in monitor.\n"));
+ return;
+}
+
/**
* PEERSTORE calls this function to let us know about a possible peer
* that we might want to connect to.
@@ -736,28 +753,38 @@ consider_for_advertising (const struct
GNUNET_MessageHeader *hello)
*/
static void
process_peer (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *err_msg)
{
struct Peer *pos;
+ struct GNUNET_MessageHeader *hello;
if (NULL != err_msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
_ ("Error in communication with PEERSTORE service: %s\n"),
err_msg);
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (ps, GNUNET_NO, &process_peer,
- NULL);
+ GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ error_cb,
+ NULL,
+ sync_cb,
+ NULL,
+ &process_peer,
+ NULL);
return;
}
- GNUNET_assert (NULL != peer);
+ GNUNET_assert (NULL != record);
+ hello = record->value;
if (NULL == hello)
{
/* free existing HELLO, if any */
- pos = GNUNET_CONTAINER_multipeermap_get (peers, peer);
+ pos = GNUNET_CONTAINER_multipeermap_get (peers, &record->peer);
if (NULL != pos)
{
GNUNET_free (pos->hello);
@@ -770,13 +797,15 @@ process_peer (void *cls,
if (NULL == pos->mq)
free_peer (NULL, &pos->pid, pos);
}
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
return;
}
consider_for_advertising (hello);
- pos = GNUNET_CONTAINER_multipeermap_get (peers, peer);
+ pos = GNUNET_CONTAINER_multipeermap_get (peers, &record->peer);
if (NULL == pos)
- pos = make_peer (peer, hello);
+ pos = make_peer (&record->peer, hello);
attempt_connect (pos);
+ GNUNET_PEERSTORE_monitor_next (peerstore_notify, 1);
}
@@ -788,7 +817,17 @@ start_notify (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting to process new hellos for gossiping.\n");
peerstore_notify =
- GNUNET_PEERSTORE_hello_changed_notify (ps, GNUNET_NO, &process_peer, NULL);
+ GNUNET_PEERSTORE_monitor_start (cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &error_cb,
+ NULL,
+ &sync_cb,
+ NULL,
+ &process_peer,
+ NULL);
}
@@ -916,7 +955,7 @@ cleaning_task (void *cls)
}
if (NULL != peerstore_notify)
{
- GNUNET_PEERSTORE_hello_changed_notify_cancel (peerstore_notify);
+ GNUNET_PEERSTORE_monitor_stop (peerstore_notify);
peerstore_notify = NULL;
}
else if (NULL != peerstore_notify_task)
diff --git a/src/service/transport/gnunet-communicator-tcp.c
b/src/service/transport/gnunet-communicator-tcp.c
index a79d72331..189febfda 100644
--- a/src/service/transport/gnunet-communicator-tcp.c
+++ b/src/service/transport/gnunet-communicator-tcp.c
@@ -972,17 +972,17 @@ queue_destroy (struct Queue *queue)
}
if (NULL != queue->rekey_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->rekey_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->rekey_monotime_get);
queue->rekey_monotime_get = NULL;
}
if (NULL != queue->handshake_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->handshake_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->handshake_monotime_get);
queue->handshake_monotime_get = NULL;
}
if (NULL != queue->handshake_ack_monotime_get)
{
- GNUNET_PEERSTORE_iterate_cancel (queue->handshake_ack_monotime_get);
+ GNUNET_PEERSTORE_iteration_stop (queue->handshake_ack_monotime_get);
queue->handshake_ack_monotime_get = NULL;
}
if (NULL != queue->qh)
@@ -1305,6 +1305,7 @@ rekey_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->rekey_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -1331,6 +1332,7 @@ rekey_monotime_cb (void *cls,
GNUNET_PEERSTORE_STOREOPTION_REPLACE,
&rekey_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->rekey_monotime_get, 1);
}
@@ -1402,12 +1404,12 @@ do_rekey (struct Queue *queue, const struct TCPRekey
*rekey)
return;
}
queue->rekey_monotonic_time = rekey->monotonic_time;
- queue->rekey_monotime_get = GNUNET_PEERSTORE_iterate (peerstore,
-
"transport_tcp_communicator",
- &queue->target,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY,
- &rekey_monotime_cb,
- queue);
+ queue->rekey_monotime_get = GNUNET_PEERSTORE_iteration_start (peerstore,
+
"transport_tcp_communicator",
+ &queue->target,
+
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_REKEY,
+
&rekey_monotime_cb,
+ queue);
gcry_cipher_close (queue->in_cipher);
queue->rekeyed = GNUNET_YES;
setup_in_cipher (&rekey->ephemeral, queue);
@@ -1462,6 +1464,7 @@ handshake_ack_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -1489,6 +1492,7 @@ handshake_ack_monotime_cb (void *cls,
&
handshake_ack_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
}
@@ -1942,14 +1946,16 @@ try_handle_plaintext (struct Queue *queue)
return 0;
}
- queue->handshake_ack_monotime_get = GNUNET_PEERSTORE_iterate (peerstore,
-
"transport_tcp_communicator",
-
&queue->target
- ,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK,
- &
-
handshake_ack_monotime_cb,
- queue);
+ queue->handshake_ack_monotime_get = GNUNET_PEERSTORE_iteration_start (
+ peerstore,
+ "transport_tcp_communicator",
+ &queue->
+ target
+ ,
+ GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE_ACK,
+ &
+ handshake_ack_monotime_cb,
+ queue);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Handling plaintext, ack processed!\n");
@@ -2777,6 +2783,7 @@ handshake_monotime_cb (void *cls,
GNUNET_i2s (pid));
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
GNUNET_break (0);
return;
}
@@ -2804,6 +2811,7 @@ handshake_monotime_cb (void *cls,
&
handshake_monotime_store_cb,
queue);
+ GNUNET_PEERSTORE_iteration_next (queue->handshake_ack_monotime_get, 1);
}
@@ -2848,12 +2856,12 @@ decrypt_and_check_tc (struct Queue *queue,
&tc->sender.public_key);
if (GNUNET_YES == ret)
queue->handshake_monotime_get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport_tcp_communicator",
- &queue->target,
-
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE,
- &handshake_monotime_cb,
- queue);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport_tcp_communicator",
+ &queue->target,
+
GNUNET_PEERSTORE_TRANSPORT_TCP_COMMUNICATOR_HANDSHAKE,
+ &handshake_monotime_cb,
+ queue);
return ret;
}
diff --git a/src/service/transport/gnunet-service-transport.c
b/src/service/transport/gnunet-service-transport.c
index 523ef2120..6d2ba4139 100644
--- a/src/service/transport/gnunet-service-transport.c
+++ b/src/service/transport/gnunet-service-transport.c
@@ -2062,7 +2062,7 @@ struct IncomingRequest
/**
* Notify context for new HELLOs.
*/
- struct GNUNET_PEERSTORE_NotifyContext *nc;
+ struct GNUNET_PEERSTORE_Monitor *nc;
/**
* Which peer is this about?
@@ -2089,7 +2089,7 @@ struct PeerRequest
/**
* Notify context for new HELLOs.
*/
- struct GNUNET_PEERSTORE_NotifyContext *nc;
+ struct GNUNET_PEERSTORE_Monitor *nc;
/**
* What kind of performance preference does this @e tc have?
@@ -2980,7 +2980,8 @@ free_incoming_request (struct IncomingRequest *ir)
GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
GNUNET_assert (ir_total > 0);
ir_total--;
- GNUNET_PEERSTORE_hello_changed_notify_cancel (ir->nc);
+ if (NULL != ir->nc)
+ GNUNET_PEERSTORE_monitor_stop (ir->nc);
ir->nc = NULL;
GNUNET_free (ir);
}
@@ -3618,7 +3619,7 @@ free_neighbour (struct Neighbour *neighbour)
}
if (NULL != neighbour->get)
{
- GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
+ GNUNET_PEERSTORE_iteration_stop (neighbour->get);
neighbour->get = NULL;
}
if (NULL != neighbour->sc)
@@ -3996,7 +3997,8 @@ stop_peer_request (void *cls,
struct TransportClient *tc = cls;
struct PeerRequest *pr = value;
- GNUNET_PEERSTORE_hello_changed_notify_cancel (pr->nc);
+ if (NULL != pr->nc)
+ GNUNET_PEERSTORE_monitor_stop (pr->nc);
pr->nc = NULL;
GNUNET_assert (
GNUNET_YES ==
@@ -4072,7 +4074,8 @@ client_disconnect_cb (void *cls,
struct AddressListEntry *ale;
if (NULL != tc->details.communicator.free_queue_entry_task)
- GNUNET_SCHEDULER_cancel
(tc->details.communicator.free_queue_entry_task);
+ GNUNET_SCHEDULER_cancel (
+ tc->details.communicator.free_queue_entry_task);
while (NULL != (q = tc->details.communicator.queue_head))
free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
@@ -4141,9 +4144,9 @@ finish_cmc_handling_with_continue (struct
CommunicatorMessageContext *cmc,
int free_cmc);
static enum GNUNET_GenericReturnValue
-resume_communicators(void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+resume_communicators (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct VirtualLink *vl = value;
struct CommunicatorMessageContext *cmc;
@@ -4589,21 +4592,22 @@ free_timedout_queue_entry (void *cls)
while (NULL != qep)
{
struct QueueEntry *pos = qep;
-
+
qep = qep->next;
- struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference
(pos->creation_timestamp, now);
+ struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference (
+ pos->creation_timestamp, now);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"diff to now %s \n",
GNUNET_TIME_relative2s (diff, GNUNET_NO));
- if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff))
+ if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, <, diff))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Freeing timed out QueueEntry with MID %" PRIu64
- " and QID %u\n",
- pos->mid,
- queue->qid);
- free_queue_entry(pos, tc);
+ "Freeing timed out QueueEntry with MID %" PRIu64
+ " and QID %u\n",
+ pos->mid,
+ queue->qid);
+ free_queue_entry (pos, tc);
}
}
}
@@ -4716,9 +4720,11 @@ queue_send_msg (struct Queue *queue,
struct TransportClient *tc = queue->tc;
if (NULL == tc->details.communicator.free_queue_entry_task)
- tc->details.communicator.free_queue_entry_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-
&free_timedout_queue_entry,
-
tc);
+ tc->details.communicator.free_queue_entry_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &
+ free_timedout_queue_entry,
+ tc);
}
if (NULL != pm && NULL != (pa = pm->pa_head))
{
@@ -4729,7 +4735,8 @@ queue_send_msg (struct Queue *queue,
// GNUNET_CONTAINER_multiuuidmap_get (pending_acks,
&ack[i].ack_uuid.value);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message MID %" PRIu64
- " of type %u (%u) and size %lu with MQ %p queue %s (QID %u)
pending %" PRIu64 "\n",
+ " of type %u (%u) and size %lu with MQ %p queue %s (QID %u)
pending %"
+ PRIu64 "\n",
GNUNET_ntohll (smt->mid),
ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
ntohs (smt->header.size),
@@ -5699,7 +5706,7 @@ shc_cont (void *cls, int success)
ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&store_pi,
ale);
- }
+ }
}
@@ -6057,7 +6064,7 @@ handle_raw_message (void *cls, const struct
GNUNET_MessageHeader *mh)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%u items stored in ring buffer\n",
GNUNET_YES == is_ring_buffer_full ? RING_BUFFER_SIZE :
- ring_buffer_head);
+ ring_buffer_head);
/*GNUNET_break_op (0);
GNUNET_STATISTICS_update (GST_stats,
@@ -6369,7 +6376,8 @@ handle_fragment_box (void *cls, const struct
TransportFragmentBoxMessage *fb)
else
rc->msg_missing = 0;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received fragment with size %u at offset %u/%u %u bytes
missing from %s for NEW message %" PRIu64 "\n",
+ "Received fragment with size %u at offset %u/%u %u bytes
missing from %s for NEW message %"
+ PRIu64 "\n",
fsize,
ntohs (fb->frag_off),
msize,
@@ -6644,8 +6652,8 @@ completed_pending_message (struct PendingMessage *pm)
free_pending_message (pm);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
- pos->frag_off,
- pos->bytes_msg,
+ (unsigned long) pos->frag_off,
+ (unsigned long) pos->bytes_msg,
pos->pmt,
NULL == pos->frag_parent ? 1 : 0);
/* check if subtree is done */
@@ -6672,7 +6680,8 @@ completed_pending_message (struct PendingMessage *pm)
}
/* Was this the last applicable fragment? */
- if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
pos->pmt) &&
+ if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
+ pos->pmt) &&
(pos->frag_off == pos->bytes_msg))
client_send_response (pos);
return;
@@ -8186,7 +8195,7 @@ forward_dv_box (struct Neighbour *next_hop,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%u items stored in DV ring buffer\n",
GNUNET_YES == is_ring_buffer_dv_full ? RING_BUFFER_SIZE :
- ring_buffer_dv_head);
+ ring_buffer_dv_head);
}
}
}
@@ -8202,7 +8211,7 @@ free_backtalker (struct Backtalker *b)
{
if (NULL != b->get)
{
- GNUNET_PEERSTORE_iterate_cancel (b->get);
+ GNUNET_PEERSTORE_iteration_stop (b->get);
b->get = NULL;
GNUNET_assert (NULL != b->cmc);
finish_cmc_handling (b->cmc);
@@ -8309,6 +8318,7 @@ backtalker_monotime_cb (void *cls,
}
if (sizeof(*mtbe) != record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (b->get, 1);
GNUNET_break (0);
return;
}
@@ -8328,8 +8338,8 @@ backtalker_monotime_cb (void *cls,
/* Setting body_size to 0 prevents call to #forward_backchannel_payload()
*/
b->body_size = 0;
- return;
}
+ GNUNET_PEERSTORE_iteration_next (b->get, 1);
}
@@ -8652,12 +8662,12 @@ handle_dv_box (void *cls, const struct
TransportDVBoxMessage *dvb)
GNUNET_TIME_relative_to_absolute (BACKCHANNEL_INACTIVITY_TIMEOUT);
b->task = GNUNET_SCHEDULER_add_at (b->timeout, &backtalker_timeout_cb, b);
b->get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport",
- &b->pid,
-
GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
- &backtalker_monotime_cb,
- b);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport",
+ &b->pid,
+
GNUNET_PEERSTORE_TRANSPORT_BACKCHANNEL_MONOTIME,
+ &backtalker_monotime_cb,
+ b);
} /* end actual decryption */
}
@@ -8819,7 +8829,7 @@ start_address_validation (const struct
GNUNET_PeerIdentity *pid,
&vs->challenge,
sizeof(vs->challenge));
vs->address = GNUNET_strdup (address);
- GNUNET_CRYPTO_hash (vs->address, strlen(vs->address), &vs->hc);
+ GNUNET_CRYPTO_hash (vs->address, strlen (vs->address), &vs->hc);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting address validation `%s' of peer %s using challenge
%s\n",
address,
@@ -8887,11 +8897,12 @@ hello_for_incoming_cb (void *cls,
*/
static void
handle_hello_for_incoming (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
+ struct IncomingRequest *ir = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
if (NULL != emsg)
{
@@ -8900,8 +8911,12 @@ handle_hello_for_incoming (void *cls,
emsg);
return;
}
- if (0 == GNUNET_memcmp (peer, &GST_my_identity))
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer, &GST_my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (ir->nc, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
hello_for_incoming_cb,
@@ -8910,6 +8925,22 @@ handle_hello_for_incoming (void *cls,
}
+static void
+hello_for_incoming_error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+hello_for_incoming_sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
/**
* Communicator gave us a transport address validation challenge. Process the
* request.
@@ -9008,10 +9039,17 @@ handle_validation_challenge (
ir->pid = sender;
GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
- ir->nc = GNUNET_PEERSTORE_hello_changed_notify (peerstore,
- GNUNET_NO,
- &handle_hello_for_incoming,
- NULL);
+ ir->nc = GNUNET_PEERSTORE_monitor_start (GST_cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &hello_for_incoming_error_cb,
+ NULL,
+ &hello_for_incoming_sync_cb,
+ NULL,
+ &handle_hello_for_incoming,
+ ir);
ir_total++;
/* Bound attempts we do in parallel here, might otherwise get excessive */
while (ir_total > MAX_INCOMING_REQUEST)
@@ -9107,6 +9145,7 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const
char *address)
return NULL;
}
+
static void
validation_transmit_on_queue (struct Queue *q, struct ValidationState *vs);
@@ -9139,8 +9178,8 @@ revalidate_map_it (
{
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Key in revalidate map %s \n",
- GNUNET_h2s (key));
+ "Key in revalidate map %s \n",
+ GNUNET_h2s (key));
return GNUNET_YES;
}
@@ -9287,7 +9326,7 @@ handle_validation_response (
vs->revalidation_task =
GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_subtract (vs->next_challenge,
GNUNET_TIME_UNIT_MINUTES),
-
&revalidation_start_cb, vs);
+ &revalidation_start_cb, vs);
vs->sc = GNUNET_PEERSTORE_store (peerstore,
"transport",
&cmc->im.sender,
@@ -10022,7 +10061,7 @@ update_pm_next_attempt (struct PendingMessage *pm,
if (PMT_DV_BOX == root->pmt)
root = root->frag_parent;
reorder_root_pm (root, root->next_attempt);
- //root->next_attempt = GNUNET_TIME_UNIT_ZERO_ABS;
+ // root->next_attempt = GNUNET_TIME_UNIT_ZERO_ABS;
}
else
{
@@ -10036,10 +10075,10 @@ update_pm_next_attempt (struct PendingMessage *pm,
next_attempt);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "frag_count %u after factor\n",
- root->frag_count);
+ "frag_count %u after factor\n",
+ root->frag_count);
s1 = GNUNET_TIME_relative_multiply_double (plus_mean,
- factor);
+ factor);
s2 = GNUNET_TIME_relative_divide (plus,
root->frag_count);
plus_mean = GNUNET_TIME_relative_add (s1, s2);
@@ -10966,7 +11005,8 @@ validation_transmit_on_queue (struct Queue *q, struct
ValidationState *vs)
GNUNET_CONTAINER_multihashmap_remove (revalidation_map, &vs->hc, vs);
monotonic_time = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
- if (GNUNET_TIME_UNIT_ZERO_ABS.abs_value_us ==
vs->last_challenge_use.abs_value_us)
+ if (GNUNET_TIME_UNIT_ZERO_ABS.abs_value_us ==
+ vs->last_challenge_use.abs_value_us)
{
vs->first_challenge_use = monotonic_time;
}
@@ -11325,6 +11365,7 @@ neighbour_dv_monotime_cb (void *cls,
}
if (0 == record->value_size)
{
+ GNUNET_PEERSTORE_iteration_next (n->get, 1);
GNUNET_break (0);
return;
}
@@ -11332,6 +11373,7 @@ neighbour_dv_monotime_cb (void *cls,
n->last_dv_learn_monotime =
GNUNET_TIME_absolute_max (n->last_dv_learn_monotime,
GNUNET_TIME_absolute_ntoh (*mtbe));
+ GNUNET_PEERSTORE_iteration_next (n->get, 1);
}
@@ -11389,12 +11431,12 @@ handle_add_queue_message (void *cls,
neighbour,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
neighbour->get =
- GNUNET_PEERSTORE_iterate (peerstore,
- "transport",
- &neighbour->pid,
- GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
- &neighbour_dv_monotime_cb,
- neighbour);
+ GNUNET_PEERSTORE_iteration_start (peerstore,
+ "transport",
+ &neighbour->pid,
+
GNUNET_PEERSTORE_TRANSPORT_DVLEARN_MONOTIME,
+ &neighbour_dv_monotime_cb,
+ neighbour);
}
addr_len = ntohs (aqm->header.size) - sizeof(*aqm);
addr = (const char *) &aqm[1];
@@ -11445,18 +11487,21 @@ handle_add_queue_message (void *cls,
queue->idle = GNUNET_YES;
/* check if valdiations are waiting for the queue */
if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (validation_map,
- &aqm->receiver))
- {
- if (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_get_multiple
(validation_map,
- &aqm->receiver,
-
&check_validation_request_pending,
- queue))
+ &aqm->receiver))
+ {
+ if (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_get_multiple (
+ validation_map,
+ &aqm->
+ receiver,
+ &
+ check_validation_request_pending,
+ queue))
start_address_validation (&aqm->receiver, queue->address);
}
else
start_address_validation (&aqm->receiver, queue->address);
/* look for traffic for this queue */
- //TODO Check whether this makes any sense at all.
+ // TODO Check whether this makes any sense at all.
/*schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);*/
/* might be our first queue, try launching DV learning */
@@ -11658,12 +11703,12 @@ hello_for_client_cb (void *cls,
*/
static void
handle_hello_for_client (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *hello,
+ const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
- (void) cls;
+ struct PeerRequest *pr = cls;
struct GNUNET_HELLO_Builder *builder;
+ struct GNUNET_MessageHeader *hello;
if (NULL != emsg)
{
@@ -11672,8 +11717,12 @@ handle_hello_for_client (void *cls,
emsg);
return;
}
- if (0 == GNUNET_memcmp (peer, &GST_my_identity))
+ hello = record->value;
+ if (0 == GNUNET_memcmp (&record->peer, &GST_my_identity))
+ {
+ GNUNET_PEERSTORE_monitor_next (pr->nc, 1);
return;
+ }
builder = GNUNET_HELLO_builder_from_msg (hello);
GNUNET_HELLO_builder_iterate (builder,
hello_for_client_cb,
@@ -11682,6 +11731,22 @@ handle_hello_for_client (void *cls,
}
+static void
+hello_for_client_error_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Error in PEERSTORE monitoring\n");
+}
+
+
+static void
+hello_for_client_sync_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Done with initial PEERSTORE iteration during monitoring\n");
+}
+
+
/**
* We have received a `struct ExpressPreferenceMessage` from an application
* client.
@@ -11728,10 +11793,18 @@ handle_suggest (void *cls, const struct
ExpressPreferenceMessage *msg)
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- pr->nc = GNUNET_PEERSTORE_hello_changed_notify (peerstore,
- GNUNET_NO,
- &handle_hello_for_client,
- NULL);
+ pr->nc =
+ GNUNET_PEERSTORE_monitor_start (GST_cfg,
+ GNUNET_YES,
+ "peerstore",
+ NULL,
+ GNUNET_PEERSTORE_HELLO_KEY,
+ &hello_for_client_error_cb,
+ NULL,
+ &hello_for_client_sync_cb,
+ NULL,
+ &handle_hello_for_client,
+ pr);
GNUNET_SERVICE_client_continue (tc->client);
}
diff --git a/src/service/transport/transport-testing2.c
b/src/service/transport/transport-testing2.c
index 4081e5111..36a57b2b6 100644
--- a/src/service/transport/transport-testing2.c
+++ b/src/service/transport/transport-testing2.c
@@ -341,7 +341,7 @@ hello_iter_cb (void *cb_cls,
memcpy (p->hello, record->value, p->hello_size);
p->hello[p->hello_size - 1] = '\0';
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
if (NULL != p->start_cb)
{
@@ -352,6 +352,7 @@ hello_iter_cb (void *cb_cls,
p->start_cb (p->start_cb_cls);
p->start_cb = NULL;
}
+ GNUNET_PEERSTORE_iteration_next (p->pic, 1);
}
@@ -360,12 +361,12 @@ retrieve_hello (void *cls)
{
struct GNUNET_TRANSPORT_TESTING_PeerContext *p = cls;
p->rh_task = NULL;
- p->pic = GNUNET_PEERSTORE_iterate (p->ph,
- "transport",
- &p->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- p);
+ p->pic = GNUNET_PEERSTORE_iteration_start (p->ph,
+ "transport",
+ &p->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ p);
}
@@ -536,7 +537,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct
GNUNET_i2s (&p->id));
if (NULL != p->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
}
if (NULL != p->th)
@@ -594,12 +595,12 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct
&notify_disconnect);
GNUNET_assert (NULL != p->th);
p->ah = GNUNET_TRANSPORT_application_init (p->cfg);
- p->pic = GNUNET_PEERSTORE_iterate (p->ph,
- "transport",
- &p->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- p);
+ p->pic = GNUNET_PEERSTORE_iteration_start (p->ph,
+ "transport",
+ &p->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ p);
GNUNET_assert (NULL != p->pic);
return GNUNET_OK;
}
@@ -632,7 +633,7 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct
}
if (NULL != p->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (p->pic);
+ GNUNET_PEERSTORE_iteration_stop (p->pic);
p->pic = NULL;
}
if (NULL != p->th)
diff --git a/src/service/transport/transport_api_cmd_start_peer.c
b/src/service/transport/transport_api_cmd_start_peer.c
index 24d731d77..311289f92 100644
--- a/src/service/transport/transport_api_cmd_start_peer.c
+++ b/src/service/transport/transport_api_cmd_start_peer.c
@@ -67,9 +67,10 @@ hello_iter_cb (void *cb_cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Our hello %s\n",
sps->hello);
- GNUNET_PEERSTORE_iterate_cancel (sps->pic);
+ GNUNET_PEERSTORE_iteration_stop (sps->pic);
sps->pic = NULL;
GNUNET_TESTING_async_finish (&sps->ac);
+ GNUNET_PEERSTORE_iteration_next (sps->pic, 1);
}
@@ -83,12 +84,12 @@ retrieve_hello (void *cls)
{
struct GNUNET_TESTING_StartPeerState *sps = cls;
sps->rh_task = NULL;
- sps->pic = GNUNET_PEERSTORE_iterate (sps->ph,
- "transport",
- &sps->id,
- GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
- hello_iter_cb,
- sps);
+ sps->pic = GNUNET_PEERSTORE_iteration_start (sps->ph,
+ "transport",
+ &sps->id,
+
GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
+ hello_iter_cb,
+ sps);
}
@@ -423,12 +424,13 @@ start_peer_traits (void *cls,
GNUNET_TRANSPORT_TESTING_make_trait_application_handle ((const void *) ah),
GNUNET_TRANSPORT_TESTING_make_trait_peer_id ((const void *) id),
GNUNET_TRANSPORT_TESTING_make_trait_connected_peers_map ((const
- void *)
- connected_peers_map),
+ void *)
+
connected_peers_map),
GNUNET_TRANSPORT_TESTING_make_trait_hello ((const void *) hello),
GNUNET_TRANSPORT_TESTING_make_trait_hello_size ((const void *) hello_size),
GNUNET_TRANSPORT_TESTING_make_trait_state ((const void *) sps),
- GNUNET_TRANSPORT_TESTING_make_trait_broadcast ((const void *)
&sps->broadcast),
+ GNUNET_TRANSPORT_TESTING_make_trait_broadcast ((const
+ void *) &sps->broadcast),
GNUNET_TESTING_trait_end ()
};
diff --git a/src/service/transport/transport_api_cmd_stop_peer.c
b/src/service/transport/transport_api_cmd_stop_peer.c
index 333a3dae7..fbd2b1df3 100644
--- a/src/service/transport/transport_api_cmd_stop_peer.c
+++ b/src/service/transport/transport_api_cmd_stop_peer.c
@@ -68,7 +68,7 @@ stop_peer_run (void *cls,
if (NULL != sps->pic)
{
- GNUNET_PEERSTORE_iterate_cancel (sps->pic);
+ GNUNET_PEERSTORE_iteration_stop (sps->pic);
}
if (NULL != sps->th)
{
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.