gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: PEERSTORE: Major API overhault to fix a


From: gnunet
Subject: [gnunet] branch master updated: PEERSTORE: Major API overhault to fix a variety of race conditions.
Date: Thu, 23 Nov 2023 19:18:37 +0100

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new d51ba6435 PEERSTORE: Major API overhault to fix a variety of race 
conditions.
     new fbe2292e2 Merge branch 'master' of git+ssh://git.gnunet.org/gnunet
d51ba6435 is described below

commit d51ba6435b62772ccf2a20d13e99c164898bdc9d
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Nov 23 19:18:00 2023 +0100

    PEERSTORE: Major API overhault to fix a variety of race conditions.
---
 src/include/gnunet_protocols.h                     |   6 +
 src/service/meson.build                            |   2 +-
 src/service/peerstore/Makefile.am                  |   8 -
 src/service/peerstore/gnunet-service-peerstore.c   | 133 +++++--
 src/service/peerstore/meson.build                  |  55 +++
 src/service/peerstore/peerstore.h                  |  33 +-
 src/service/peerstore/peerstore_api.c              | 419 ++++++++++-----------
 src/service/peerstore/peerstore_common.c           |  17 +-
 src/service/peerstore/peerstore_common.h           |   4 +-
 src/service/peerstore/perf_peerstore_store.c       |  32 +-
 src/service/peerstore/test_peerstore_api_iterate.c |  84 +++--
 src/service/peerstore/test_peerstore_api_store.c   |  16 +-
 src/service/peerstore/test_peerstore_api_sync.c    | 252 -------------
 src/service/peerstore/test_peerstore_api_watch.c   |  77 +++-
 14 files changed, 549 insertions(+), 589 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 0daa37bed..a998344b9 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2628,6 +2628,12 @@ extern "C" {
  */
 #define GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL 826
 
+/**
+ * Store result message
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT 827
+
+
 
/*******************************************************************************
  * SOCIAL message types
  
******************************************************************************/
diff --git a/src/service/meson.build b/src/service/meson.build
index b5c4bc1ae..fcd3f7520 100644
--- a/src/service/meson.build
+++ b/src/service/meson.build
@@ -7,8 +7,8 @@ endif
 subdir('util')
 subdir('statistics')
 subdir('arm')
-subdir('peerstore')
 subdir('testing')
+subdir('peerstore')
 subdir('nat')
 subdir('nat-auto')
 subdir('transport')
diff --git a/src/service/peerstore/Makefile.am 
b/src/service/peerstore/Makefile.am
index 6ca8e7925..a12e8ff8b 100644
--- a/src/service/peerstore/Makefile.am
+++ b/src/service/peerstore/Makefile.am
@@ -44,7 +44,6 @@ check_PROGRAMS = \
  test_peerstore_api_store \
  test_peerstore_api_iterate \
  test_peerstore_api_watch \
- test_peerstore_api_sync \
  perf_peerstore_store
 
 EXTRA_DIST = \
@@ -76,13 +75,6 @@ test_peerstore_api_watch_LDADD = \
   $(top_builddir)/src/service/testing/libgnunettesting.la \
   $(top_builddir)/src/lib/util/libgnunetutil.la
 
-test_peerstore_api_sync_SOURCES = \
- test_peerstore_api_sync.c
-test_peerstore_api_sync_LDADD = \
-  libgnunetpeerstore.la  \
-  $(top_builddir)/src/service/testing/libgnunettesting.la \
-  $(top_builddir)/src/lib/util/libgnunetutil.la
-
 perf_peerstore_store_SOURCES = \
  perf_peerstore_store.c
 perf_peerstore_store_LDADD = \
diff --git a/src/service/peerstore/gnunet-service-peerstore.c 
b/src/service/peerstore/gnunet-service-peerstore.c
index 364900674..77523aa2e 100644
--- a/src/service/peerstore/gnunet-service-peerstore.c
+++ b/src/service/peerstore/gnunet-service-peerstore.c
@@ -100,6 +100,39 @@ do_shutdown ()
 }
 
 
+struct IterationContext
+{
+  /**
+   * The record that was stored.
+   */
+  struct GNUNET_PEERSTORE_Record *record;
+
+  /**
+   * The request ID
+   */
+  uint32_t rid;
+
+};
+
+struct StoreRecordContext
+{
+  /**
+   * The record that was stored.
+   */
+  struct GNUNET_PEERSTORE_Record *record;
+
+  /**
+   * The request ID
+   */
+  uint32_t rid;
+
+  /**
+   * The client
+   */
+  struct GNUNET_SERVICE_Client *client;
+};
+
+
 /**
  * Task run during shutdown.
  *
@@ -245,31 +278,37 @@ record_iterator (void *cls,
                  const struct GNUNET_PEERSTORE_Record *record,
                  const char *emsg)
 {
-  struct GNUNET_PEERSTORE_Record *cls_record = cls;
+  struct IterationContext *ic = cls;
   struct GNUNET_MQ_Envelope *env;
 
   if (NULL == record)
   {
     /* No more records */
-    struct GNUNET_MessageHeader *endmsg;
+    struct PeerstoreResultMessage *endmsg;
 
     env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
-    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
+    endmsg->rid = ic->rid;
     if (NULL == emsg)
     {
-      GNUNET_SERVICE_client_continue (cls_record->client);
+      endmsg->result = htonl (GNUNET_OK);
+      GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
+      GNUNET_SERVICE_client_continue (ic->record->client);
     }
     else
     {
+      endmsg->result = htonl (GNUNET_SYSERR);
+      GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
       GNUNET_break (0);
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
-      GNUNET_SERVICE_client_drop (cls_record->client);
+      GNUNET_SERVICE_client_drop (ic->record->client);
     }
-    PEERSTORE_destroy_record (cls_record);
+    PEERSTORE_destroy_record (ic->record);
+    GNUNET_free (ic);
     return;
   }
 
   env = PEERSTORE_create_record_mq_envelope (
+    ic->rid,
     record->sub_system,
     &record->peer,
     record->key,
@@ -278,7 +317,7 @@ record_iterator (void *cls,
     record->expiry,
     0,
     GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
-  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
+  GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (ic->record->client), env);
 }
 
 
@@ -300,6 +339,7 @@ watch_notifier_it (void *cls, const struct GNUNET_HashCode 
*key, void *value)
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
   env = PEERSTORE_create_record_mq_envelope (
+    0,
     record->sub_system,
     &record->peer,
     record->key,
@@ -415,27 +455,28 @@ check_iterate (void *cls, const struct StoreRecordMessage 
*srm)
 static void
 handle_iterate (void *cls, const struct StoreRecordMessage *srm)
 {
-  struct GNUNET_SERVICE_Client *client = cls;
-  struct GNUNET_PEERSTORE_Record *record;
+  struct IterationContext *ic = GNUNET_new (struct IterationContext);
 
-  record = PEERSTORE_parse_record_message (srm);
+  ic->record = PEERSTORE_parse_record_message (srm);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Iterate request: ss `%s', peer `%s', key `%s'\n",
-              record->sub_system,
-              GNUNET_i2s (&record->peer),
-              (NULL == record->key) ? "NULL" : record->key);
-  record->client = client;
+              ic->record->sub_system,
+              GNUNET_i2s (&ic->record->peer),
+              (NULL == ic->record->key) ? "NULL" : ic->record->key);
+  ic->record->client = cls;
+  ic->rid = srm->rid;
   if (GNUNET_OK !=
       db->iterate_records (db->cls,
-                           record->sub_system,
-                           (ntohs (srm->peer_set)) ? &record->peer : NULL,
-                           record->key,
+                           ic->record->sub_system,
+                           (ntohs (srm->peer_set)) ? &ic->record->peer : NULL,
+                           ic->record->key,
                            &record_iterator,
-                           record))
+                           ic))
   {
     GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (client);
-    PEERSTORE_destroy_record (record);
+    GNUNET_SERVICE_client_drop (ic->record->client);
+    PEERSTORE_destroy_record (ic->record);
+    GNUNET_free (ic);
   }
 }
 
