[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: PEERSTORE: Major API overhaul to fix a
From: gnunet
Subject: [gnunet] branch master updated: PEERSTORE: Major API overhaul to fix a variety of race conditions.
Date: Thu, 23 Nov 2023 19:18:37 +0100
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new d51ba6435 PEERSTORE: Major API overhaul to fix a variety of race
conditions.
new fbe2292e2 Merge branch 'master' of git+ssh://git.gnunet.org/gnunet
d51ba6435 is described below
commit d51ba6435b62772ccf2a20d13e99c164898bdc9d
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Nov 23 19:18:00 2023 +0100
PEERSTORE: Major API overhaul to fix a variety of race conditions.
---
src/include/gnunet_protocols.h | 6 +
src/service/meson.build | 2 +-
src/service/peerstore/Makefile.am | 8 -
src/service/peerstore/gnunet-service-peerstore.c | 133 +++++--
src/service/peerstore/meson.build | 55 +++
src/service/peerstore/peerstore.h | 33 +-
src/service/peerstore/peerstore_api.c | 419 ++++++++++-----------
src/service/peerstore/peerstore_common.c | 17 +-
src/service/peerstore/peerstore_common.h | 4 +-
src/service/peerstore/perf_peerstore_store.c | 32 +-
src/service/peerstore/test_peerstore_api_iterate.c | 84 +++--
src/service/peerstore/test_peerstore_api_store.c | 16 +-
src/service/peerstore/test_peerstore_api_sync.c | 252 -------------
src/service/peerstore/test_peerstore_api_watch.c | 77 +++-
14 files changed, 549 insertions(+), 589 deletions(-)
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 0daa37bed..a998344b9 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2628,6 +2628,12 @@ extern "C" {
*/
#define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL 826
+/**
+ * Store result message
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT 827
+
+
/*******************************************************************************
* SOCIAL message types
******************************************************************************/
diff --git a/src/service/meson.build b/src/service/meson.build
index b5c4bc1ae..fcd3f7520 100644
--- a/src/service/meson.build
+++ b/src/service/meson.build
@@ -7,8 +7,8 @@ endif
subdir('util')
subdir('statistics')
subdir('arm')
-subdir('peerstore')
subdir('testing')
+subdir('peerstore')
subdir('nat')
subdir('nat-auto')
subdir('transport')
diff --git a/src/service/peerstore/Makefile.am
b/src/service/peerstore/Makefile.am
index 6ca8e7925..a12e8ff8b 100644
--- a/src/service/peerstore/Makefile.am
+++ b/src/service/peerstore/Makefile.am
@@ -44,7 +44,6 @@ check_PROGRAMS = \
test_peerstore_api_store \
test_peerstore_api_iterate \
test_peerstore_api_watch \
- test_peerstore_api_sync \
perf_peerstore_store
EXTRA_DIST = \
@@ -76,13 +75,6 @@ test_peerstore_api_watch_LDADD = \
$(top_builddir)/src/service/testing/libgnunettesting.la \
$(top_builddir)/src/lib/util/libgnunetutil.la
-test_peerstore_api_sync_SOURCES = \
- test_peerstore_api_sync.c
-test_peerstore_api_sync_LDADD = \
- libgnunetpeerstore.la \
- $(top_builddir)/src/service/testing/libgnunettesting.la \
- $(top_builddir)/src/lib/util/libgnunetutil.la
-
perf_peerstore_store_SOURCES = \
perf_peerstore_store.c
perf_peerstore_store_LDADD = \
diff --git a/src/service/peerstore/gnunet-service-peerstore.c
b/src/service/peerstore/gnunet-service-peerstore.c
index 364900674..77523aa2e 100644
--- a/src/service/peerstore/gnunet-service-peerstore.c
+++ b/src/service/peerstore/gnunet-service-peerstore.c
@@ -100,6 +100,39 @@ do_shutdown ()
}
+struct IterationContext
+{
+ /**
+ * The record parsed from the iterate request.
+ */
+ struct GNUNET_PEERSTORE_Record *record;
+
+ /**
+ * The request ID
+ */
+ uint32_t rid;
+
+};
+
+struct StoreRecordContext
+{
+ /**
+ * The record that was stored.
+ */
+ struct GNUNET_PEERSTORE_Record *record;
+
+ /**
+ * The request ID
+ */
+ uint32_t rid;
+
+ /**
+ * The client
+ */
+ struct GNUNET_SERVICE_Client *client;
+};
+
+
/**
* Task run during shutdown.
*
@@ -245,31 +278,37 @@ record_iterator (void *cls,
const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
- struct GNUNET_PEERSTORE_Record *cls_record = cls;
+ struct IterationContext *ic = cls;
struct GNUNET_MQ_Envelope *env;
if (NULL == record)
{
/* No more records */
- struct GNUNET_MessageHeader *endmsg;
+ struct PeerstoreResultMessage *endmsg;
env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
+ endmsg->rid = ic->rid;
if (NULL == emsg)
{
- GNUNET_SERVICE_client_continue (cls_record->client);
+ endmsg->result = htonl (GNUNET_OK);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
+ GNUNET_SERVICE_client_continue (ic->record->client);
}
else
{
+ endmsg->result = htonl (GNUNET_SYSERR);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
- GNUNET_SERVICE_client_drop (cls_record->client);
+ GNUNET_SERVICE_client_drop (ic->record->client);
}
- PEERSTORE_destroy_record (cls_record);
+ PEERSTORE_destroy_record (ic->record);
+ GNUNET_free (ic);
return;
}
env = PEERSTORE_create_record_mq_envelope (
+ ic->rid,
record->sub_system,
&record->peer,
record->key,
@@ -278,7 +317,7 @@ record_iterator (void *cls,
record->expiry,
0,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
}
@@ -300,6 +339,7 @@ watch_notifier_it (void *cls, const struct GNUNET_HashCode
*key, void *value)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
env = PEERSTORE_create_record_mq_envelope (
+ 0,
record->sub_system,
&record->peer,
record->key,
@@ -415,27 +455,28 @@ check_iterate (void *cls, const struct StoreRecordMessage
*srm)
static void
handle_iterate (void *cls, const struct StoreRecordMessage *srm)
{
- struct GNUNET_SERVICE_Client *client = cls;
- struct GNUNET_PEERSTORE_Record *record;
+ struct IterationContext *ic = GNUNET_new (struct IterationContext);
- record = PEERSTORE_parse_record_message (srm);
+ ic->record = PEERSTORE_parse_record_message (srm);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Iterate request: ss `%s', peer `%s', key `%s'\n",
- record->sub_system,
- GNUNET_i2s (&record->peer),
- (NULL == record->key) ? "NULL" : record->key);
- record->client = client;
+ ic->record->sub_system,
+ GNUNET_i2s (&ic->record->peer),
+ (NULL == ic->record->key) ? "NULL" : ic->record->key);
+ ic->record->client = cls;
+ ic->rid = srm->rid;
if (GNUNET_OK !=
db->iterate_records (db->cls,
- record->sub_system,
- (ntohs (srm->peer_set)) ? &record->peer : NULL,
- record->key,
+ ic->record->sub_system,
+ (ntohs (srm->peer_set)) ? &ic->record->peer : NULL,
+ ic->record->key,
&record_iterator,
- record))
+ ic))
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (client);
- PEERSTORE_destroy_record (record);
+ GNUNET_SERVICE_client_drop (ic->record->client);
+ PEERSTORE_destroy_record (ic->record);
+ GNUNET_free (ic);
}
}
@@ -449,19 +490,28 @@ handle_iterate (void *cls, const struct
StoreRecordMessage *srm)
static void
store_record_continuation (void *cls, int success)
{
- struct GNUNET_PEERSTORE_Record *record = cls;
+ struct StoreRecordContext *src = cls;
+ struct PeerstoreResultMessage *msg;
+ struct GNUNET_MQ_Envelope *env;
if (GNUNET_OK == success)
{
- watch_notifier (record);
- GNUNET_SERVICE_client_continue (record->client);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+ env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT);
+ msg->rid = src->rid;
+ msg->result = htonl (success);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (src->client), env);
+ watch_notifier (src->record);
+ GNUNET_SERVICE_client_continue (src->client);
}
else
{
GNUNET_break (0);
- GNUNET_SERVICE_client_drop (record->client);
+ GNUNET_SERVICE_client_drop (src->record->client);
}
- PEERSTORE_destroy_record (record);
+ PEERSTORE_destroy_record (src->record);
+ GNUNET_free (src);
}
@@ -504,35 +554,38 @@ static void
handle_store (void *cls, const struct StoreRecordMessage *srm)
{
struct GNUNET_SERVICE_Client *client = cls;
- struct GNUNET_PEERSTORE_Record *record;
-
- record = PEERSTORE_parse_record_message (srm);
+ struct StoreRecordContext *src = GNUNET_new (struct StoreRecordContext);
+ src->record = PEERSTORE_parse_record_message (srm);
GNUNET_log (
GNUNET_ERROR_TYPE_DEBUG,
"Received a store request. Sub system `%s' Peer `%s Key `%s' Options:
%u.\n",
- record->sub_system,
- GNUNET_i2s (&record->peer),
- record->key,
+ src->record->sub_system,
+ GNUNET_i2s (&src->record->peer),
+ src->record->key,
(uint32_t) ntohl (srm->options));
- record->client = client;
+ src->record->client = client;
+ src->rid = srm->rid;
+ src->client = client;
if (GNUNET_OK != db->store_record (db->cls,
- record->sub_system,
- &record->peer,
- record->key,
- record->value,
- record->value_size,
- record->expiry,
+ src->record->sub_system,
+ &src->record->peer,
+ src->record->key,
+ src->record->value,
+ src->record->value_size,
+ src->record->expiry,
ntohl (srm->options),
&store_record_continuation,
- record))
+ src))
{
GNUNET_break (0);
- PEERSTORE_destroy_record (record);
+ PEERSTORE_destroy_record (src->record);
+ GNUNET_free (src);
GNUNET_SERVICE_client_drop (client);
return;
}
}
+
static void
store_hello_continuation (void *cls, int success)
{
diff --git a/src/service/peerstore/meson.build
b/src/service/peerstore/meson.build
index db70b0b9e..2c6f7eba8 100644
--- a/src/service/peerstore/meson.build
+++ b/src/service/peerstore/meson.build
@@ -37,3 +37,58 @@ executable ('gnunet-service-peerstore',
install: true,
install_dir: get_option('libdir') / 'gnunet' / 'libexec')
+testpeerstore_api_iterate = executable ('test_peerstore_api_iterate',
+ ['test_peerstore_api_iterate.c'],
+ dependencies: [
+ libgnunetpeerstore_dep,
+ libgnunettesting_dep,
+ libgnunetutil_dep
+ ],
+ include_directories: [incdir, configuration_inc],
+ install: false)
+
+testpeerstore_api_store = executable ('test_peerstore_api_store',
+ ['test_peerstore_api_store.c'],
+ dependencies: [
+ libgnunetpeerstore_dep,
+ libgnunetutil_dep,
+ libgnunettesting_dep,
+ ],
+ include_directories: [incdir, configuration_inc],
+ install: false)
+
+testpeerstore_api_watch = executable ('test_peerstore_api_watch',
+ ['test_peerstore_api_watch.c'],
+ dependencies: [
+ libgnunetpeerstore_dep,
+ libgnunetutil_dep,
+ libgnunettesting_dep,
+ ],
+ include_directories: [incdir, configuration_inc],
+ install: false)
+testpeerstore_api_perf = executable ('perf_peerstore_store',
+ ['perf_peerstore_store.c'],
+ dependencies: [
+ libgnunetpeerstore_dep,
+ libgnunetutil_dep,
+ libgnunettesting_dep,
+ ],
+ include_directories: [incdir, configuration_inc],
+ install: false)
+
+configure_file(input : 'test_peerstore_api_data.conf',
+ output : 'test_peerstore_api_data.conf',
+ copy: true)
+
+test('test_peerstore_api_store', testpeerstore_api_store,
+ suite: 'peerstore', workdir: meson.current_build_dir())
+test('test_peerstore_api_watch', testpeerstore_api_watch,
+ suite: 'peerstore', workdir: meson.current_build_dir())
+test('test_peerstore_api_iterate', testpeerstore_api_iterate,
+ suite: 'peerstore', workdir: meson.current_build_dir())
+test('perf_peerstore_store', testpeerstore_api_perf,
+ suite: 'peerstore', workdir: meson.current_build_dir())
+
+
+
+
diff --git a/src/service/peerstore/peerstore.h
b/src/service/peerstore/peerstore.h
index 0dec03443..26c656f00 100644
--- a/src/service/peerstore/peerstore.h
+++ b/src/service/peerstore/peerstore.h
@@ -30,6 +30,7 @@
GNUNET_NETWORK_STRUCT_BEGIN
+
/**
* Message carrying a PEERSTORE record message
*/
@@ -73,6 +74,11 @@ struct StoreRecordMessage
*/
uint16_t value_size GNUNET_PACKED;
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+
/**
* Options, needed only in case of a
* store operation
@@ -82,6 +88,28 @@ struct StoreRecordMessage
/* Followed by key and value */
};
+/**
+ * Message carrying a PEERSTORE result message
+ */
+struct PeerstoreResultMessage
+{
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Request id.
+ */
+ uint32_t rid GNUNET_PACKED;
+
+ /**
+ * Result of the request (e.g. #GNUNET_OK or
+ * #GNUNET_SYSERR), in network byte order
+ */
+ uint32_t result GNUNET_PACKED;
+
+};
/**
* Message carrying record key hash
@@ -94,14 +122,15 @@ struct StoreKeyHashMessage
struct GNUNET_MessageHeader header;
/**
- * Always 0, for alignment.
+ * Request id.
*/
- uint32_t reserved GNUNET_PACKED;
+ uint32_t rid GNUNET_PACKED;
/**
* Hash of a record key
*/
struct GNUNET_HashCode keyhash;
+
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/service/peerstore/peerstore_api.c
b/src/service/peerstore/peerstore_api.c
index 3dec7e01b..394f64378 100644
--- a/src/service/peerstore/peerstore_api.c
+++ b/src/service/peerstore/peerstore_api.c
@@ -23,6 +23,7 @@
* @author Omar Tarabai
* @author Christian Grothoff
*/
+#include "gnunet_common.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_hello_uri_lib.h"
@@ -86,6 +87,11 @@ struct GNUNET_PEERSTORE_Handle
*/
struct GNUNET_TIME_Relative reconnect_delay;
+ /**
+ * Counter used by get_op_id () to hand out request ids.
+ */
+ uint32_t last_op_id;
+
};
/**
@@ -113,6 +119,11 @@ struct GNUNET_PEERSTORE_StoreContext
*/
GNUNET_PEERSTORE_Continuation cont;
+ /**
+ * Request ID
+ */
+ uint32_t rid;
+
/**
* Closure for @e cont
*/
@@ -216,9 +227,10 @@ struct GNUNET_PEERSTORE_IterateContext
void *callback_cls;
/**
- * #GNUNET_YES if we are currently processing records.
+ * Request ID
*/
- int iterating;
+ uint32_t rid;
+
};
/**
@@ -275,6 +287,12 @@ struct GNUNET_PEERSTORE_WatchContext
* The sub system requested the watch.
*/
const char *sub_system;
+
+ /**
+ * Request ID
+ */
+ uint32_t rid;
+
};
/**
@@ -306,6 +324,12 @@ struct GNUNET_PEERSTORE_NotifyContext
* Is this request canceled.
*/
unsigned int canceled;
+
+ /**
+ * Request ID
+ */
+ uint32_t rid;
+
};
/******************************************************************************/
@@ -320,6 +344,18 @@ struct GNUNET_PEERSTORE_NotifyContext
static void
reconnect (void *cls);
+/**
+ * Get a fresh operation id to distinguish between peerstore requests
+ *
+ * @param h the peerstore handle
+ * @return next operation id to use
+ */
+static uint32_t
+get_op_id (struct GNUNET_PEERSTORE_Handle *h)
+{
+ return h->last_op_id++;
+}
+
/**
* Disconnect from the peerstore service.
@@ -329,25 +365,13 @@ reconnect (void *cls);
static void
disconnect (struct GNUNET_PEERSTORE_Handle *h)
{
- struct GNUNET_PEERSTORE_IterateContext *next;
-
- for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL !=
ic;
- ic = next)
+ if (NULL != h->watches)
{
- next = ic->next;
- if (GNUNET_YES == ic->iterating)
- {
- GNUNET_PEERSTORE_Processor icb;
- void *icb_cls;
-
- icb = ic->callback;
- icb_cls = ic->callback_cls;
- GNUNET_PEERSTORE_iterate_cancel (ic);
- if (NULL != icb)
- icb (icb_cls, NULL, "Iteration canceled due to reconnection");
- }
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (h->watches));
+ GNUNET_CONTAINER_multihashmap_destroy (h->watches);
}
-
+ GNUNET_assert (NULL == h->iterate_head);
+ GNUNET_assert (NULL == h->store_head);
if (NULL != h->mq)
{
GNUNET_MQ_destroy (h->mq);
@@ -376,29 +400,6 @@ disconnect_and_schedule_reconnect (struct
GNUNET_PEERSTORE_Handle *h)
}
-/**
- * Callback after MQ envelope is sent
- *
- * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
- */
-static void
-store_request_sent (void *cls)
-{
- struct GNUNET_PEERSTORE_StoreContext *sc = cls;
- GNUNET_PEERSTORE_Continuation cont;
- void *cont_cls;
-
- if (NULL != sc)
- {
- cont = sc->cont;
- cont_cls = sc->cont_cls;
- GNUNET_PEERSTORE_store_cancel (sc);
- if (NULL != cont)
- cont (cont_cls, GNUNET_OK);
- }
-}
-
-
/******************************************************************************/
/******************* CONNECTION FUNCTIONS
*********************/
/******************************************************************************/
@@ -437,29 +438,12 @@ rewatch_it (void *cls, const struct GNUNET_HashCode *key,
void *value)
ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
hm->keyhash = wc->keyhash;
+ hm->rid = get_op_id (h);
GNUNET_MQ_send (h->mq, ev);
return GNUNET_YES;
}
-/**
- * Iterator over watch requests to cancel them.
- *
- * @param cls unused
- * @param key key to the watch request
- * @param value watch context
- * @return #GNUNET_YES to continue iteration
- */
-static int
-destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
-{
- struct GNUNET_PEERSTORE_WatchContext *wc = value;
-
- GNUNET_PEERSTORE_watch_cancel (wc);
- return GNUNET_YES;
-}
-
-
/**
* Connect to the PEERSTORE service.
*
@@ -493,26 +477,7 @@ GNUNET_PEERSTORE_connect (const struct
GNUNET_CONFIGURATION_Handle *cfg)
void
GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h)
{
- struct GNUNET_PEERSTORE_IterateContext *ic;
- struct GNUNET_PEERSTORE_StoreContext *sc;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
- if (NULL != h->watches)
- {
- GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
- GNUNET_CONTAINER_multihashmap_destroy (h->watches);
- h->watches = NULL;
- }
- while (NULL != (ic = h->iterate_head))
- {
- GNUNET_break (0);
- GNUNET_PEERSTORE_iterate_cancel (ic);
- }
- while (NULL != (sc = h->store_head))
- {
- GNUNET_break (0);
- GNUNET_PEERSTORE_store_cancel (sc);
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnect initiated from client.\n");
disconnect (h);
}
@@ -530,17 +495,17 @@ GNUNET_PEERSTORE_disconnect (struct
GNUNET_PEERSTORE_Handle *h)
void
GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "store cancel with sc %p \n",
- sc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel with sc %p \n",
+ sc);
GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
GNUNET_free (sc->sub_system);
GNUNET_free (sc->value);
GNUNET_free (sc->key);
GNUNET_free (sc);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "store cancel with sc %p is null\n",
- sc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "store cancel with sc %p is null\n",
+ sc);
}
@@ -581,17 +546,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
sub_system,
GNUNET_i2s (peer),
key);
- ev =
- PEERSTORE_create_record_mq_envelope (sub_system,
- peer,
- key,
- value,
- size,
- expiry,
- options,
- GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
-
+ sc->rid = get_op_id (h);
sc->sub_system = GNUNET_strdup (sub_system);
sc->peer = *peer;
sc->key = GNUNET_strdup (key);
@@ -602,14 +558,54 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
sc->cont = cont;
sc->cont_cls = cont_cls;
sc->h = h;
+ ev =
+ PEERSTORE_create_record_mq_envelope (sc->rid,
+ sub_system,
+ peer,
+ key,
+ value,
+ size,
+ expiry,
+ options,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
- GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
GNUNET_MQ_send (h->mq, ev);
return sc;
}
+/**
+ * When a response for store request is received
+ *
+ * @param cls a `struct GNUNET_PEERSTORE_Handle *`
+ * @param msg message received
+ */
+static void
+handle_store_result (void *cls, const struct PeerstoreResultMessage *msg)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerstoreResultMessage\n");
+ for (sc = h->store_head; NULL != sc; sc = sc->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying %u ?= %u\n", sc->rid, msg->rid);
+ if (sc->rid == msg->rid)
+ break;
+ }
+ if (NULL == sc)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ _("Unexpected store response.\n"));
+ return;
+ }
+ if (NULL != sc->cont)
+ sc->cont (sc->cont_cls, ntohl (msg->result));
+ GNUNET_CONTAINER_DLL_remove (h->store_head, h->store_tail, sc);
+}
+
+
/******************************************************************************/
/******************* ITERATE FUNCTIONS
*********************/
/******************************************************************************/
@@ -622,29 +618,24 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
* @param msg message received
*/
static void
-handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_iterate_end (void *cls, const struct PeerstoreResultMessage *msg)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_IterateContext *ic;
- GNUNET_PEERSTORE_Processor callback;
- void *callback_cls;
+ struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
- ic = h->iterate_head;
+ for (ic = h->iterate_head; NULL != ic; ic = ic->next)
+ if (ic->rid == msg->rid)
+ break;
if (NULL == ic)
{
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _ ("Unexpected iteration response, this should not happen.\n"));
- disconnect_and_schedule_reconnect (h);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ _ ("Unexpected iteration response.\n"));
return;
}
- callback = ic->callback;
- callback_cls = ic->callback_cls;
- ic->iterating = GNUNET_NO;
- GNUNET_PEERSTORE_iterate_cancel (ic);
- /* NOTE: set this here and not after callback because callback may free h */
- h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- if (NULL != callback)
- callback (callback_cls, NULL, NULL);
+ if (NULL != ic->callback)
+ ic->callback (ic->callback_cls, NULL, NULL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up iteration with rid %u\n",
ic->rid);
+ GNUNET_CONTAINER_DLL_remove (h->iterate_head, h->iterate_tail, ic);
}
@@ -674,11 +665,12 @@ handle_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
struct GNUNET_PEERSTORE_IterateContext *ic;
- GNUNET_PEERSTORE_Processor callback;
- void *callback_cls;
struct GNUNET_PEERSTORE_Record *record;
- ic = h->iterate_head;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received StoreRecordMessage\n");
+ for (ic = h->iterate_head; NULL != ic; ic = ic->next)
+ if (ic->rid == msg->rid)
+ break;
if (NULL == ic)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -686,21 +678,18 @@ handle_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
disconnect_and_schedule_reconnect (h);
return;
}
- ic->iterating = GNUNET_YES;
- callback = ic->callback;
- callback_cls = ic->callback_cls;
- if (NULL == callback)
+ if (NULL == ic->callback)
return;
record = PEERSTORE_parse_record_message (msg);
if (NULL == record)
{
- callback (callback_cls,
- NULL,
- _ ("Received a malformed response from service."));
+ ic->callback (ic->callback_cls,
+ NULL,
+ _ ("Received a malformed response from service."));
}
else
{
- callback (callback_cls, record, NULL);
+ ic->callback (ic->callback_cls, record, NULL);
PEERSTORE_destroy_record (record);
}
}
@@ -715,11 +704,6 @@ handle_iterate_result (void *cls, const struct
StoreRecordMessage *msg)
void
GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
{
- if (GNUNET_YES == ic->iterating)
- {
- if (NULL != ic->callback)
- ic->callback (ic->callback_cls, NULL, "Iteration canceled due to
reconnection");
- }
GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
GNUNET_free (ic->sub_system);
GNUNET_free (ic->key);
@@ -738,8 +722,11 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle
*h,
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_PEERSTORE_IterateContext *ic;
+ ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
+ ic->rid = get_op_id (h);
ev =
- PEERSTORE_create_record_mq_envelope (sub_system,
+ PEERSTORE_create_record_mq_envelope (ic->rid,
+ sub_system,
peer,
key,
NULL,
@@ -747,7 +734,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
GNUNET_TIME_UNIT_FOREVER_ABS,
0,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
- ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
ic->callback = callback;
ic->callback_cls = callback_cls;
ic->h = h;
@@ -831,10 +817,14 @@ static void
reconnect (void *cls)
{
struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_MQ_MessageHandler mq_handlers[] =
- { GNUNET_MQ_hd_fixed_size (iterate_end,
+ struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ GNUNET_MQ_hd_fixed_size (iterate_end,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
- struct GNUNET_MessageHeader,
+ struct PeerstoreResultMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (store_result,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT,
+ struct PeerstoreResultMessage,
h),
GNUNET_MQ_hd_var_size (iterate_result,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
@@ -844,7 +834,8 @@ reconnect (void *cls)
GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
struct StoreRecordMessage,
h),
- GNUNET_MQ_handler_end () };
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_MQ_Envelope *ev;
h->reconnect_task = NULL;
@@ -868,8 +859,10 @@ reconnect (void *cls)
for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL !=
ic;
ic = ic->next)
{
+ ic->rid = get_op_id(h);
ev =
- PEERSTORE_create_record_mq_envelope (ic->sub_system,
+ PEERSTORE_create_record_mq_envelope (ic->rid,
+ ic->sub_system,
&ic->peer,
ic->key,
NULL,
@@ -882,8 +875,10 @@ reconnect (void *cls)
for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
sc = sc->next)
{
+ sc->rid = get_op_id(h);
ev =
- PEERSTORE_create_record_mq_envelope (sc->sub_system,
+ PEERSTORE_create_record_mq_envelope (sc->rid,
+ sc->sub_system,
&sc->peer,
sc->key,
sc->value,
@@ -891,7 +886,6 @@ reconnect (void *cls)
sc->expiry,
sc->options,
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
GNUNET_MQ_send (h->mq, ev);
}
}
@@ -909,7 +903,7 @@ GNUNET_PEERSTORE_watch_cancel (struct
GNUNET_PEERSTORE_WatchContext *wc)
struct GNUNET_MQ_Envelope *ev;
struct StoreKeyHashMessage *hm;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancelling watch.\n");
if (NULL != wc->ic)
{
GNUNET_PEERSTORE_iterate_cancel (wc->ic);
@@ -919,6 +913,7 @@ GNUNET_PEERSTORE_watch_cancel (struct
GNUNET_PEERSTORE_WatchContext *wc)
ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
hm->keyhash = wc->keyhash;
+ hm->rid = get_op_id (h);
GNUNET_MQ_send (h->mq, ev);
GNUNET_assert (
GNUNET_YES ==
@@ -935,53 +930,55 @@ watch_iterate (void *cls,
struct GNUNET_PEERSTORE_WatchContext *wc = cls;
struct GNUNET_PEERSTORE_Handle *h = wc->h;
struct StoreKeyHashMessage *hm;
+ struct GNUNET_MQ_Envelope *ev;
+ const struct GNUNET_PeerIdentity *peer;
if (NULL != emsg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
wc->callback (wc->callback_cls, NULL, emsg);
return;
}
- if (NULL == record)
+ if ((NULL != record) &&
+ (NULL != wc->callback))
{
- struct GNUNET_MQ_Envelope *ev;
- const struct GNUNET_PeerIdentity *peer;
-
- if (NULL == wc->peer)
- peer = GNUNET_new (struct GNUNET_PeerIdentity);
- else
- peer = wc->peer;
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Hash key we watch for %s\n",
- GNUNET_h2s_full (&hm->keyhash));
- wc->keyhash = hm->keyhash;
- if (NULL == h->watches)
- h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
- GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
- h->watches,
- &wc->keyhash,
- wc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
- wc->sub_system,
- GNUNET_i2s (peer),
- wc->key);
- GNUNET_MQ_send (h->mq, ev);
- wc->ic = NULL;
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls, record, NULL);
- if (NULL == wc->peer)
- GNUNET_free_nz ((void *) peer);
+ wc->callback (wc->callback_cls, record, NULL);
return;
}
+ if (NULL == wc->peer)
+ peer = GNUNET_new (struct GNUNET_PeerIdentity);
+ else
+ peer = wc->peer;
+ ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+ PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
+ hm->rid = get_op_id (h);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Hash key we watch for %s\n",
+ GNUNET_h2s_full (&hm->keyhash));
+ wc->keyhash = hm->keyhash;
+ if (NULL == h->watches)
+ h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
+ GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
+ h->watches,
+ &wc->keyhash,
+ wc,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
+ wc->sub_system,
+ GNUNET_i2s (peer),
+ wc->key);
+ GNUNET_MQ_send (h->mq, ev);
+ wc->ic = NULL;
if (NULL != wc->callback)
wc->callback (wc->callback_cls, record, NULL);
+ if (NULL == wc->peer)
+ GNUNET_free_nz ((void *) peer);
+ return;
+
}
@@ -1040,32 +1037,32 @@ hello_updated (void *cls,
struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
const struct GNUNET_MessageHeader *hello;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "hello_updated\n");
if (NULL != emsg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
nc->callback (nc->callback_cls, NULL, NULL, emsg);
return;
}
if (NULL == record)
return;
hello = record->value;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated with expired %s and size %u for peer %s\n",
- GNUNET_STRINGS_absolute_time_to_string (
- GNUNET_HELLO_builder_get_expiration_time (hello)),
- ntohs (hello->size),
- GNUNET_i2s (&record->peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "hello_updated with expired %s and size %u for peer %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (
+ GNUNET_HELLO_builder_get_expiration_time (hello)),
+ ntohs (hello->size),
+ GNUNET_i2s (&record->peer));
if ((0 == record->value_size))
{
GNUNET_break (0);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "hello_updated call callback\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "hello_updated call callback\n");
nc->callback (nc->callback_cls, &record->peer, hello, NULL);
}
@@ -1119,13 +1116,13 @@ merge_success (void *cls, int success)
if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove
(huc->store_context_map,
huc->pid,
shu_cls->sc))
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "There was no store context to be removed after storing hello
for peer %s\n",
- GNUNET_i2s (huc->pid));
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "There was no store context to be removed after storing hello for
peer %s\n",
+ GNUNET_i2s (huc->pid));
if (GNUNET_OK != success)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Storing hello uri failed\n");
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Storing hello uri failed\n");
huc->cont (huc->cont_cls, success);
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
@@ -1139,18 +1136,18 @@ merge_success (void *cls, int success)
huc->wc = NULL;
huc->cont (huc->cont_cls, GNUNET_OK);
huc->success = GNUNET_OK;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing hello uri succeeded for peer %s!\n",
- GNUNET_i2s (huc->pid));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing hello uri succeeded for peer %s!\n",
+ GNUNET_i2s (huc->pid));
GNUNET_free (huc->hello);
GNUNET_free (huc->pid);
GNUNET_free (huc);
GNUNET_free (shu_cls);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got notified during storing hello uri for peer %s!\n",
- GNUNET_i2s (huc->pid));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got notified during storing hello uri for peer %s!\n",
+ GNUNET_i2s (huc->pid));
GNUNET_free (shu_cls);
}
@@ -1176,9 +1173,9 @@ store_hello (struct GNUNET_PEERSTORE_StoreHelloContext
*huc,
GNUNET_PEERSTORE_STOREOPTION_REPLACE,
merge_success,
shu_cls);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "store_hello with expiration %s\n",
- GNUNET_STRINGS_absolute_time_to_string (hello_exp));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "store_hello with expiration %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (hello_exp));
GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put (
huc->store_context_map,
huc->pid,
@@ -1202,27 +1199,27 @@ merge_uri (void *cls,
if (NULL != emsg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got failure from PEERSTORE: %s\n",
- emsg);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Got failure from PEERSTORE: %s\n",
+ emsg);
return;
}
if (NULL == record && GNUNET_NO == huc->success)
{
huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri just store for peer %s with expiration %s\n",
- GNUNET_i2s (huc->pid),
- GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "merge_uri just store for peer %s with expiration %s\n",
+ GNUNET_i2s (huc->pid),
+ GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
store_hello (huc, huc->hello);
}
else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid,
&record->peer))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "merge_uri record for peer %s\n",
- GNUNET_i2s (&record->peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "merge_uri record for peer %s\n",
+ GNUNET_i2s (&record->peer));
hello = record->value;
if ((0 == record->value_size))
{
diff --git a/src/service/peerstore/peerstore_common.c b/src/service/peerstore/peerstore_common.c
index e3bb77d86..5d4d06c0c 100644
--- a/src/service/peerstore/peerstore_common.c
+++ b/src/service/peerstore/peerstore_common.c
@@ -59,21 +59,9 @@ PEERSTORE_hash_key (const char *sub_system,
}
-/**
- * Creates a MQ envelope for a single record
- *
- * @param sub_system sub system string
- * @param peer Peer identity (can be NULL)
- * @param key record key string (can be NULL)
- * @param value record value BLOB (can be NULL)
- * @param value_size record value size in bytes (set to 0 if value is NULL)
- * @param expiry time after which the record expires
- * @param options options specific to the storage operation
- * @param msg_type message type to be set in header
- * @return pointer to record message struct
- */
struct GNUNET_MQ_Envelope *
-PEERSTORE_create_record_mq_envelope (const char *sub_system,
+PEERSTORE_create_record_mq_envelope (uint32_t rid,
+ const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
const void *value,
@@ -106,6 +94,7 @@ PEERSTORE_create_record_mq_envelope (const char *sub_system,
srm->peer_set = htons (GNUNET_YES);
srm->peer = *peer;
}
+ srm->rid = rid;
srm->sub_system_size = htons (ss_size);
srm->value_size = htons (value_size);
srm->options = htonl (options);
diff --git a/src/service/peerstore/peerstore_common.h b/src/service/peerstore/peerstore_common.h
index f5352f5a5..56f1f8b8b 100644
--- a/src/service/peerstore/peerstore_common.h
+++ b/src/service/peerstore/peerstore_common.h
@@ -40,6 +40,7 @@ PEERSTORE_hash_key (const char *sub_system,
/**
* Creates a MQ envelope for a single record
*
+ * @param rid request ID
* @param sub_system sub system string
* @param peer Peer identity (can be NULL)
* @param key record key string (can be NULL)
@@ -51,7 +52,8 @@ PEERSTORE_hash_key (const char *sub_system,
* @return pointer to record message struct
*/
struct GNUNET_MQ_Envelope *
-PEERSTORE_create_record_mq_envelope (const char *sub_system,
+PEERSTORE_create_record_mq_envelope (uint32_t rid,
+ const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
const void *value,
diff --git a/src/service/peerstore/perf_peerstore_store.c b/src/service/peerstore/perf_peerstore_store.c
index e59af61e5..e328be93e 100644
--- a/src/service/peerstore/perf_peerstore_store.c
+++ b/src/service/peerstore/perf_peerstore_store.c
@@ -32,31 +32,48 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
+static struct GNUNET_PEERSTORE_WatchContext *wc;
+
static char *ss = "test_peerstore_stress";
static struct GNUNET_PeerIdentity p;
static char *k = "test_peerstore_stress_key";
static char *v = "test_peerstore_stress_val";
static int count = 0;
+static int count_fin = 0;
static void
-disconnect ()
+disconnect (void *cls)
{
+ GNUNET_PEERSTORE_watch_cancel (wc);
if (NULL != h)
GNUNET_PEERSTORE_disconnect (h);
GNUNET_SCHEDULER_shutdown ();
}
+static void
+store_cont (void *cls, int ret)
+{
+ count_fin++;
+ if (count_fin == count)
+ {
+ ok = 0;
+ GNUNET_SCHEDULER_add_now (&disconnect, NULL);
+ }
+}
+
+
static void
store ()
{
+ count++;
GNUNET_PEERSTORE_store (h, ss, &p, k, v, strlen (v) + 1,
GNUNET_TIME_UNIT_FOREVER_ABS,
(count ==
0) ? GNUNET_PEERSTORE_STOREOPTION_REPLACE :
- GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, NULL, NULL);
- count++;
+ GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, store_cont,
+ NULL);
}
@@ -65,12 +82,7 @@ watch_cb (void *cls, const struct GNUNET_PEERSTORE_Record *record,
const char *emsg)
{
GNUNET_assert (NULL == emsg);
- if (STORES == count)
- {
- ok = 0;
- disconnect ();
- }
- else
+ if (STORES > count)
store ();
}
@@ -82,7 +94,7 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg,
memset (&p, 5, sizeof(p));
h = GNUNET_PEERSTORE_connect (cfg);
GNUNET_assert (NULL != h);
- GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
+ wc = GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
store ();
}
diff --git a/src/service/peerstore/test_peerstore_api_iterate.c b/src/service/peerstore/test_peerstore_api_iterate.c
index 59367a110..0fa58f3fd 100644
--- a/src/service/peerstore/test_peerstore_api_iterate.c
+++ b/src/service/peerstore/test_peerstore_api_iterate.c
@@ -21,6 +21,7 @@
* @file peerstore/test_peerstore_api_iterate.c
* @brief testcase for peerstore iteration operation
*/
+#include "gnunet_common.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_testing_lib.h"
@@ -40,6 +41,13 @@ static char *k3 = "test_peerstore_api_iterate_key3";
static char *val = "test_peerstore_api_iterate_val";
static int count = 0;
+static void
+finish (void *cls)
+{
+ GNUNET_PEERSTORE_disconnect (h);
+ GNUNET_SCHEDULER_shutdown ();
+}
+
static void
iter3_cb (void *cls,
@@ -58,8 +66,7 @@ iter3_cb (void *cls,
}
GNUNET_assert (count == 3);
ok = 0;
- GNUNET_PEERSTORE_disconnect (h);
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_SCHEDULER_add_now (&finish, NULL);
}
@@ -104,6 +111,7 @@ iter1_cb (void *cls,
count++;
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u is count\n", count);
GNUNET_assert (count == 1);
count = 0;
ic = GNUNET_PEERSTORE_iterate (h,
@@ -115,6 +123,50 @@ iter1_cb (void *cls,
}
+static void
+store_cont (void *cls, int success)
+{
+ GNUNET_assert (GNUNET_OK == success);
+ if (0 == count)
+ {
+ GNUNET_PEERSTORE_store (h,
+ ss,
+ &p1,
+ k2,
+ val,
+ strlen (val) + 1,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ &store_cont,
+ NULL);
+ }
+ else if (1 == count)
+ {
+ GNUNET_PEERSTORE_store (h,
+ ss,
+ &p2,
+ k3,
+ val,
+ strlen (val) + 1,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ &store_cont,
+ NULL);
+ }
+ else
+ {
+ count = 0;
+ ic = GNUNET_PEERSTORE_iterate (h,
+ ss,
+ &p1,
+ k1,
+ &iter1_cb, NULL);
+ return;
+ }
+ count++;
+}
+
+
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -124,6 +176,7 @@ run (void *cls,
GNUNET_assert (NULL != h);
memset (&p1, 1, sizeof(p1));
memset (&p2, 2, sizeof(p2));
+ count = 0;
GNUNET_PEERSTORE_store (h,
ss,
&p1,
@@ -132,33 +185,8 @@ run (void *cls,
strlen (val) + 1,
GNUNET_TIME_UNIT_FOREVER_ABS,
GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- NULL,
- NULL);
- GNUNET_PEERSTORE_store (h,
- ss,
- &p1,
- k2,
- val,
- strlen (val) + 1,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- NULL,
+ &store_cont,
NULL);
- GNUNET_PEERSTORE_store (h,
- ss,
- &p2,
- k3,
- val,
- strlen (val) + 1,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- NULL,
- NULL);
- ic = GNUNET_PEERSTORE_iterate (h,
- ss,
- &p1,
- k1,
- &iter1_cb, NULL);
}
diff --git a/src/service/peerstore/test_peerstore_api_store.c b/src/service/peerstore/test_peerstore_api_store.c
index 77e8a17c1..8cf0e60a7 100644
--- a/src/service/peerstore/test_peerstore_api_store.c
+++ b/src/service/peerstore/test_peerstore_api_store.c
@@ -21,6 +21,7 @@
* @file peerstore/test_peerstore_api_store.c
* @brief testcase for peerstore store operation
*/
+#include "gnunet_common.h"
#include "platform.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_testing_lib.h"
@@ -40,6 +41,13 @@ static char *val3 = "test_peerstore_api_store_val3--";
static int count = 0;
+static void
+finish (void *cls)
+{
+ GNUNET_PEERSTORE_disconnect (h);
+ GNUNET_SCHEDULER_shutdown ();
+}
+
static void
test3_cont2 (void *cls,
const struct GNUNET_PEERSTORE_Record *record,
@@ -57,8 +65,7 @@ test3_cont2 (void *cls,
}
GNUNET_assert (count == 1);
ok = 0;
- GNUNET_PEERSTORE_disconnect (h, GNUNET_YES);
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_SCHEDULER_add_now (&finish, NULL);
}
@@ -158,7 +165,10 @@ test1_cont2 (void *cls,
const char *emsg)
{
if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error received: %s\n", emsg);
return;
+ }
if (NULL != record)
{
GNUNET_assert ((strlen (val1) + 1) == record->value_size);
@@ -175,6 +185,7 @@ test1_cont2 (void *cls,
static void
test1_cont (void *cls, int success)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Store done, ret=%d...\n", success);
if (GNUNET_YES != success)
return;
count = 0;
@@ -193,6 +204,7 @@ test1_cont (void *cls, int success)
static void
test1 ()
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test1 start\n");
GNUNET_PEERSTORE_store (h,
subsystem,
&pid,
diff --git a/src/service/peerstore/test_peerstore_api_sync.c b/src/service/peerstore/test_peerstore_api_sync.c
deleted file mode 100644
index 4e16afae8..000000000
--- a/src/service/peerstore/test_peerstore_api_sync.c
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- This file is part of GNUnet.
- Copyright (C) 2015 GNUnet e.V.
-
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-
- SPDX-License-Identifier: AGPL3.0-or-later
- */
-/**
- * @file peerstore/test_peerstore_api_sync.c
- * @brief testcase for peerstore sync-on-disconnect feature. Stores
- * a value just before disconnecting, and then checks that
- * this value is actually stored.
- * @author Omar Tarabai
- * @author Christian Grothoff (minor fix, comments)
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testing_lib.h"
-#include "gnunet_peerstore_service.h"
-
-/**
- * Overall result, 0 for success.
- */
-static int ok = 404;
-
-/**
- * Configuration we use.
- */
-static const struct GNUNET_CONFIGURATION_Handle *cfg;
-
-/**
- * handle to talk to the peerstore.
- */
-static struct GNUNET_PEERSTORE_Handle *h;
-
-/**
- * Subsystem we store the value for.
- */
-static const char *subsystem = "test_peerstore_api_sync";
-
-/**
- * Fake PID under which we store the value.
- */
-static struct GNUNET_PeerIdentity pid;
-
-/**
- * Test key we're storing the test value under.
- */
-static const char *key = "test_peerstore_api_store_key";
-
-/**
- * Test value we are storing.
- */
-static const char *val = "test_peerstore_api_store_val";
-
-
-/**
- * Timeout
- */
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-
-/**
- * Timeout task
- */
-static struct GNUNET_SCHEDULER_Task *to;
-
-/**
- * Iterate handle
- */
-static struct GNUNET_PEERSTORE_IterateContext *it;
-
-static void
-test_cont (void *cls);
-
-/**
- * Function that should be called with the result of the
- * lookup, and finally once with NULL to signal the end
- * of the iteration.
- *
- * Upon the first call, we set "ok" to success. On the
- * second call (end of iteration) we terminate the test.
- *
- * @param cls NULL
- * @param record the information stored in the peerstore
- * @param emsg any error message
- * @return #GNUNET_YES (all good, continue)
- */
-static void
-iterate_cb (void *cls,
- const struct GNUNET_PEERSTORE_Record *record,
- const char *emsg)
-{
- const char *rec_val;
-
- GNUNET_break (NULL == emsg);
- if (NULL == record)
- {
- it = NULL;
- if (0 == ok)
- {
- GNUNET_PEERSTORE_disconnect (h);
- if (NULL != to)
- {
- GNUNET_SCHEDULER_cancel (to);
- to = NULL;
- }
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- /**
- * Try again
- */
- GNUNET_SCHEDULER_add_now (&test_cont,
- NULL);
- return;
- }
- rec_val = record->value;
- GNUNET_break (0 == strcmp (rec_val, val));
- ok = 0;
-}
-
-
-static void
-timeout_task (void *cls)
-{
- to = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Timeout reached\n");
- if (NULL != it)
- GNUNET_PEERSTORE_iterate_cancel (it);
- it = NULL;
- GNUNET_PEERSTORE_disconnect (h,
- GNUNET_NO);
- GNUNET_SCHEDULER_shutdown ();
- return;
-}
-
-
-/**
- * Run the 2nd stage of the test where we fetch the
- * data that should have been stored.
- *
- * @param cls NULL
- */
-static void
-test_cont (void *cls)
-{
- it = GNUNET_PEERSTORE_iterate (h,
- subsystem,
- &pid, key,
- &iterate_cb,
- NULL);
-}
-
-
-static void
-disc_cont (void *cls)
-{
- GNUNET_PEERSTORE_disconnect (h, GNUNET_YES);
- h = GNUNET_PEERSTORE_connect (cfg);
- GNUNET_SCHEDULER_add_now (&test_cont,
- NULL);
-}
-
-
-static void
-store_cont (void *cls, int success)
-{
- ok = success;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Success: %s\n",
- (GNUNET_SYSERR == ok) ? "no" : "yes");
- /* We need to wait a little bit to give the disconnect
- a chance to actually finish the operation; otherwise,
- the test may fail non-deterministically if the new
- connection is faster than the cleanup routine of the
- old one. */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &disc_cont,
- NULL);
-}
-
-
-/**
- * Actually run the test.
- */
-static void
-test1 ()
-{
- h = GNUNET_PEERSTORE_connect (cfg);
- GNUNET_PEERSTORE_store (h,
- subsystem,
- &pid,
- key,
- val, strlen (val) + 1,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- &store_cont, NULL);
-}
-
-
-/**
- * Initialize globals and launch the test.
- *
- * @param cls NULL
- * @param c configuration to use
- * @param peer handle to our peer (unused)
- */
-static void
-run (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *c,
- struct GNUNET_TESTING_Peer *peer)
-{
- cfg = c;
- memset (&pid, 1, sizeof(pid));
- to = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
- &timeout_task,
- NULL);
- GNUNET_SCHEDULER_add_now (&test1, NULL);
-}
-
-
-int
-main (int argc, char *argv[])
-{
- if (0 !=
- GNUNET_TESTING_service_run ("test-gnunet-peerstore-sync",
- "peerstore",
- "peerstore.conf",
- &run, NULL))
- return 1;
- if (0 != ok)
- fprintf (stderr,
- "Test failed: %d\n",
- ok);
- return ok;
-}
-
-
-/* end of test_peerstore_api_sync.c */
diff --git a/src/service/peerstore/test_peerstore_api_watch.c b/src/service/peerstore/test_peerstore_api_watch.c
index 126b321df..63b0e896b 100644
--- a/src/service/peerstore/test_peerstore_api_watch.c
+++ b/src/service/peerstore/test_peerstore_api_watch.c
@@ -21,6 +21,8 @@
* @file peerstore/test_peerstore_api_watch.c
* @brief testcase for peerstore watch functionality
*/
+#include "gnunet_common.h"
+#include "gnunet_time_lib.h"
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_testing_lib.h"
@@ -31,12 +33,42 @@ static int ok = 1;
static struct GNUNET_PEERSTORE_Handle *h;
+static struct GNUNET_PEERSTORE_WatchContext *wc;
+
static char *ss = "test_peerstore_api_watch";
static char *k = "test_peerstore_api_watch_key";
static char *val = "test_peerstore_api_watch_val";
+static struct GNUNET_PeerIdentity p;
+
+static void
+finish (void *cls)
+{
+ GNUNET_PEERSTORE_watch_cancel (wc);
+ GNUNET_PEERSTORE_disconnect (h);
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+cont (void *cls)
+{
+ GNUNET_PEERSTORE_store (h,
+ ss,
+ &p,
+ k,
+ val,
+ strlen (val) + 1,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+ NULL,
+ NULL);
+}
+
+
+static int initial_iteration = GNUNET_YES;
static void
watch_cb (void *cls,
@@ -44,12 +76,28 @@ watch_cb (void *cls,
const char *emsg)
{
GNUNET_assert (NULL == emsg);
+ if (GNUNET_YES == initial_iteration)
+ {
+ if (NULL != record)
+ {
+ GNUNET_break (0);
+ return; // Ignore this record, FIXME: Test unclean
+ }
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
+ initial_iteration = GNUNET_NO;
+ return;
+ }
+ if (NULL == record)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received record: %s\n",
+ (char*) record->value);
GNUNET_assert (0 == strcmp (val,
(char *) record->value));
ok = 0;
- GNUNET_PEERSTORE_disconnect (h,
- GNUNET_NO);
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_SCHEDULER_add_now (&finish, NULL);
}
@@ -58,29 +106,18 @@ run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_TESTING_Peer *peer)
{
- struct GNUNET_PeerIdentity p;
h = GNUNET_PEERSTORE_connect (cfg);
GNUNET_assert (NULL != h);
memset (&p,
4,
sizeof(p));
- GNUNET_PEERSTORE_watch (h,
- ss,
- &p,
- k,
- &watch_cb,
- NULL);
- GNUNET_PEERSTORE_store (h,
- ss,
- &p,
- k,
- val,
- strlen (val) + 1,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_PEERSTORE_STOREOPTION_REPLACE,
- NULL,
- NULL);
+ wc = GNUNET_PEERSTORE_watch (h,
+ ss,
+ &p,
+ k,
+ &watch_cb,
+ NULL);
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: PEERSTORE: Major API overhault to fix a variety of race conditions.,
gnunet <=