[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: -DHT: deduplicate monitor matching logic
From: |
gnunet |
Subject: |
[gnunet] branch master updated: -DHT: deduplicate monitor matching logic |
Date: |
Sun, 02 Jan 2022 20:46:09 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new c891e4d29 -DHT: deduplicate monitor matching logic
c891e4d29 is described below
commit c891e4d29ca772b6b246b928a1bda8d8c9ef499f
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Jan 2 20:46:06 2022 +0100
-DHT: deduplicate monitor matching logic
---
src/dht/gnunet-service-dht.h | 2 +-
src/dht/gnunet-service-dht_clients.c | 539 +++++++++++++++++++++--------------
2 files changed, 323 insertions(+), 218 deletions(-)
diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h
index d520cc905..e9b1ff63a 100644
--- a/src/dht/gnunet-service-dht.h
+++ b/src/dht/gnunet-service-dht.h
@@ -89,7 +89,7 @@ GDS_CLIENTS_handle_reply (const struct
GDS_DATACACHE_BlockData *bd,
* @param key Key of the requested data.
*/
void
-GDS_CLIENTS_process_get (uint32_t options,
+GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
enum GNUNET_BLOCK_Type type,
uint32_t hop_count,
uint32_t desired_replication_level,
diff --git a/src/dht/gnunet-service-dht_clients.c
b/src/dht/gnunet-service-dht_clients.c
index b520cda41..8acde2fe7 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -855,78 +855,6 @@ handle_dht_local_get_stop (
}
-/**
- * Handler for monitor start messages
- *
- * @param cls the client we received this message from
- * @param msg the actual message received
- *
- */
-static void
-handle_dht_local_monitor (void *cls,
- const struct GNUNET_DHT_MonitorStartStopMessage *msg)
-{
- struct ClientHandle *ch = cls;
- struct ClientMonitorRecord *r;
-
- r = GNUNET_new (struct ClientMonitorRecord);
- r->ch = ch;
- r->type = ntohl (msg->type);
- r->get = ntohs (msg->get);
- r->get_resp = ntohs (msg->get_resp);
- r->put = ntohs (msg->put);
- if (0 != ntohs (msg->filter_key))
- r->key = msg->key;
- GNUNET_CONTAINER_DLL_insert (monitor_head,
- monitor_tail,
- r);
- GNUNET_SERVICE_client_continue (ch->client);
-}
-
-
-/**
- * Handler for monitor stop messages
- *
- * @param cls the client we received this message from
- * @param msg the actual message received
- */
-static void
-handle_dht_local_monitor_stop (
- void *cls,
- const struct GNUNET_DHT_MonitorStartStopMessage *msg)
-{
- struct ClientHandle *ch = cls;
-
- GNUNET_SERVICE_client_continue (ch->client);
- for (struct ClientMonitorRecord *r = monitor_head;
- NULL != r;
- r = r->next)
- {
- bool keys_match;
-
- keys_match =
- (GNUNET_is_zero (&r->key))
- ? (0 == ntohs (msg->filter_key))
- : ( (0 != ntohs (msg->filter_key)) &&
- (! GNUNET_memcmp (&r->key,
- &msg->key)) );
- if ( (ch == r->ch) &&
- (ntohl (msg->type) == r->type) &&
- (r->get == msg->get) &&
- (r->get_resp == msg->get_resp) &&
- (r->put == msg->put) &&
- keys_match)
- {
- GNUNET_CONTAINER_DLL_remove (monitor_head,
- monitor_tail,
- r);
- GNUNET_free (r);
- return; /* Delete only ONE entry */
- }
- }
-}
-
-
/**
* Closure for #forward_reply()
*/
@@ -1132,26 +1060,106 @@ GDS_CLIENTS_handle_reply (const struct
GDS_DATACACHE_BlockData *bd,
}
+/* ************* logic for monitors ************** */
+
+
/**
- * Check if some client is monitoring GET messages and notify
- * them in that case. If tracked, @a path should include the local peer.
+ * Handler for monitor start messages
+ *
+ * @param cls the client we received this message from
+ * @param msg the actual message received
*
- * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
- * @param type The type of data in the request.
- * @param hop_count Hop count so far.
- * @param path_length number of entries in path (or 0 if not recorded).
- * @param path peers on the GET path (or NULL if not recorded).
- * @param desired_replication_level Desired replication level.
- * @param key Key of the requested data.
*/
-void
-GDS_CLIENTS_process_get (uint32_t options,
- enum GNUNET_BLOCK_Type type,
- uint32_t hop_count,
- uint32_t desired_replication_level,
- unsigned int path_length,
- const struct GNUNET_PeerIdentity *path,
- const struct GNUNET_HashCode *key)
+static void
+handle_dht_local_monitor (void *cls,
+ const struct GNUNET_DHT_MonitorStartStopMessage *msg)
+{
+ struct ClientHandle *ch = cls;
+ struct ClientMonitorRecord *r;
+
+ r = GNUNET_new (struct ClientMonitorRecord); /* one record per monitor start request */
+ r->ch = ch;
+ r->type = ntohl (msg->type); /* message fields are converted from network byte order */
+ r->get = ntohs (msg->get);
+ r->get_resp = ntohs (msg->get_resp);
+ r->put = ntohs (msg->put);
+ if (0 != ntohs (msg->filter_key)) /* a key is only recorded when key filtering was requested */
+ r->key = msg->key; /* otherwise r->key keeps its initial value; presumably all-zero from GNUNET_new() -- confirm */
+ GNUNET_CONTAINER_DLL_insert (monitor_head,
+ monitor_tail,
+ r);
+ GNUNET_SERVICE_client_continue (ch->client);
+}
+
+
+/**
+ * Handler for monitor stop messages.
+ *
+ * Walks the monitor list and removes the first record that belongs to
+ * this client and has the same block type, GET / GET-response / PUT
+ * flags and key filter as @a msg.  At most one record is removed per
+ * message; if nothing matches, the list is left untouched.
+ *
+ * @param cls the client we received this message from
+ * @param msg the actual message received
+ */
+static void
+handle_dht_local_monitor_stop (
+  void *cls,
+  const struct GNUNET_DHT_MonitorStartStopMessage *msg)
+{
+  struct ClientHandle *ch = cls;
+
+  GNUNET_SERVICE_client_continue (ch->client);
+  for (struct ClientMonitorRecord *r = monitor_head;
+       NULL != r;
+       r = r->next)
+  {
+    bool keys_match;
+
+    /* A zeroed key in the record means "no key filter"; such a record
+       only matches a stop request that does not filter by key either. */
+    keys_match =
+      (GNUNET_is_zero (&r->key))
+      ? (0 == ntohs (msg->filter_key))
+      : ( (0 != ntohs (msg->filter_key)) &&
+          (! GNUNET_memcmp (&r->key,
+                            &msg->key)) );
+    /* The start handler stores the flags in host byte order, so the
+       message fields must be converted before they are compared;
+       comparing the raw network-order values never matches a set flag
+       on little-endian hosts. */
+    if ( (ch == r->ch) &&
+         (ntohl (msg->type) == r->type) &&
+         (r->get == ntohs (msg->get)) &&
+         (r->get_resp == ntohs (msg->get_resp)) &&
+         (r->put == ntohs (msg->put)) &&
+         keys_match)
+    {
+      GNUNET_CONTAINER_DLL_remove (monitor_head,
+                                   monitor_tail,
+                                   r);
+      GNUNET_free (r);
+      return; /* Delete only ONE entry */
+    }
+  }
+}
+
+
+/**
+ * Signature of the callback that #for_matching_monitors() invokes
+ * for a monitor record that matched the given type and key.
+ *
+ * @param cls closure, the `cb_cls` value handed to #for_matching_monitors()
+ * @param m a matching monitor
+ */
+typedef void
+(*MonitorAction)(void *cls,
+ struct ClientMonitorRecord *m);
+
+
+/**
+ * Call @a cb on all monitors that watch for blocks of @a type
+ * and key @a key.
+ *
+ * @param type the type to match
+ * @param key the key to match
+ * @param cb function to call
+ * @param cb_cls closure for @a cb
+ */
+static void
+for_matching_monitors (enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_HashCode *key,
+ MonitorAction cb,
+ void *cb_cls)
{
struct ClientHandle **cl = NULL;
unsigned int cl_size = 0;
@@ -1161,16 +1169,12 @@ GDS_CLIENTS_process_get (uint32_t options,
m = m->next)
{
if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
- (m->type == type)) &&
+ (m->type == type) ) &&
( (GNUNET_is_zero (&m->key)) ||
(0 ==
GNUNET_memcmp (key,
- &m->key))))
+ &m->key)) ) )
{
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_DHT_MonitorGetMessage *mmsg;
- struct GNUNET_PeerIdentity *msg_path;
- size_t msize;
unsigned int i;
/* Don't send duplicates */
@@ -1182,87 +1186,230 @@ GDS_CLIENTS_process_get (uint32_t options,
GNUNET_array_append (cl,
cl_size,
m->ch);
- msize = path_length * sizeof(struct GNUNET_PeerIdentity);
- env = GNUNET_MQ_msg_extra (mmsg,
- msize,
- GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
- mmsg->options = htonl (options);
- mmsg->type = htonl (type);
- mmsg->hop_count = htonl (hop_count);
- mmsg->desired_replication_level = htonl (desired_replication_level);
- mmsg->get_path_length = htonl (path_length);
- mmsg->key = *key;
- msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
- GNUNET_memcpy (msg_path,
- path,
- path_length * sizeof(struct GNUNET_PeerIdentity));
- GNUNET_MQ_send (m->ch->mq,
- env);
+ cb (cb_cls,
+ m);
}
}
GNUNET_free (cl);
}
+/**
+ * Closure for #get_action().
+ *
+ * Bundles the arguments of #GDS_CLIENTS_process_get() so that they can
+ * travel through the `void *` closure of #for_matching_monitors().
+ * The pointer members alias the caller's arguments; #get_action() only
+ * reads through them.
+ */
+struct GetActionContext
+{
+ enum GNUNET_DHT_RouteOption options; /* route options of the GET, host byte order */
+ enum GNUNET_BLOCK_Type type; /* block type requested by the GET */
+ uint32_t hop_count; /* hop count so far, host byte order */
+ uint32_t desired_replication_level; /* desired replication level, host byte order */
+ unsigned int get_path_length; /* number of entries in @e get_path */
+ const struct GNUNET_PeerIdentity *get_path; /* peers on the GET path */
+ const struct GNUNET_HashCode *key; /* key of the requested data */
+};
+
+
+/**
+ * Function called on monitors that match a GET.
+ * Sends the GET notification to the monitor.
+ *
+ * Builds one #GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET message whose fixed
+ * header is followed by the GET path entries, and queues it on the
+ * message queue of the client that owns @a m.
+ *
+ * @param cls a `struct GetActionContext`
+ * @param m a matching monitor
+ */
+static void
+get_action (void *cls,
+ struct ClientMonitorRecord *m)
+{
+ struct GetActionContext *gac = cls;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_DHT_MonitorGetMessage *mmsg;
+ struct GNUNET_PeerIdentity *msg_path;
+ size_t msize;
+
+ msize = gac->get_path_length * sizeof(struct GNUNET_PeerIdentity); /* bytes of path data behind the fixed header */
+ env = GNUNET_MQ_msg_extra (mmsg,
+ msize,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
+ mmsg->options = htonl (gac->options); /* numeric fields go out in network byte order */
+ mmsg->type = htonl (gac->type);
+ mmsg->hop_count = htonl (gac->hop_count);
+ mmsg->desired_replication_level = htonl (gac->desired_replication_level);
+ mmsg->get_path_length = htonl (gac->get_path_length);
+ mmsg->key = *gac->key;
+ msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; /* path entries start right behind the fixed-size header */
+ GNUNET_memcpy (msg_path,
+ gac->get_path,
+ gac->get_path_length * sizeof(struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (m->ch->mq,
+ env);
+}
+
+
+/**
+ * Check if some client is monitoring GET messages and notify
+ * them in that case. If tracked, @a path should include the local peer.
+ *
+ * Packs the arguments into a `struct GetActionContext` on the stack and
+ * lets #for_matching_monitors() call #get_action() for each monitor
+ * that matches @a type and @a key.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param desired_replication_level Desired replication level.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ const struct GNUNET_HashCode *key)
+{
+ struct GetActionContext gac = {
+ .options = options,
+ .type = type,
+ .hop_count = hop_count,
+ .desired_replication_level = desired_replication_level,
+ .get_path_length = path_length,
+ .get_path = path,
+ .key = key
+ };
+
+ for_matching_monitors (type,
+ key,
+ &get_action,
+ &gac);
+}
+
+
+/**
+ * Closure for response_action().
+ *
+ * Bundles the arguments of #GDS_CLIENTS_process_get_resp() so that they
+ * can travel through the `void *` closure of #for_matching_monitors().
+ */
+struct ResponseActionContext
+{
+ const struct GDS_DATACACHE_BlockData *bd; /* the block being returned (type, key, PUT path, payload, expiration) */
+ const struct GNUNET_PeerIdentity *get_path; /* peers on the GET path */
+ unsigned int get_path_length; /* number of entries in @e get_path */
+};
+
+
+/**
+ * Function called on monitors that match a response.
+ * Sends the response notification to the monitor.
+ *
+ * Builds one #GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP message whose
+ * fixed header is followed by the PUT path, the GET path and finally
+ * the block payload, and queues it on the message queue of the client
+ * that owns @a m.
+ *
+ * @param cls a `struct ResponseActionContext`
+ * @param m a matching monitor
+ */
+static void
+response_action (void *cls,
+                 struct ClientMonitorRecord *m)
+{
+  const struct ResponseActionContext *resp_ctx = cls;
+  const struct GDS_DATACACHE_BlockData *bd = resp_ctx->bd;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
+  struct GNUNET_PeerIdentity *path;
+  size_t msize;
+
+  /* Variable-size tail: both paths plus the payload. */
+  msize = bd->data_size;
+  msize += (resp_ctx->get_path_length + bd->put_path_length)
+           * sizeof(struct GNUNET_PeerIdentity);
+  env = GNUNET_MQ_msg_extra (mmsg,
+                             msize,
+                             GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
+  mmsg->type = htonl (bd->type);
+  mmsg->put_path_length = htonl (bd->put_path_length);
+  mmsg->get_path_length = htonl (resp_ctx->get_path_length);
+  mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
+  mmsg->key = bd->key;
+  path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+  /* Each section must start where the previous one ended.  Copying the
+     GET path to offset 0 as well would overwrite the PUT path, and
+     placing the payload at path[get_path_length] would leave the tail
+     of the message unfilled.
+     NOTE(review): PUT-path-before-GET-path follows the order of the
+     original copies; confirm against the client-side parser. */
+  GNUNET_memcpy (path,
+                 bd->put_path,
+                 bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (&path[bd->put_path_length],
+                 resp_ctx->get_path,
+                 resp_ctx->get_path_length
+                 * sizeof(struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (&path[bd->put_path_length + resp_ctx->get_path_length],
+                 bd->data,
+                 bd->data_size);
+  GNUNET_MQ_send (m->ch->mq,
+                  env);
+}
+
+
void
GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd,
const struct GNUNET_PeerIdentity *get_path,
unsigned int get_path_length)
{
- struct ClientHandle **cl = NULL;
- unsigned int cl_size = 0;
+ struct ResponseActionContext rac = {
+ .bd = bd,
+ .get_path = get_path,
+ .get_path_length = get_path_length
+ };
- for (struct ClientMonitorRecord *m = monitor_head;
- NULL != m;
- m = m->next)
- {
- if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
- (m->type == bd->type) ) &&
- ( (GNUNET_is_zero (&m->key)) ||
- (0 == GNUNET_memcmp (&bd->key,
- &m->key)) ) )
- {
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
- struct GNUNET_PeerIdentity *path;
- size_t msize;
- unsigned int i;
+ for_matching_monitors (bd->type,
+ &bd->key,
+ &response_action,
+ &rac);
+}
- /* Don't send duplicates */
- for (i = 0; i < cl_size; i++)
- if (cl[i] == m->ch)
- break;
- if (i < cl_size)
- continue;
- GNUNET_array_append (cl,
- cl_size,
- m->ch);
- msize = bd->data_size;
- msize += (get_path_length + bd->put_path_length)
- * sizeof(struct GNUNET_PeerIdentity);
- env = GNUNET_MQ_msg_extra (mmsg,
- msize,
- GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
- mmsg->type = htonl (bd->type);
- mmsg->put_path_length = htonl (bd->put_path_length);
- mmsg->get_path_length = htonl (get_path_length);
- mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
- mmsg->key = bd->key;
- path = (struct GNUNET_PeerIdentity *) &mmsg[1];
- GNUNET_memcpy (path,
- bd->put_path,
- bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
- GNUNET_memcpy (path,
- get_path,
- get_path_length * sizeof(struct GNUNET_PeerIdentity));
- GNUNET_memcpy (&path[get_path_length],
- bd->data,
- bd->data_size);
- GNUNET_MQ_send (m->ch->mq,
- env);
- }
- }
- GNUNET_free (cl);
+
+/**
+ * Closure for put_action().
+ *
+ * Bundles the arguments of #GDS_CLIENTS_process_put() so that they can
+ * travel through the `void *` closure of #for_matching_monitors().
+ */
+struct PutActionContext
+{
+ const struct GDS_DATACACHE_BlockData *bd; /* the block being stored (type, key, PUT path, payload, expiration) */
+ enum GNUNET_DHT_RouteOption options; /* route options of the PUT, host byte order */
+ uint32_t hop_count; /* hop count so far, host byte order */
+ uint32_t desired_replication_level; /* desired replication level, host byte order */
+};
+
+
+/**
+ * Function called on monitors that match a PUT.
+ * Sends the PUT notification to the monitor.
+ *
+ * Builds one #GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT message whose fixed
+ * header is followed by the PUT path and then the block payload, and
+ * queues it on the message queue of the client that owns @a m.
+ *
+ * @param cls a `struct PutActionContext`
+ * @param m a matching monitor
+ */
+static void
+put_action (void *cls,
+ struct ClientMonitorRecord *m)
+{
+ const struct PutActionContext *put_ctx = cls;
+ const struct GDS_DATACACHE_BlockData *bd = put_ctx->bd;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_DHT_MonitorPutMessage *mmsg;
+ struct GNUNET_PeerIdentity *msg_path;
+ size_t msize;
+
+ msize = bd->data_size /* variable-size tail: payload plus PUT path */
+ + bd->put_path_length
+ * sizeof(struct GNUNET_PeerIdentity);
+ env = GNUNET_MQ_msg_extra (mmsg,
+ msize,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
+ mmsg->options = htonl (put_ctx->options); /* numeric fields go out in network byte order */
+ mmsg->type = htonl (bd->type);
+ mmsg->hop_count = htonl (put_ctx->hop_count);
+ mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
+ mmsg->put_path_length = htonl (bd->put_path_length);
+ mmsg->key = bd->key;
+ mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
+ msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; /* PUT path starts right behind the fixed-size header */
+ GNUNET_memcpy (msg_path,
+ bd->put_path,
+ bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (&msg_path[bd->put_path_length], /* payload follows the last path entry */
+ bd->data,
+ bd->data_size);
+ GNUNET_MQ_send (m->ch->mq,
+ env);
}
@@ -1272,59 +1419,17 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption
options,
uint32_t hop_count,
uint32_t desired_replication_level)
{
- struct ClientHandle **cl = NULL;
- unsigned int cl_size = 0;
-
- for (struct ClientMonitorRecord *m = monitor_head;
- NULL != m;
- m = m->next)
- {
- if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
- (m->type == bd->type) ) &&
- ( (GNUNET_is_zero (&m->key)) ||
- (0 ==
- GNUNET_memcmp (&bd->key,
- &m->key)) ) )
- {
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_DHT_MonitorPutMessage *mmsg;
- struct GNUNET_PeerIdentity *msg_path;
- size_t msize;
- unsigned int i;
+ struct PutActionContext put_ctx = {
+ .bd = bd,
+ .hop_count = hop_count,
+ .desired_replication_level = desired_replication_level,
+ .options = options
+ };
- /* Don't send duplicates */
- for (i = 0; i < cl_size; i++)
- if (cl[i] == m->ch)
- break;
- if (i < cl_size)
- continue;
- GNUNET_array_append (cl,
- cl_size,
- m->ch);
- msize = bd->data_size;
- msize += bd->put_path_length * sizeof(struct GNUNET_PeerIdentity);
- env = GNUNET_MQ_msg_extra (mmsg,
- msize,
- GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
- mmsg->options = htonl (options);
- mmsg->type = htonl (bd->type);
- mmsg->hop_count = htonl (hop_count);
- mmsg->desired_replication_level = htonl (desired_replication_level);
- mmsg->put_path_length = htonl (bd->put_path_length);
- mmsg->key = bd->key;
- mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
- msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
- GNUNET_memcpy (msg_path,
- bd->put_path,
- bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
- GNUNET_memcpy (&msg_path[bd->put_path_length],
- bd->data,
- bd->data_size);
- GNUNET_MQ_send (m->ch->mq,
- env);
- }
- }
- GNUNET_free (cl);
+ for_matching_monitors (bd->type,
+ &bd->key,
+ &put_action,
+ &put_ctx);
}
@@ -1334,7 +1439,7 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption
options,
* @param server the initialized server
*/
static void
-GDS_CLIENTS_init ()
+GDS_CLIENTS_init (void)
{
forward_map
= GNUNET_CONTAINER_multihashmap_create (1024,
@@ -1348,7 +1453,7 @@ GDS_CLIENTS_init ()
* Shutdown client subsystem.
*/
static void
-GDS_CLIENTS_stop ()
+GDS_CLIENTS_stop (void)
{
if (NULL != retry_task)
{
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: -DHT: deduplicate monitor matching logic,
gnunet <=