@@ -449,19 +490,28 @@ handle_iterate (void *cls, const struct 
StoreRecordMessage *srm)
 static void
 store_record_continuation (void *cls, int success)
 {
-  struct GNUNET_PEERSTORE_Record *record = cls;
+  struct StoreRecordContext *src = cls;
+  struct PeerstoreResultMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
 
   if (GNUNET_OK == success)
   {
-    watch_notifier (record);
-    GNUNET_SERVICE_client_continue (record->client);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+    env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT);
+    msg->rid = src->rid;
+    msg->result = htonl (success);
+    GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (src->client), env);
+    watch_notifier (src->record);
+    GNUNET_SERVICE_client_continue (src->client);
   }
   else
   {
     GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (record->client);
+    GNUNET_SERVICE_client_drop (src->record->client);
   }
-  PEERSTORE_destroy_record (record);
+  PEERSTORE_destroy_record (src->record);
+  GNUNET_free (src);
 }
 
 
@@ -504,35 +554,38 @@ static void
 handle_store (void *cls, const struct StoreRecordMessage *srm)
 {
   struct GNUNET_SERVICE_Client *client = cls;
-  struct GNUNET_PEERSTORE_Record *record;
-
-  record = PEERSTORE_parse_record_message (srm);
+  struct StoreRecordContext *src = GNUNET_new (struct StoreRecordContext);
+  src->record = PEERSTORE_parse_record_message (srm);
   GNUNET_log (
     GNUNET_ERROR_TYPE_DEBUG,
     "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: 
%u.\n",
-    record->sub_system,
-    GNUNET_i2s (&record->peer),
-    record->key,
+    src->record->sub_system,
+    GNUNET_i2s (&src->record->peer),
+    src->record->key,
     (uint32_t) ntohl (srm->options));
-  record->client = client;
+  src->record->client = client;
+  src->rid = srm->rid;
+  src->client = client;
   if (GNUNET_OK != db->store_record (db->cls,
-                                     record->sub_system,
-                                     &record->peer,
-                                     record->key,
-                                     record->value,
-                                     record->value_size,
-                                     record->expiry,
+                                     src->record->sub_system,
+                                     &src->record->peer,
+                                     src->record->key,
+                                     src->record->value,
+                                     src->record->value_size,
+                                     src->record->expiry,
                                      ntohl (srm->options),
                                      &store_record_continuation,
-                                     record))
+                                     src))
   {
     GNUNET_break (0);
-    PEERSTORE_destroy_record (record);
+    PEERSTORE_destroy_record (src->record);
+    GNUNET_free (src);
     GNUNET_SERVICE_client_drop (client);
     return;
   }
 }
 
+
 static void
 store_hello_continuation (void *cls, int success)
 {
diff --git a/src/service/peerstore/meson.build 
b/src/service/peerstore/meson.build
index db70b0b9e..2c6f7eba8 100644
--- a/src/service/peerstore/meson.build
+++ b/src/service/peerstore/meson.build
@@ -37,3 +37,58 @@ executable ('gnunet-service-peerstore',
             install: true,
             install_dir: get_option('libdir') / 'gnunet' / 'libexec')
 
+testpeerstore_api_iterate = executable ('test_peerstore_api_iterate',
+            ['test_peerstore_api_iterate.c'],
+            dependencies: [
+              libgnunetpeerstore_dep,
+              libgnunettesting_dep,
+              libgnunetutil_dep
+              ],
+            include_directories: [incdir, configuration_inc],
+            install: false)
+
+testpeerstore_api_store = executable ('test_peerstore_api_store',
+            ['test_peerstore_api_store.c'],
+            dependencies: [
+              libgnunetpeerstore_dep,
+              libgnunetutil_dep,
+              libgnunettesting_dep,
+              ],
+            include_directories: [incdir, configuration_inc],
+            install: false)
+
+testpeerstore_api_watch = executable ('test_peerstore_api_watch',
+            ['test_peerstore_api_watch.c'],
+            dependencies: [
+              libgnunetpeerstore_dep,
+              libgnunetutil_dep,
+              libgnunettesting_dep,
+              ],
+            include_directories: [incdir, configuration_inc],
+            install: false)
+testpeerstore_api_perf = executable ('perf_peerstore_store',
+            ['perf_peerstore_store.c'],
+            dependencies: [
+              libgnunetpeerstore_dep,
+              libgnunetutil_dep,
+              libgnunettesting_dep,
+              ],
+            include_directories: [incdir, configuration_inc],
+            install: false)
+
+configure_file(input : 'test_peerstore_api_data.conf',
+               output : 'test_peerstore_api_data.conf',
+               copy: true)
+
+test('test_peerstore_api_store', testpeerstore_api_store,
+  suite: 'peerstore', workdir: meson.current_build_dir())
+test('test_peerstore_api_watch', testpeerstore_api_watch,
+  suite: 'peerstore', workdir: meson.current_build_dir())
+test('test_peerstore_api_iterate', testpeerstore_api_iterate,
+  suite: 'peerstore', workdir: meson.current_build_dir())
+test('perf_peerstore_store', testpeerstore_api_perf,
+  suite: 'peerstore', workdir: meson.current_build_dir())
+
+
+
+
diff --git a/src/service/peerstore/peerstore.h 
b/src/service/peerstore/peerstore.h
index 0dec03443..26c656f00 100644
--- a/src/service/peerstore/peerstore.h
+++ b/src/service/peerstore/peerstore.h
@@ -30,6 +30,7 @@
 
 
 GNUNET_NETWORK_STRUCT_BEGIN
+
 /**
  * Message carrying a PEERSTORE record message
  */
@@ -73,6 +74,11 @@ struct StoreRecordMessage
    */
   uint16_t value_size GNUNET_PACKED;
 
+  /**
+   * Request id.
+   */
+  uint32_t rid GNUNET_PACKED;
+
   /**
    * Options, needed only in case of a
    * store operation
@@ -82,6 +88,28 @@ struct StoreRecordMessage
   /* Followed by key and value */
 };
 
+/**
+ * Message carrying a PEERSTORE result message
+ */
+struct PeerstoreResultMessage
+{
+  /**
+   * GNUnet message header
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Request id.
+   */
+  uint32_t rid GNUNET_PACKED;
+
+  /**
+   * Options, needed only in case of a
+   * store operation
+   */
+  uint32_t result GNUNET_PACKED;
+
+};
 
 /**
  * Message carrying record key hash
@@ -94,14 +122,15 @@ struct StoreKeyHashMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Always 0, for alignment.
+   * Request id.
    */
-  uint32_t reserved GNUNET_PACKED;
+  uint32_t rid GNUNET_PACKED;
 
   /**
    * Hash of a record key
    */
   struct GNUNET_HashCode keyhash;
+
 };
 
 GNUNET_NETWORK_STRUCT_END
diff --git a/src/service/peerstore/peerstore_api.c 
b/src/service/peerstore/peerstore_api.c
index 3dec7e01b..394f64378 100644
--- a/src/service/peerstore/peerstore_api.c
+++ b/src/service/peerstore/peerstore_api.c
@@ -23,6 +23,7 @@
  * @author Omar Tarabai
  * @author Christian Grothoff
  */
+#include "gnunet_common.h"
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_hello_uri_lib.h"
@@ -86,6 +87,11 @@ struct GNUNET_PEERSTORE_Handle
    */
   struct GNUNET_TIME_Relative reconnect_delay;
 
+  /**
+   *
+   */
+  uint32_t last_op_id;
+
 };
 
 /**
@@ -113,6 +119,11 @@ struct GNUNET_PEERSTORE_StoreContext
    */
   GNUNET_PEERSTORE_Continuation cont;
 
+  /**
+   * Request ID
+   */
+  uint32_t rid;
+
   /**
    * Closure for @e cont
    */
@@ -216,9 +227,10 @@ struct GNUNET_PEERSTORE_IterateContext
   void *callback_cls;
 
   /**
-   * #GNUNET_YES if we are currently processing records.
+   * Request ID
    */
-  int iterating;
+  uint32_t rid;
+
 };
 
 /**
@@ -275,6 +287,12 @@ struct GNUNET_PEERSTORE_WatchContext
    * The sub system requested the watch.
    */
   const char *sub_system;
+
+  /**
+   * Request ID
+   */
+  uint32_t rid;
+
 };
 
 /**
@@ -306,6 +324,12 @@ struct GNUNET_PEERSTORE_NotifyContext
    * Is this request canceled.
    */
   unsigned int canceled;
+
+  /**
+   * Request ID
+   */
+  uint32_t rid;
+
 };
 
 
/******************************************************************************/
@@ -320,6 +344,18 @@ struct GNUNET_PEERSTORE_NotifyContext
 static void
 reconnect (void *cls);
 
+/**
+ * Get a fresh operation id to distinguish between namestore requests
+ *
+ * @param h the namestore handle
+ * @return next operation id to use
+ */
+static uint32_t
+get_op_id (struct GNUNET_PEERSTORE_Handle *h)
+{
+  return h->last_op_id++;
+}
+
 
 /**
  * Disconnect from the peerstore service.
@@ -329,25 +365,13 @@ reconnect (void *cls);
 static void
 disconnect (struct GNUNET_PEERSTORE_Handle *h)
 {
-  struct GNUNET_PEERSTORE_IterateContext *next;
-
-  for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != 
ic;
-       ic = next)
+  if (NULL != h->watches)
   {
-    next = ic->next;
-    if (GNUNET_YES == ic->iterating)
-    {
-      GNUNET_PEERSTORE_Processor icb;
-      void *icb_cls;
-
-      icb = ic->callback;
-      icb_cls = ic->callback_cls;
-      GNUNET_PEERSTORE_iterate_cancel (ic);
-      if (NULL != icb)
-        icb (icb_cls, NULL, "Iteration canceled due to reconnection");
-    }
+    GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (h->watches));
+    GNUNET_CONTAINER_multihashmap_destroy (h->watches);
   }
-
+  GNUNET_assert (NULL == h->iterate_head);
+  GNUNET_assert (NULL == h->store_head);
   if (NULL != h->mq)
   {
     GNUNET_MQ_destroy (h->mq);
@@ -376,29 +400,6 @@ disconnect_and_schedule_reconnect (struct 
GNUNET_PEERSTORE_Handle *h)
 }
 
 
-/**
- * Callback after MQ envelope is sent
- *
- * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
- */
-static void
-store_request_sent (void *cls)
-{
-  struct GNUNET_PEERSTORE_StoreContext *sc = cls;
-  GNUNET_PEERSTORE_Continuation cont;
-  void *cont_cls;
-
-  if (NULL != sc)
-  {
-    cont = sc->cont;
-    cont_cls = sc->cont_cls;
-    GNUNET_PEERSTORE_store_cancel (sc);
-    if (NULL != cont)
-      cont (cont_cls, GNUNET_OK);
-  }
-}
-
-
 
/******************************************************************************/
 /*******************         CONNECTION FUNCTIONS         
*********************/
 
/******************************************************************************/
@@ -437,29 +438,12 @@ rewatch_it (void *cls, const struct GNUNET_HashCode *key, 
void *value)
 
   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
   hm->keyhash = wc->keyhash;
+  hm->rid = get_op_id (h);
   GNUNET_MQ_send (h->mq, ev);
   return GNUNET_YES;
 }
 
 
-/**
- * Iterator over watch requests to cancel them.
- *
- * @param cls unused
- * @param key key to the watch request
- * @param value watch context
- * @return #GNUNET_YES to continue iteration
- */
-static int
-destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
-{
-  struct GNUNET_PEERSTORE_WatchContext *wc = value;
-
-  GNUNET_PEERSTORE_watch_cancel (wc);
-  return GNUNET_YES;
-}
-
-
 /**
  * Connect to the PEERSTORE service.
  *
@@ -493,26 +477,7 @@ GNUNET_PEERSTORE_connect (const struct 
GNUNET_CONFIGURATION_Handle *cfg)
 void
 GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h)
 {
-  struct GNUNET_PEERSTORE_IterateContext *ic;
-  struct GNUNET_PEERSTORE_StoreContext *sc;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
-  if (NULL != h->watches)
-  {
-    GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
-    GNUNET_CONTAINER_multihashmap_destroy (h->watches);
-    h->watches = NULL;
-  }
-  while (NULL != (ic = h->iterate_head))
-  {
-    GNUNET_break (0);
-    GNUNET_PEERSTORE_iterate_cancel (ic);
-  }
-  while (NULL != (sc = h->store_head))
-  {
-    GNUNET_break (0);
-    GNUNET_PEERSTORE_store_cancel (sc);
-  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnect initiated from client.\n");
   disconnect (h);
 }
 
@@ -530,17 +495,17 @@ GNUNET_PEERSTORE_disconnect (struct 
GNUNET_PEERSTORE_Handle *h)
 void
 GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "store cancel with sc %p \n",
-              sc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "store cancel with sc %p \n",
+       sc);
   GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
   GNUNET_free (sc->sub_system);
   GNUNET_free (sc->value);
   GNUNET_free (sc->key);
   GNUNET_free (sc);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "store cancel with sc %p is null\n",
-              sc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "store cancel with sc %p is null\n",
+       sc);
 }
 
 
@@ -581,17 +546,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
        sub_system,
        GNUNET_i2s (peer),
        key);
-  ev =
-    PEERSTORE_create_record_mq_envelope (sub_system,
-                                         peer,
-                                         key,
-                                         value,
-                                         size,
-                                         expiry,
-                                         options,
-                                         GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
   sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
-
+  sc->rid = get_op_id (h);
   sc->sub_system = GNUNET_strdup (sub_system);
   sc->peer = *peer;
   sc->key = GNUNET_strdup (key);
@@ -602,14 +558,54 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
   sc->cont = cont;
   sc->cont_cls = cont_cls;
   sc->h = h;
+  ev =
+    PEERSTORE_create_record_mq_envelope (sc->rid,
+                                         sub_system,
+                                         peer,
+                                         key,
+                                         value,
+                                         size,
+                                         expiry,
+                                         options,
+                                         GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
 
   GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
-  GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
   GNUNET_MQ_send (h->mq, ev);
   return sc;
 }
 
 
+/**
+ * When a response for store request is received
+ *
+ * @param cls a `struct GNUNET_PEERSTORE_Handle *`
+ * @param msg message received
+ */
+static void
+handle_store_result (void *cls, const struct PeerstoreResultMessage *msg)
+{
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerstoreResultMessage\n");
+  for (sc = h->store_head; NULL != sc; sc = sc->next)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Trying %u ?= %u\n", sc->rid, msg->rid);
+    if (sc->rid == msg->rid)
+      break;
+  }
+  if (NULL == sc)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         _("Unexpected store response.\n"));
+    return;
+  }
+  if (NULL != sc->cont)
+    sc->cont (sc->cont_cls, ntohl (msg->result));
+  GNUNET_CONTAINER_DLL_remove (h->store_head, h->store_tail, sc);
+}
+
+
 
/******************************************************************************/
 /*******************           ITERATE FUNCTIONS          
*********************/
 
/******************************************************************************/
@@ -622,29 +618,24 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
  * @param msg message received
  */
 static void
-handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_iterate_end (void *cls, const struct PeerstoreResultMessage *msg)
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
-  struct GNUNET_PEERSTORE_IterateContext *ic;
-  GNUNET_PEERSTORE_Processor callback;
-  void *callback_cls;
+  struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
 
-  ic = h->iterate_head;
+  for (ic = h->iterate_head; NULL != ic; ic = ic->next)
+    if (ic->rid == msg->rid)
+      break;
   if (NULL == ic)
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         _ ("Unexpected iteration response, this should not happen.\n"));
-    disconnect_and_schedule_reconnect (h);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         _ ("Unexpected iteration response.\n"));
     return;
   }
-  callback = ic->callback;
-  callback_cls = ic->callback_cls;
-  ic->iterating = GNUNET_NO;
-  GNUNET_PEERSTORE_iterate_cancel (ic);
-  /* NOTE: set this here and not after callback because callback may free h */
-  h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  if (NULL != callback)
-    callback (callback_cls, NULL, NULL);
+  if (NULL != ic->callback)
+    ic->callback (ic->callback_cls, NULL, NULL);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up iteration with rid %u\n", 
ic->rid);
+  GNUNET_CONTAINER_DLL_remove (h->iterate_head, h->iterate_tail, ic);
 }
 
 
@@ -674,11 +665,12 @@ handle_iterate_result (void *cls, const struct 
StoreRecordMessage *msg)
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
   struct GNUNET_PEERSTORE_IterateContext *ic;
-  GNUNET_PEERSTORE_Processor callback;
-  void *callback_cls;
   struct GNUNET_PEERSTORE_Record *record;
 
-  ic = h->iterate_head;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received StoreRecordMessage\n");
+  for (ic = h->iterate_head; NULL != ic; ic = ic->next)
+    if (ic->rid == msg->rid)
+      break;
   if (NULL == ic)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -686,21 +678,18 @@ handle_iterate_result (void *cls, const struct 
StoreRecordMessage *msg)
     disconnect_and_schedule_reconnect (h);
     return;
   }
-  ic->iterating = GNUNET_YES;
-  callback = ic->callback;
-  callback_cls = ic->callback_cls;
-  if (NULL == callback)
+  if (NULL == ic->callback)
     return;
   record = PEERSTORE_parse_record_message (msg);
   if (NULL == record)
   {
-    callback (callback_cls,
-              NULL,
-              _ ("Received a malformed response from service."));
+    ic->callback (ic->callback_cls,
+                  NULL,
+                  _ ("Received a malformed response from service."));
   }
   else
   {
-    callback (callback_cls, record, NULL);
+    ic->callback (ic->callback_cls, record, NULL);
     PEERSTORE_destroy_record (record);
   }
 }
@@ -715,11 +704,6 @@ handle_iterate_result (void *cls, const struct 
StoreRecordMessage *msg)
 void
 GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
 {
-  if (GNUNET_YES == ic->iterating)
-  {
-    if (NULL != ic->callback)
-      ic->callback (ic->callback_cls, NULL, "Iteration canceled due to 
reconnection");
-  }
   GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
   GNUNET_free (ic->sub_system);
   GNUNET_free (ic->key);
@@ -738,8 +722,11 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle 
*h,
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_PEERSTORE_IterateContext *ic;
 
+  ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
+  ic->rid = get_op_id (h);
   ev =
-    PEERSTORE_create_record_mq_envelope (sub_system,
+    PEERSTORE_create_record_mq_envelope (ic->rid,
+                                         sub_system,
                                          peer,
                                          key,
                                          NULL,
@@ -747,7 +734,6 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
                                          GNUNET_TIME_UNIT_FOREVER_ABS,
                                          0,
                                          
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
-  ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
   ic->callback = callback;
   ic->callback_cls = callback_cls;
   ic->h = h;
@@ -831,10 +817,14 @@ static void
 reconnect (void *cls)
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
-  struct GNUNET_MQ_MessageHandler mq_handlers[] =
-  { GNUNET_MQ_hd_fixed_size (iterate_end,
+  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    GNUNET_MQ_hd_fixed_size (iterate_end,
                              GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
-                             struct GNUNET_MessageHeader,
+                             struct PeerstoreResultMessage,
+                             h),
+    GNUNET_MQ_hd_fixed_size (store_result,
+                             GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT,
+                             struct PeerstoreResultMessage,
                              h),
     GNUNET_MQ_hd_var_size (iterate_result,
                            GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
@@ -844,7 +834,8 @@ reconnect (void *cls)
                            GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
                            struct StoreRecordMessage,
                            h),
-    GNUNET_MQ_handler_end () };
+    GNUNET_MQ_handler_end ()
+  };
   struct GNUNET_MQ_Envelope *ev;
 
   h->reconnect_task = NULL;
@@ -868,8 +859,10 @@ reconnect (void *cls)
   for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != 
ic;
        ic = ic->next)
   {
+    ic->rid = get_op_id(h);
     ev =
-      PEERSTORE_create_record_mq_envelope (ic->sub_system,
+      PEERSTORE_create_record_mq_envelope (ic->rid,
+                                           ic->sub_system,
                                            &ic->peer,
                                            ic->key,
                                            NULL,
@@ -882,8 +875,10 @@ reconnect (void *cls)
   for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
        sc = sc->next)
   {
+    sc->rid = get_op_id(h);
     ev =
-      PEERSTORE_create_record_mq_envelope (sc->sub_system,
+      PEERSTORE_create_record_mq_envelope (sc->rid,
+                                           sc->sub_system,
                                            &sc->peer,
                                            sc->key,
                                            sc->value,
@@ -891,7 +886,6 @@ reconnect (void *cls)
                                            sc->expiry,
                                            sc->options,
                                            
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
-    GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
     GNUNET_MQ_send (h->mq, ev);
   }
 }
@@ -909,7 +903,7 @@ GNUNET_PEERSTORE_watch_cancel (struct 
GNUNET_PEERSTORE_WatchContext *wc)
   struct GNUNET_MQ_Envelope *ev;
   struct StoreKeyHashMessage *hm;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancelling watch.\n");
   if (NULL != wc->ic)
   {
     GNUNET_PEERSTORE_iterate_cancel (wc->ic);
@@ -919,6 +913,7 @@ GNUNET_PEERSTORE_watch_cancel (struct 
GNUNET_PEERSTORE_WatchContext *wc)
 
   ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
   hm->keyhash = wc->keyhash;
+  hm->rid = get_op_id (h);
   GNUNET_MQ_send (h->mq, ev);
   GNUNET_assert (
     GNUNET_YES ==
@@ -935,53 +930,55 @@ watch_iterate (void *cls,
   struct GNUNET_PEERSTORE_WatchContext *wc = cls;
   struct GNUNET_PEERSTORE_Handle *h = wc->h;
   struct StoreKeyHashMessage *hm;
+  struct GNUNET_MQ_Envelope *ev;
+  const struct GNUNET_PeerIdentity *peer;
 
   if (NULL != emsg)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Got failure from PEERSTORE: %s\n",
-                emsg);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Got failure from PEERSTORE: %s\n",
+         emsg);
     wc->callback (wc->callback_cls, NULL, emsg);
     return;
   }
-  if (NULL == record)
+  if ((NULL != record) &&
+      (NULL != wc->callback))
   {
-    struct GNUNET_MQ_Envelope *ev;
-    const struct GNUNET_PeerIdentity *peer;
-
-    if (NULL == wc->peer)
-      peer = GNUNET_new (struct GNUNET_PeerIdentity);
-    else
-      peer = wc->peer;
-    ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
-    PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Hash key we watch for %s\n",
-         GNUNET_h2s_full (&hm->keyhash));
-    wc->keyhash = hm->keyhash;
-    if (NULL == h->watches)
-      h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
-    GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
-                     h->watches,
-                     &wc->keyhash,
-                     wc,
-                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
-         wc->sub_system,
-         GNUNET_i2s (peer),
-         wc->key);
-    GNUNET_MQ_send (h->mq, ev);
-    wc->ic = NULL;
-    if (NULL != wc->callback)
-      wc->callback (wc->callback_cls, record, NULL);
-    if (NULL == wc->peer)
-      GNUNET_free_nz ((void *) peer);
+    wc->callback (wc->callback_cls, record, NULL);
     return;
   }
 
+  if (NULL == wc->peer)
+    peer = GNUNET_new (struct GNUNET_PeerIdentity);
+  else
+    peer = wc->peer;
+  ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+  PEERSTORE_hash_key (wc->sub_system, peer, wc->key, &hm->keyhash);
+  hm->rid = get_op_id (h);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Hash key we watch for %s\n",
+       GNUNET_h2s_full (&hm->keyhash));
+  wc->keyhash = hm->keyhash;
+  if (NULL == h->watches)
+    h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
+  GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
+                   h->watches,
+                   &wc->keyhash,
+                   wc,
+                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
+       wc->sub_system,
+       GNUNET_i2s (peer),
+       wc->key);
+  GNUNET_MQ_send (h->mq, ev);
+  wc->ic = NULL;
   if (NULL != wc->callback)
     wc->callback (wc->callback_cls, record, NULL);
+  if (NULL == wc->peer)
+    GNUNET_free_nz ((void *) peer);
+  return;
+
 }
 
 
@@ -1040,32 +1037,32 @@ hello_updated (void *cls,
   struct GNUNET_PEERSTORE_NotifyContext *nc = cls;
   const struct GNUNET_MessageHeader *hello;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "hello_updated\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "hello_updated\n");
   if (NULL != emsg)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Got failure from PEERSTORE: %s\n",
-                emsg);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Got failure from PEERSTORE: %s\n",
+         emsg);
     nc->callback (nc->callback_cls, NULL, NULL, emsg);
     return;
   }
   if (NULL == record)
     return;
   hello = record->value;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "hello_updated with expired %s and size %u for peer %s\n",
-              GNUNET_STRINGS_absolute_time_to_string (
-                GNUNET_HELLO_builder_get_expiration_time (hello)),
-              ntohs (hello->size),
-              GNUNET_i2s (&record->peer));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "hello_updated with expired %s and size %u for peer %s\n",
+       GNUNET_STRINGS_absolute_time_to_string (
+         GNUNET_HELLO_builder_get_expiration_time (hello)),
+       ntohs (hello->size),
+       GNUNET_i2s (&record->peer));
   if ((0 == record->value_size))
   {
     GNUNET_break (0);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "hello_updated call callback\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "hello_updated call callback\n");
   nc->callback (nc->callback_cls, &record->peer, hello, NULL);
 }
 
@@ -1119,13 +1116,13 @@ merge_success (void *cls, int success)
 
   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_remove 
(huc->store_context_map,
                                                          huc->pid, 
shu_cls->sc))
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "There was no store context to be removed after storing hello 
for peer %s\n",
-                GNUNET_i2s (huc->pid));
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "There was no store context to be removed after storing hello for 
peer %s\n",
+         GNUNET_i2s (huc->pid));
   if (GNUNET_OK != success)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Storing hello uri failed\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Storing hello uri failed\n");
     huc->cont (huc->cont_cls, success);
     GNUNET_free (huc->hello);
     GNUNET_free (huc->pid);
@@ -1139,18 +1136,18 @@ merge_success (void *cls, int success)
     huc->wc = NULL;
     huc->cont (huc->cont_cls, GNUNET_OK);
     huc->success = GNUNET_OK;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Storing hello uri succeeded for peer %s!\n",
-                GNUNET_i2s (huc->pid));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Storing hello uri succeeded for peer %s!\n",
+         GNUNET_i2s (huc->pid));
     GNUNET_free (huc->hello);
     GNUNET_free (huc->pid);
     GNUNET_free (huc);
     GNUNET_free (shu_cls);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got notified during storing hello uri for peer %s!\n",
-              GNUNET_i2s (huc->pid));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got notified during storing hello uri for peer %s!\n",
+       GNUNET_i2s (huc->pid));
   GNUNET_free (shu_cls);
 }
 
@@ -1176,9 +1173,9 @@ store_hello (struct GNUNET_PEERSTORE_StoreHelloContext 
*huc,
                                GNUNET_PEERSTORE_STOREOPTION_REPLACE,
                                merge_success,
                                shu_cls);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "store_hello with expiration %s\n",
-              GNUNET_STRINGS_absolute_time_to_string (hello_exp));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "store_hello with expiration %s\n",
+       GNUNET_STRINGS_absolute_time_to_string (hello_exp));
   GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_multipeermap_put (
                    huc->store_context_map,
                    huc->pid,
@@ -1202,27 +1199,27 @@ merge_uri  (void *cls,
 
   if (NULL != emsg)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Got failure from PEERSTORE: %s\n",
-                emsg);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Got failure from PEERSTORE: %s\n",
+         emsg);
     return;
   }
 
   if (NULL == record && GNUNET_NO == huc->success)
   {
     huc_hello_exp_time = GNUNET_HELLO_builder_get_expiration_time (huc->hello);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "merge_uri just store for peer %s with expiration %s\n",
-                GNUNET_i2s (huc->pid),
-                GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "merge_uri just store for peer %s with expiration %s\n",
+         GNUNET_i2s (huc->pid),
+         GNUNET_STRINGS_absolute_time_to_string (huc_hello_exp_time));
     store_hello (huc, huc->hello);
   }
   else if (GNUNET_NO == huc->success && 0 == GNUNET_memcmp (huc->pid,
                                                             &record->peer))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "merge_uri record for peer %s\n",
-                GNUNET_i2s (&record->peer));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "merge_uri record for peer %s\n",
+         GNUNET_i2s (&record->peer));
     hello = record->value;
     if ((0 == record->value_size))
     {
diff --git a/src/service/peerstore/peerstore_common.c 
b/src/service/peerstore/peerstore_common.c
index e3bb77d86..5d4d06c0c 100644
--- a/src/service/peerstore/peerstore_common.c
+++ b/src/service/peerstore/peerstore_common.c
@@ -59,21 +59,9 @@ PEERSTORE_hash_key (const char *sub_system,
 }
 
 
-/**
- * Creates a MQ envelope for a single record
- *
- * @param sub_system sub system string
- * @param peer Peer identity (can be NULL)
- * @param key record key string (can be NULL)
- * @param value record value BLOB (can be NULL)
- * @param value_size record value size in bytes (set to 0 if value is NULL)
- * @param expiry time after which the record expires
- * @param options options specific to the storage operation
- * @param msg_type message type to be set in header
- * @return pointer to record message struct
- */
 struct GNUNET_MQ_Envelope *
-PEERSTORE_create_record_mq_envelope (const char *sub_system,
+PEERSTORE_create_record_mq_envelope (uint32_t rid,
+                                     const char *sub_system,
                                      const struct GNUNET_PeerIdentity *peer,
                                      const char *key,
                                      const void *value,
@@ -106,6 +94,7 @@ PEERSTORE_create_record_mq_envelope (const char *sub_system,
     srm->peer_set = htons (GNUNET_YES);
     srm->peer = *peer;
   }
+  srm->rid = rid;
   srm->sub_system_size = htons (ss_size);
   srm->value_size = htons (value_size);
   srm->options = htonl (options);
diff --git a/src/service/peerstore/peerstore_common.h 
b/src/service/peerstore/peerstore_common.h
index f5352f5a5..56f1f8b8b 100644
--- a/src/service/peerstore/peerstore_common.h
+++ b/src/service/peerstore/peerstore_common.h
@@ -40,6 +40,7 @@ PEERSTORE_hash_key (const char *sub_system,
 /**
  * Creates a MQ envelope for a single record
  *
+ * @param rid request ID
  * @param sub_system sub system string
  * @param peer Peer identity (can be NULL)
  * @param key record key string (can be NULL)
@@ -51,7 +52,8 @@ PEERSTORE_hash_key (const char *sub_system,
  * @return pointer to record message struct
  */
 struct GNUNET_MQ_Envelope *
-PEERSTORE_create_record_mq_envelope (const char *sub_system,
+PEERSTORE_create_record_mq_envelope (uint32_t rid,
+                                     const char *sub_system,
                                      const struct GNUNET_PeerIdentity *peer,
                                      const char *key,
                                      const void *value,
diff --git a/src/service/peerstore/perf_peerstore_store.c 
b/src/service/peerstore/perf_peerstore_store.c
index e59af61e5..e328be93e 100644
--- a/src/service/peerstore/perf_peerstore_store.c
+++ b/src/service/peerstore/perf_peerstore_store.c
@@ -32,31 +32,48 @@ static int ok = 1;
 
 static struct GNUNET_PEERSTORE_Handle *h;
 
+static struct GNUNET_PEERSTORE_WatchContext *wc;
+
 static char *ss = "test_peerstore_stress";
 static struct GNUNET_PeerIdentity p;
 static char *k = "test_peerstore_stress_key";
 static char *v = "test_peerstore_stress_val";
 
 static int count = 0;
+static int count_fin = 0;
 
 static void
-disconnect ()
+disconnect (void *cls)
 {
+  GNUNET_PEERSTORE_watch_cancel (wc);
   if (NULL != h)
     GNUNET_PEERSTORE_disconnect (h);
   GNUNET_SCHEDULER_shutdown ();
 }
 
 
+static void
+store_cont (void *cls, int ret)
+{
+  count_fin++;
+  if (count_fin == count)
+  {
+    ok = 0;
+    GNUNET_SCHEDULER_add_now (&disconnect, NULL);
+  }
+}
+
+
 static void
 store ()
 {
+  count++;
   GNUNET_PEERSTORE_store (h, ss, &p, k, v, strlen (v) + 1,
                           GNUNET_TIME_UNIT_FOREVER_ABS,
                           (count ==
                            0) ? GNUNET_PEERSTORE_STOREOPTION_REPLACE :
-                          GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, NULL, NULL);
-  count++;
+                          GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, store_cont,
+                          NULL);
 }
 
 
@@ -65,12 +82,7 @@ watch_cb (void *cls, const struct GNUNET_PEERSTORE_Record 
*record,
           const char *emsg)
 {
   GNUNET_assert (NULL == emsg);
-  if (STORES == count)
-  {
-    ok = 0;
-    disconnect ();
-  }
-  else
+  if (STORES > count)
     store ();
 }
 
@@ -82,7 +94,7 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg,
   memset (&p, 5, sizeof(p));
   h = GNUNET_PEERSTORE_connect (cfg);
   GNUNET_assert (NULL != h);
-  GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
+  wc = GNUNET_PEERSTORE_watch (h, ss, &p, k, &watch_cb, NULL);
   store ();
 }
 
diff --git a/src/service/peerstore/test_peerstore_api_iterate.c 
b/src/service/peerstore/test_peerstore_api_iterate.c
index 59367a110..0fa58f3fd 100644
--- a/src/service/peerstore/test_peerstore_api_iterate.c
+++ b/src/service/peerstore/test_peerstore_api_iterate.c
@@ -21,6 +21,7 @@
  * @file peerstore/test_peerstore_api_iterate.c
  * @brief testcase for peerstore iteration operation
  */
+#include "gnunet_common.h"
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_testing_lib.h"
@@ -40,6 +41,13 @@ static char *k3 = "test_peerstore_api_iterate_key3";
 static char *val = "test_peerstore_api_iterate_val";
 static int count = 0;
 
+static void
+finish (void *cls)
+{
+  GNUNET_PEERSTORE_disconnect (h);
+  GNUNET_SCHEDULER_shutdown ();
+}
+
 
 static void
 iter3_cb (void *cls,
@@ -58,8 +66,7 @@ iter3_cb (void *cls,
   }
   GNUNET_assert (count == 3);
   ok = 0;
-  GNUNET_PEERSTORE_disconnect (h);
-  GNUNET_SCHEDULER_shutdown ();
+  GNUNET_SCHEDULER_add_now (&finish, NULL);
 }
 
 
@@ -104,6 +111,7 @@ iter1_cb (void *cls,
     count++;
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u is count\n", count);
   GNUNET_assert (count == 1);
   count = 0;
   ic = GNUNET_PEERSTORE_iterate (h,
@@ -115,6 +123,50 @@ iter1_cb (void *cls,
 }
 
 
+static void
+store_cont (void *cls, int success)
+{
+  GNUNET_assert (GNUNET_OK == success);
+  if (0 == count)
+  {
+    GNUNET_PEERSTORE_store (h,
+                            ss,
+                            &p1,
+                            k2,
+                            val,
+                            strlen (val) + 1,
+                            GNUNET_TIME_UNIT_FOREVER_ABS,
+                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                            &store_cont,
+                            NULL);
+  }
+  else if (1 == count)
+  {
+    GNUNET_PEERSTORE_store (h,
+                            ss,
+                            &p2,
+                            k3,
+                            val,
+                            strlen (val) + 1,
+                            GNUNET_TIME_UNIT_FOREVER_ABS,
+                            GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                            &store_cont,
+                            NULL);
+  }
+  else
+  {
+    count = 0;
+    ic = GNUNET_PEERSTORE_iterate (h,
+                                   ss,
+                                   &p1,
+                                   k1,
+                                   &iter1_cb, NULL);
+    return;
+  }
+  count++;
+}
+
+
 static void
 run (void *cls,
      const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -124,6 +176,7 @@ run (void *cls,
   GNUNET_assert (NULL != h);
   memset (&p1, 1, sizeof(p1));
   memset (&p2, 2, sizeof(p2));
+  count = 0;
   GNUNET_PEERSTORE_store (h,
                           ss,
                           &p1,
@@ -132,33 +185,8 @@ run (void *cls,
                           strlen (val) + 1,
                           GNUNET_TIME_UNIT_FOREVER_ABS,
                           GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                          NULL,
-                          NULL);
-  GNUNET_PEERSTORE_store (h,
-                          ss,
-                          &p1,
-                          k2,
-                          val,
-                          strlen (val) + 1,
-                          GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                          NULL,
+                          &store_cont,
                           NULL);
-  GNUNET_PEERSTORE_store (h,
-                          ss,
-                          &p2,
-                          k3,
-                          val,
-                          strlen (val) + 1,
-                          GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                          NULL,
-                          NULL);
-  ic = GNUNET_PEERSTORE_iterate (h,
-                                 ss,
-                                 &p1,
-                                 k1,
-                                 &iter1_cb, NULL);
 }
 
 
diff --git a/src/service/peerstore/test_peerstore_api_store.c 
b/src/service/peerstore/test_peerstore_api_store.c
index 77e8a17c1..8cf0e60a7 100644
--- a/src/service/peerstore/test_peerstore_api_store.c
+++ b/src/service/peerstore/test_peerstore_api_store.c
@@ -21,6 +21,7 @@
  * @file peerstore/test_peerstore_api_store.c
  * @brief testcase for peerstore store operation
  */
+#include "gnunet_common.h"
 #include "platform.h"
 #include "gnunet_peerstore_service.h"
 #include "gnunet_testing_lib.h"
@@ -40,6 +41,13 @@ static char *val3 = "test_peerstore_api_store_val3--";
 static int count = 0;
 
 
+static void
+finish (void *cls)
+{
+  GNUNET_PEERSTORE_disconnect (h);
+  GNUNET_SCHEDULER_shutdown ();
+}
+
 static void
 test3_cont2 (void *cls,
              const struct GNUNET_PEERSTORE_Record *record,
@@ -57,8 +65,7 @@ test3_cont2 (void *cls,
   }
   GNUNET_assert (count == 1);
   ok = 0;
-  GNUNET_PEERSTORE_disconnect (h, GNUNET_YES);
-  GNUNET_SCHEDULER_shutdown ();
+  GNUNET_SCHEDULER_add_now (&finish, NULL);
 }
 
 
@@ -158,7 +165,10 @@ test1_cont2 (void *cls,
              const char *emsg)
 {
   if (NULL != emsg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error received: %s\n", emsg);
     return;
+  }
   if (NULL != record)
   {
     GNUNET_assert ((strlen (val1) + 1) == record->value_size);
@@ -175,6 +185,7 @@ test1_cont2 (void *cls,
 static void
 test1_cont (void *cls, int success)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Store done, ret=%d...\n", success);
   if (GNUNET_YES != success)
     return;
   count = 0;
@@ -193,6 +204,7 @@ test1_cont (void *cls, int success)
 static void
 test1 ()
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test1 start\n");
   GNUNET_PEERSTORE_store (h,
                           subsystem,
                           &pid,
diff --git a/src/service/peerstore/test_peerstore_api_sync.c 
b/src/service/peerstore/test_peerstore_api_sync.c
deleted file mode 100644
index 4e16afae8..000000000
--- a/src/service/peerstore/test_peerstore_api_sync.c
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
-     This file is part of GNUnet.
-     Copyright (C) 2015 GNUnet e.V.
-
-     GNUnet is free software: you can redistribute it and/or modify it
-     under the terms of the GNU Affero General Public License as published
-     by the Free Software Foundation, either version 3 of the License,
-     or (at your option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     Affero General Public License for more details.
-
-     You should have received a copy of the GNU Affero General Public License
-     along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-     SPDX-License-Identifier: AGPL3.0-or-later
- */
-/**
- * @file peerstore/test_peerstore_api_sync.c
- * @brief testcase for peerstore sync-on-disconnect feature. Stores
- *        a value just before disconnecting, and then checks that
- *        this value is actually stored.
- * @author Omar Tarabai
- * @author Christian Grothoff (minor fix, comments)
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testing_lib.h"
-#include "gnunet_peerstore_service.h"
-
-/**
- * Overall result, 0 for success.
- */
-static int ok = 404;
-
-/**
- * Configuration we use.
- */
-static const struct GNUNET_CONFIGURATION_Handle *cfg;
-
-/**
- * handle to talk to the peerstore.
- */
-static struct GNUNET_PEERSTORE_Handle *h;
-
-/**
- * Subsystem we store the value for.
- */
-static const char *subsystem = "test_peerstore_api_sync";
-
-/**
- * Fake PID under which we store the value.
- */
-static struct GNUNET_PeerIdentity pid;
-
-/**
- * Test key we're storing the test value under.
- */
-static const char *key = "test_peerstore_api_store_key";
-
-/**
- * Test value we are storing.
- */
-static const char *val = "test_peerstore_api_store_val";
-
-
-/**
- * Timeout
- */
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-
-/**
- * Timeout task
- */
-static struct GNUNET_SCHEDULER_Task *to;
-
-/**
- * Iterate handle
- */
-static struct GNUNET_PEERSTORE_IterateContext *it;
-
-static void
-test_cont (void *cls);
-
-/**
- * Function that should be called with the result of the
- * lookup, and finally once with NULL to signal the end
- * of the iteration.
- *
- * Upon the first call, we set "ok" to success. On the
- * second call (end of iteration) we terminate the test.
- *
- * @param cls NULL
- * @param record the information stored in the peerstore
- * @param emsg any error message
- * @return #GNUNET_YES (all good, continue)
- */
-static void
-iterate_cb (void *cls,
-            const struct GNUNET_PEERSTORE_Record *record,
-            const char *emsg)
-{
-  const char *rec_val;
-
-  GNUNET_break (NULL == emsg);
-  if (NULL == record)
-  {
-    it = NULL;
-    if (0 == ok)
-    {
-      GNUNET_PEERSTORE_disconnect (h);
-      if (NULL != to)
-      {
-        GNUNET_SCHEDULER_cancel (to);
-        to = NULL;
-      }
-      GNUNET_SCHEDULER_shutdown ();
-      return;
-    }
-    /**
-     * Try again
-     */
-    GNUNET_SCHEDULER_add_now (&test_cont,
-                              NULL);
-    return;
-  }
-  rec_val = record->value;
-  GNUNET_break (0 == strcmp (rec_val, val));
-  ok = 0;
-}
-
-
-static void
-timeout_task (void *cls)
-{
-  to = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-              "Timeout reached\n");
-  if (NULL != it)
-    GNUNET_PEERSTORE_iterate_cancel (it);
-  it = NULL;
-  GNUNET_PEERSTORE_disconnect (h,
-                               GNUNET_NO);
-  GNUNET_SCHEDULER_shutdown ();
-  return;
-}
-
-
-/**
- * Run the 2nd stage of the test where we fetch the
- * data that should have been stored.
- *
- * @param cls NULL
- */
-static void
-test_cont (void *cls)
-{
-  it = GNUNET_PEERSTORE_iterate (h,
-                                 subsystem,
-                                 &pid, key,
-                                 &iterate_cb,
-                                 NULL);
-}
-
-
-static void
-disc_cont (void *cls)
-{
-  GNUNET_PEERSTORE_disconnect (h, GNUNET_YES);
-  h = GNUNET_PEERSTORE_connect (cfg);
-  GNUNET_SCHEDULER_add_now (&test_cont,
-                            NULL);
-}
-
-
-static void
-store_cont (void *cls, int success)
-{
-  ok = success;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Success: %s\n",
-              (GNUNET_SYSERR == ok) ? "no" : "yes");
-  /* We need to wait a little bit to give the disconnect
-     a chance to actually finish the operation; otherwise,
-     the test may fail non-deterministically if the new
-     connection is faster than the cleanup routine of the
-     old one. */
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                &disc_cont,
-                                NULL);
-}
-
-
-/**
- * Actually run the test.
- */
-static void
-test1 ()
-{
-  h = GNUNET_PEERSTORE_connect (cfg);
-  GNUNET_PEERSTORE_store (h,
-                          subsystem,
-                          &pid,
-                          key,
-                          val, strlen (val) + 1,
-                          GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                          &store_cont, NULL);
-}
-
-
-/**
- * Initialize globals and launch the test.
- *
- * @param cls NULL
- * @param c configuration to use
- * @param peer handle to our peer (unused)
- */
-static void
-run (void *cls,
-     const struct GNUNET_CONFIGURATION_Handle *c,
-     struct GNUNET_TESTING_Peer *peer)
-{
-  cfg = c;
-  memset (&pid, 1, sizeof(pid));
-  to = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
-                                     &timeout_task,
-                                     NULL);
-  GNUNET_SCHEDULER_add_now (&test1, NULL);
-}
-
-
-int
-main (int argc, char *argv[])
-{
-  if (0 !=
-      GNUNET_TESTING_service_run ("test-gnunet-peerstore-sync",
-                                  "peerstore",
-                                  "peerstore.conf",
-                                  &run, NULL))
-    return 1;
-  if (0 != ok)
-    fprintf (stderr,
-             "Test failed: %d\n",
-             ok);
-  return ok;
-}
-
-
-/* end of test_peerstore_api_sync.c */
diff --git a/src/service/peerstore/test_peerstore_api_watch.c 
b/src/service/peerstore/test_peerstore_api_watch.c
index 126b321df..63b0e896b 100644
--- a/src/service/peerstore/test_peerstore_api_watch.c
+++ b/src/service/peerstore/test_peerstore_api_watch.c
@@ -21,6 +21,8 @@
  * @file peerstore/test_peerstore_api_watch.c
  * @brief testcase for peerstore watch functionality
  */
+#include "gnunet_common.h"
+#include "gnunet_time_lib.h"
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_testing_lib.h"
@@ -31,12 +33,42 @@ static int ok = 1;
 
 static struct GNUNET_PEERSTORE_Handle *h;
 
+static struct GNUNET_PEERSTORE_WatchContext *wc;
+
 static char *ss = "test_peerstore_api_watch";
 
 static char *k = "test_peerstore_api_watch_key";
 
 static char *val = "test_peerstore_api_watch_val";
 
+static struct GNUNET_PeerIdentity p;
+
+static void
+finish (void *cls)
+{
+  GNUNET_PEERSTORE_watch_cancel (wc);
+  GNUNET_PEERSTORE_disconnect (h);
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+cont (void *cls)
+{
+  GNUNET_PEERSTORE_store (h,
+                          ss,
+                          &p,
+                          k,
+                          val,
+                          strlen (val) + 1,
+                          GNUNET_TIME_UNIT_FOREVER_ABS,
+                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
+                          NULL,
+                          NULL);
+}
+
+
+static int initial_iteration = GNUNET_YES;
 
 static void
 watch_cb (void *cls,
@@ -44,12 +76,28 @@ watch_cb (void *cls,
           const char *emsg)
 {
   GNUNET_assert (NULL == emsg);
+  if (GNUNET_YES == initial_iteration)
+  {
+    if (NULL != record)
+    {
+      GNUNET_break (0);
+      return; // Ignore this record, FIXME: Test unclean
+    }
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &cont, NULL);
+    initial_iteration = GNUNET_NO;
+    return;
+  }
+  if (NULL == record)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received record: %s\n",
+              (char*) record->value);
   GNUNET_assert (0 == strcmp (val,
                               (char *) record->value));
   ok = 0;
-  GNUNET_PEERSTORE_disconnect (h,
-                               GNUNET_NO);
-  GNUNET_SCHEDULER_shutdown ();
+  GNUNET_SCHEDULER_add_now (&finish, NULL);
 }
 
 
@@ -58,29 +106,18 @@ run (void *cls,
      const struct GNUNET_CONFIGURATION_Handle *cfg,
      struct GNUNET_TESTING_Peer *peer)
 {
-  struct GNUNET_PeerIdentity p;
 
   h = GNUNET_PEERSTORE_connect (cfg);
   GNUNET_assert (NULL != h);
   memset (&p,
           4,
           sizeof(p));
-  GNUNET_PEERSTORE_watch (h,
-                          ss,
-                          &p,
-                          k,
-                          &watch_cb,
-                          NULL);
-  GNUNET_PEERSTORE_store (h,
-                          ss,
-                          &p,
-                          k,
-                          val,
-                          strlen (val) + 1,
-                          GNUNET_TIME_UNIT_FOREVER_ABS,
-                          GNUNET_PEERSTORE_STOREOPTION_REPLACE,
-                          NULL,
-                          NULL);
+  wc = GNUNET_PEERSTORE_watch (h,
+                               ss,
+                               &p,
+                               k,
+                               &watch_cb,
+                               NULL);
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]