[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30511 - in gnunet/src: include set util
From: |
gnunet |
Subject: |
[GNUnet-SVN] r30511 - in gnunet/src: include set util |
Date: |
Tue, 5 Nov 2013 01:08:13 +0100 |
Author: dold
Date: 2013-11-05 01:08:13 +0100 (Tue, 05 Nov 2013)
New Revision: 30511
Added:
gnunet/src/set/test_set_union_result_full.c
Modified:
gnunet/src/include/gnunet_container_lib.h
gnunet/src/include/gnunet_secretsharing_service.h
gnunet/src/set/Makefile.am
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/gnunet-service-set.h
gnunet/src/set/gnunet-service-set_union.c
gnunet/src/set/ibf.h
gnunet/src/set/strata_estimator.h
gnunet/src/set/test_set.conf
gnunet/src/util/container_multihashmap32.c
gnunet/src/util/mq.c
Log:
- implemented missing set functionality
- secretsharing api changes
Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h 2013-11-04 17:04:28 UTC (rev
30510)
+++ gnunet/src/include/gnunet_container_lib.h 2013-11-05 00:08:13 UTC (rev
30511)
@@ -1082,6 +1082,14 @@
/**
* @ingroup hashmap
+ * Opaque handle to an iterator over
+ * a 32-bit key multihashmap.
+ */
+struct GNUNET_CONTAINER_MultiHashMap32Iterator;
+
+
+/**
+ * @ingroup hashmap
* Iterator over hash map entries.
*
* @param cls closure
@@ -1267,8 +1275,51 @@
void *it_cls);
+/**
+ * Create an iterator for a 32-bit multihashmap.
+ * The iterator can be used to retrieve all the elements in the multihashmap
+ * one by one, without having to handle all elements at once (in contrast to
+ * #GNUNET_CONTAINER_multihashmap32_iterate). Note that the iterator can not
be
+ * used anymore if elements have been removed from 'map' after the creation of
+ * the iterator, or 'map' has been destroyed. Adding elements to 'map' may
+ * result in skipped or repeated elements.
+ *
+ * @param map the map to create an iterator for
+ * @return an iterator over the given multihashmap map
+ */
+struct GNUNET_CONTAINER_MultiHashMap32Iterator *
+GNUNET_CONTAINER_multihashmap32_iterator_create (const struct
GNUNET_CONTAINER_MultiHashMap32 *map);
+/**
+ * Retrieve the next element from the hash map at the iterator's position.
+ * If there are no elements left, GNUNET_NO is returned, and 'key' and 'value'
+ * are not modified.
+ * This operation is only allowed if no elements have been removed from the
+ * multihashmap since the creation of 'iter', and the map has not been
destroyed.
+ * Adding elements may result in repeating or skipping elements.
+ *
+ * @param iter the iterator to get the next element from
+ * @param key pointer to store the key in, can be NULL
+ * @param value pointer to store the value in, can be NULL
+ * @return #GNUNET_YES we returned an element,
+ * #GNUNET_NO if we are out of elements
+ */
+int
+GNUNET_CONTAINER_multihashmap32_iterator_next (struct
GNUNET_CONTAINER_MultiHashMap32Iterator *iter,
+ uint32_t *key,
+ const void **value);
+
+
+/**
+ * Destroy a 32-bit multihashmap iterator.
+ *
+ * @param iter the iterator to destroy
+ */
+void
+GNUNET_CONTAINER_multihashmap32_iterator_destroy (struct
GNUNET_CONTAINER_MultiHashMapIterator *iter);
+
+
/* ******************** doubly-linked list *************** */
/* To avoid mistakes: head->prev == tail->next == NULL */
Modified: gnunet/src/include/gnunet_secretsharing_service.h
===================================================================
--- gnunet/src/include/gnunet_secretsharing_service.h 2013-11-04 17:04:28 UTC
(rev 30510)
+++ gnunet/src/include/gnunet_secretsharing_service.h 2013-11-05 00:08:13 UTC
(rev 30511)
@@ -48,7 +48,12 @@
*/
struct GNUNET_SECRETSHARING_Session;
+/**
+ * Share of a secret shared with a group of peers.
+ */
+struct GNUNET_SECRETSHARING_Share;
+
/**
* Handle to cancel a cooperative decryption operation.
*/
@@ -56,22 +61,14 @@
/**
- * Parameters of the crypto system.
+ * Public key of a group sharing a secret.
*/
-struct GNUNET_SECRETSHARING_Parameters
+struct GNUNET_SECRETSHARING_PublicKey
{
/**
- * Prime with p = 2q+1.
+ * Value of the private key.
*/
- gcry_mpi_t p;
- /**
- * Prime.
- */
- gcry_mpi_t q;
- /**
- * Generator of G_q.
- */
- gcry_mpi_t g;
+ gcry_mpi_t value;
};
@@ -92,20 +89,35 @@
/**
+ * Plain, unencrypted message that can be encrypted with
+ * a group public key.
+ */
+struct GNUNET_SECRETSHARING_Message
+{
+ /**
+ * Value of the message.
+ */
+ gcry_mpi_t value;
+};
+
+
+/**
* Called once the secret has been established with all peers, or the deadline
is due.
*
* Note that the number of peers can be smaller that 'k' (this threshold
parameter), which
- * makes the threshold crypto system useledd. However, in this case one can
still determine which peers
+ * makes the threshold crypto system useless. However, in this case one can
still determine which peers
* were able to participate in the secret sharing successfully.
*
* @param cls closure
+ * @param my_share the share of this peer
* @param public_key public key of the session
- * @param num_ready_peers number of peers in @ready_peers
- * @parem ready_peers peers that successfuly participated in establishing
+ * @param num_ready_peers number of peers in ready_peers
+ * @param ready_peers peers that successfuly participated in establishing
* the shared secret
*/
typedef void (*GNUNET_SECRETSHARING_SecretReadyCallback) (void *cls,
- gcry_mpi_t
public_key,
+ const struct
GNUNET_SECRETSHARING_Share *my_share,
+ const struct
GNUNET_SECRETSHARING_PublicKey public_key,
unsigned int
num_ready_peers,
const struct
GNUNET_PeerIdentity *ready_peers);
@@ -114,10 +126,10 @@
* Called when a decryption has succeeded.
*
* @param cls closure
- * @param result decrypted value
+ * @param result decrypted value, must be free'd by the callback eventually
*/
typedef void (*GNUNET_SECRETSHARING_DecryptCallback) (void *cls,
- gcry_mpi_t result);
+ struct
GNUNET_SECRETSHARING_Message *result);
/**
@@ -125,11 +137,11 @@
* with the other peers.
*
* @param cfg configuration to use
- * @param num_peers number of peers in @peers
+ * @param num_peers number of peers in 'peers'
+ * @param peers array of peers that we will share secrets with, can optionally
contain the local peer
* @param session_id unique session id
* @param deadline point in time where the session must be established; taken
as hint
* by underlying consensus sessions
- * @param parameters parameters for the crypto system
* @param threshold minimum number of peers that must cooperate to decrypt a
value
* @param cb called when the secret has been established
* @param cls closure for cb
@@ -140,13 +152,51 @@
const struct GNUNET_PeerIdentity *peers,
const struct GNUNET_HashCode *session_id,
struct GNUNET_TIME_Absolute deadline,
- struct GNUNET_SECRETSHARING_Parameters
*parameters,
unsigned int threshold,
GNUNET_SECRETSHARING_SecretReadyCallback
*cb,
void *cls);
/**
+ * Load a session from an existing share.
+ *
+ * @param cfg configuration to use for connecting to the secretsharing service
+ * @param share share to load the session from
+ */
+struct GNUNET_SECRETSHARING_Session *
+GNUNET_SECRETSHARING_load_session (const struct GNUNET_CONFIGURATION_Handle
*cfg,
+ const struct GNUNET_SECRETSHARING_Share
*share);
+
+/**
+ * Convert a secret share to a string.
+ *
+ * @param share share to serialize
+ * @return the serialized secret share, to be freed by the caller
+ */
+char *
+GNUNET_SECRETSHARING_share_to_string (const struct GNUNET_SECRETSHARING_Share
*share);
+
+
+/**
+ * Convert a secret share to a string.
+ *
+ * @param str string to deserialize
+ * @return the serialized secret share, to be freed by the caller
+ */
+const struct GNUNET_SECRETSHARING_Share *
+GNUNET_SECRETSHARING_share_from_string (const char *str);
+
+
+/**
+ * Destroy a secret share.
+ *
+ * @param share secret share to destroy
+ */
+void
+GNUNET_SECRETSHARING_share_destroy (const struct GNUNET_SECRETSHARING_Share
*share);
+
+
+/**
* Destroy a secret sharing session.
*
* @param session session to destroy
@@ -165,12 +215,12 @@
* @param session session to take the key for encryption from,
* the session's ready callback must have been already called
* @param message message to encrypt
- * @param result_cyphertext pointer to store the resulting ciphertext
+ * @param result_ciphertext pointer to store the resulting ciphertext
* @return GNUNET_YES on succes, GNUNET_SYSERR if the message is invalid
(invalid range)
*/
int
GNUNET_SECRETSHARING_encrypt (const struct GNUNET_SECRETSHARING_Session
*session,
- gcry_mpi_t message,
+ const struct GNUNET_SECRETSHARING_Message
*message,
struct GNUNET_SECRETSHARING_Ciphertext
*result_ciphertext);
@@ -206,6 +256,8 @@
GNUNET_SECRETSHARING_cancel_decrypt (struct
GNUNET_SECRETSHARING_DecryptionHandle *decryption_handle);
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/set/Makefile.am
===================================================================
--- gnunet/src/set/Makefile.am 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/Makefile.am 2013-11-05 00:08:13 UTC (rev 30511)
@@ -63,7 +63,7 @@
if HAVE_TESTING
check_PROGRAMS = \
- test_set_api
+ test_set_api test_set_union_result_full
endif
if ENABLE_TEST_RUN
@@ -79,6 +79,15 @@
test_set_api_DEPENDENCIES = \
libgnunetset.la
+test_set_union_result_full_SOURCES = \
+ test_set_union_result_full.c
+test_set_union_result_full_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/set/libgnunetset.la
+test_set_union_result_full_DEPENDENCIES = \
+ libgnunetset.la
+
EXTRA_DIST = \
test_set.conf
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/gnunet-service-set.c 2013-11-05 00:08:13 UTC (rev 30511)
@@ -28,41 +28,20 @@
/**
- * Peer that has connected to us, but is not yet evaluating a set operation.
- * Once the peer has sent a request, and the client has
- * accepted or rejected it, this information will be deleted.
+ * State of an operation where the peer has connected to us, but is not yet
+ * evaluating a set operation. Once the peer has sent a concrete request, and
+ * the client has accepted or rejected it, this information will be deleted
+ * and replaced by the real set operation state.
*/
-struct Incoming
+struct OperationState
{
/**
- * Incoming peers are held in a linked list
- */
- struct Incoming *next;
-
- /**
- * Incoming peers are held in a linked list
- */
- struct Incoming *prev;
-
- /**
- * Detail information about the operation.
- * NULL as long as we did not receive the operation
- * request from the remote peer.
- */
- struct OperationSpecification *spec;
-
- /**
* The identity of the requesting peer. Needs to
* be stored here as the op spec might not have been created yet.
*/
struct GNUNET_PeerIdentity peer;
/**
- * Tunnel to the peer.
- */
- struct GNUNET_MESH_Tunnel *tunnel;
-
- /**
* Unique request id for the request from
* a remote peer, sent to the client, which will
* accept or reject the request.
@@ -76,12 +55,6 @@
* after the timeout, it will be disconnected.
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
- * Tunnel context, needs to be stored here as a client's accept will change
- * the tunnel context.
- */
- struct TunnelContext *tc;
};
@@ -160,13 +133,13 @@
* Incoming sockets from remote peers are
* held in a doubly linked list.
*/
-static struct Incoming *incoming_head;
+static struct Operation *incoming_head;
/**
* Incoming sockets from remote peers are
* held in a doubly linked list.
*/
-static struct Incoming *incoming_tail;
+static struct Operation *incoming_tail;
/**
* Counter for allocating unique IDs for clients,
@@ -221,14 +194,14 @@
* @return the incoming socket associated with the id,
* or NULL if there is none
*/
-static struct Incoming *
+static struct Operation *
get_incoming (uint32_t id)
{
- struct Incoming *incoming;
+ struct Operation *op;
- for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
- if (incoming->suggest_id == id)
- return incoming;
+ for (op = incoming_head; NULL != op; op = op)
+ if (op->state->suggest_id == id)
+ return op;
return NULL;
}
@@ -261,7 +234,8 @@
/**
- * Iterator over hash map entries.
+ * Iterator over hash map entries to free
+ * element entries.
*
* @param cls closure
* @param key current key code
@@ -283,6 +257,100 @@
/**
+ * Collect and destroy elements that are not needed anymore, because
+ * their lifetime (as determined by their generation) does not overlap with
any active
+ * set operation.
+ */
+void
+collect_generation_garbage (struct Set *set)
+{
+ struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+ struct ElementEntry *ee;
+ struct GNUNET_CONTAINER_MultiHashMap *new_elements;
+ int res;
+ struct Operation *op;
+
+ new_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements);
+ while (GNUNET_OK ==
+ (res = GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL,
(const void **) &ee)))
+ {
+ if (GNUNET_NO == ee->removed)
+ goto still_needed;
+ for (op = set->ops_head; NULL != op; op = op->next)
+ if ( (op->generation_created >= ee->generation_added) &&
+ (op->generation_created < ee->generation_removed) )
+ goto still_needed;
+ GNUNET_free (ee);
+ continue;
+still_needed:
+ // we don't expect collisions, thus the replace option
+ GNUNET_CONTAINER_multihashmap_put (new_elements, &ee->element_hash, ee,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ }
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+ GNUNET_CONTAINER_multihashmap_destroy (set->elements);
+ set->elements = new_elements;
+}
+
+
+/**
+ * Destroy the given operation. Call the implementation-specific cancel
function
+ * of the operation. Disconnects from the remote peer.
+ * Does not disconnect the client, as there may be multiple operations per set.
+ *
+ * @param op operation to destroy
+ */
+void
+_GSS_operation_destroy (struct Operation *op)
+{
+ struct Set *set;
+
+ if (NULL == op->vt)
+ return;
+
+ set = op->spec->set;
+
+ GNUNET_assert (GNUNET_NO == op->is_incoming);
+ GNUNET_assert (NULL != op->spec);
+ GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head,
+ op->spec->set->ops_tail,
+ op);
+
+ op->vt->cancel (op);
+ op->vt = NULL;
+
+ if (NULL != op->spec)
+ {
+ if (NULL != op->spec->context_msg)
+ {
+ GNUNET_free (op->spec->context_msg);
+ op->spec->context_msg = NULL;
+ }
+ GNUNET_free (op->spec);
+ op->spec = NULL;
+ }
+
+ if (NULL != op->mq)
+ {
+ GNUNET_MQ_destroy (op->mq);
+ op->mq = NULL;
+ }
+
+ if (NULL != op->tunnel)
+ {
+ GNUNET_MESH_tunnel_destroy (op->tunnel);
+ op->tunnel = NULL;
+ }
+
+ collect_generation_garbage (set);
+
+ /* We rely on the tunnel end handler to free 'op'. When 'op->tunnel' was
NULL,
+ * there was a tunnel end handler that will free 'op' on the call stack. */
+}
+
+
+/**
* Destroy a set, and free all resources associated with it.
*
* @param set the set to destroy
@@ -302,6 +370,8 @@
return;
}
GNUNET_assert (NULL != set->state);
+ while (NULL != set->ops_head)
+ _GSS_operation_destroy (set->ops_head);
set->vt->destroy_set (set->state);
set->state = NULL;
if (NULL != set->client_mq)
@@ -364,25 +434,40 @@
* @param incoming remote request to destroy
*/
static void
-incoming_destroy (struct Incoming *incoming)
+incoming_destroy (struct Operation *incoming)
{
+ GNUNET_assert (GNUNET_YES == incoming->is_incoming);
GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
- if (GNUNET_SCHEDULER_NO_TASK != incoming->timeout_task)
+ if (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task)
{
- GNUNET_SCHEDULER_cancel (incoming->timeout_task);
- incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
+ incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
}
- if (NULL != incoming->tunnel)
- {
- struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
- incoming->tunnel = NULL;
- GNUNET_MESH_tunnel_destroy (t);
- return;
- }
- GNUNET_free (incoming);
+ GNUNET_free (incoming->state);
}
+static void
+incoming_retire (struct Operation *incoming)
+{
+ GNUNET_assert (NULL != incoming->spec);
+ GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+ incoming->is_incoming = GNUNET_NO;
+ GNUNET_free (incoming->state);
+ incoming->state = NULL;
+ GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
+}
+
+
+/**
+ * Find a listener that is interested in the given operation type
+ * and application id.
+ *
+ * @param op operation type to look for
+ * @param app_id application id to look for
+ * @return a matching listener, or NULL if no listener matches the
+ * given operation and application id
+ */
static struct Listener *
listener_get_by_target (enum GNUNET_SET_OperationType op,
const struct GNUNET_HashCode *app_id)
@@ -409,23 +494,24 @@
* @param listener the listener to suggest the request to
*/
static void
-incoming_suggest (struct Incoming *incoming, struct Listener *listener)
+incoming_suggest (struct Operation *incoming, struct Listener *listener)
{
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_RequestMessage *cmsg;
- GNUNET_assert (0 == incoming->suggest_id);
GNUNET_assert (NULL != incoming->spec);
- incoming->suggest_id = suggest_id++;
+ GNUNET_assert (0 == incoming->state->suggest_id);
+ incoming->state->suggest_id = suggest_id++;
- GNUNET_SCHEDULER_cancel (incoming->timeout_task);
- incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task);
+ GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
+ incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
incoming->spec->context_msg);
GNUNET_assert (NULL != mqm);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id
%u\n",
- incoming->suggest_id);
- cmsg->accept_id = htonl (incoming->suggest_id);
+ incoming->state->suggest_id);
+ cmsg->accept_id = htonl (incoming->state->suggest_id);
cmsg->peer_id = incoming->spec->peer;
GNUNET_MQ_send (listener->client_mq, mqm);
}
@@ -441,10 +527,9 @@
* GNUNET_SYSERR to destroy the tunnel
*/
static int
-handle_incoming_msg (struct OperationState *op,
+handle_incoming_msg (struct Operation *op,
const struct GNUNET_MessageHeader *mh)
{
- struct Incoming *incoming = (struct Incoming *) op;
const struct OperationRequestMessage *msg = (const struct
OperationRequestMessage *) mh;
struct Listener *listener;
struct OperationSpecification *spec;
@@ -457,7 +542,7 @@
return GNUNET_SYSERR;
}
- if (NULL != incoming->spec)
+ if (NULL != op->spec)
{
/* double operation request */
GNUNET_break_op (0);
@@ -471,9 +556,9 @@
spec->operation = ntohl (msg->operation);
spec->app_id = msg->app_id;
spec->salt = ntohl (msg->salt);
- spec->peer = incoming->peer;
+ spec->peer = op->state->peer;
- incoming->spec = spec;
+ op->spec = spec;
if ( (NULL != spec->context_msg) &&
(ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)
)
@@ -491,11 +576,19 @@
"no listener matches incoming request, waiting with
timeout\n");
return GNUNET_OK;
}
- incoming_suggest (incoming, listener);
+ incoming_suggest (op, listener);
return GNUNET_OK;
}
+/**
+ * Send the next element of a set to the set's client. The next element is
given by
+ * the set's current hashmap iterator. The set's iterator will be set to NULL
if there
+ * are no more elements in the set. The caller must ensure that the set's
iterator is
+ * valid.
+ *
+ * @param set set that should send its next element to its client
+ */
static void
send_client_element (struct Set *set)
{
@@ -508,6 +601,8 @@
if (GNUNET_NO == ret)
{
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
+ set->iter = NULL;
}
else
{
@@ -588,7 +683,8 @@
switch (ntohs (msg->operation))
{
case GNUNET_SET_OPERATION_INTERSECTION:
-// set->vt = _GSS_intersection_vt ();
+ // FIXME: implement intersection vt
+ // set->vt = _GSS_intersection_vt ();
break;
case GNUNET_SET_OPERATION_UNION:
set->vt = _GSS_union_vt ();
@@ -623,7 +719,7 @@
{
struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
struct Listener *listener;
- struct Incoming *incoming;
+ struct Operation *op;
if (NULL != listener_get (client))
{
@@ -639,24 +735,26 @@
GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app
%s)\n",
listener->operation, GNUNET_h2s (&listener->app_id));
- for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
+ /* check for incoming requests the listener is interested in */
+ for (op = incoming_head; NULL != op; op = op->next)
{
- if (NULL == incoming->spec)
+ if (NULL == op->spec)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
continue;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s,
suggest: %u)\n",
- incoming->spec->operation, GNUNET_h2s
(&incoming->spec->app_id), incoming->suggest_id);
+ op->spec->operation, GNUNET_h2s (&op->spec->app_id),
op->state->suggest_id);
- if (0 != incoming->suggest_id)
+ /* don't consider the incoming request if it has been already suggested to
a listener */
+ if (0 != op->state->suggest_id)
continue;
- if (listener->operation != incoming->spec->operation)
+ if (listener->operation != op->spec->operation)
continue;
- if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
&incoming->spec->app_id))
+ if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &op->spec->app_id))
continue;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
- incoming_suggest (incoming, listener);
+ incoming_suggest (op, listener);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -676,8 +774,9 @@
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
- struct Incoming *incoming;
+ struct Operation *incoming;
const struct GNUNET_SET_AcceptRejectMessage *msg;
+ struct GNUNET_MESH_Tunnel *tunnel;
msg = (const struct GNUNET_SET_AcceptRejectMessage *) m;
GNUNET_break (0 == ntohl (msg->request_id));
@@ -689,13 +788,17 @@
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
- GNUNET_MESH_tunnel_destroy (incoming->tunnel);
+ /* set the incoming's tunnel to NULL so that we don't accidentally destroy
+ * the tunnel again. */
+ tunnel = incoming->tunnel;
+ incoming->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (tunnel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Called when a client wants to add an element to a
+ * Called when a client wants to add/remove an element to/from a
* set it inhabits.
*
* @param cls unused
@@ -784,10 +887,9 @@
const struct GNUNET_MessageHeader *m)
{
struct Set *set;
- struct TunnelContext *tc;
- struct GNUNET_MESH_Tunnel *tunnel;
struct GNUNET_SET_EvaluateMessage *msg;
struct OperationSpecification *spec;
+ struct Operation *op;
set = set_get (client);
if (NULL == set)
@@ -798,7 +900,6 @@
}
msg = (struct GNUNET_SET_EvaluateMessage *) m;
- tc = GNUNET_new (struct TunnelContext);
spec = GNUNET_new (struct OperationSpecification);
spec->operation = set->operation;
spec->app_id = msg->app_id;
@@ -811,13 +912,20 @@
if (NULL != spec->context_msg)
spec->context_msg = GNUNET_copy_message (spec->context_msg);
- tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
- GNUNET_APPLICATION_TYPE_SET,
- GNUNET_YES,
- GNUNET_YES);
+ op = GNUNET_new (struct Operation);
+ op->spec = spec;
+ op->generation_created = set->current_generation++;
+ op->vt = set->vt;
+ GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
- set->vt->evaluate (spec, tunnel, tc);
+ op->tunnel = GNUNET_MESH_tunnel_create (mesh, op, &msg->target_peer,
+ GNUNET_APPLICATION_TYPE_SET,
+ GNUNET_YES,
+ GNUNET_YES);
+ op->mq = GNUNET_MESH_mq_create (op->tunnel);
+
+ set->vt->evaluate (op);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -857,8 +965,8 @@
/**
- * Handle a request from the client to accept
- * a set operation that came from a remote peer.
+ * Handle a request from the client to
+ * cancel a running set operation.
*
* @param cls unused
* @param client the client
@@ -872,6 +980,8 @@
const struct GNUNET_SET_CancelMessage *msg =
(const struct GNUNET_SET_CancelMessage *) mh;
struct Set *set;
+ struct Operation *op;
+ int found;
set = set_get (client);
if (NULL == set)
@@ -880,8 +990,24 @@
GNUNET_SERVER_client_disconnect (client);
return;
}
- /* FIXME: maybe cancel should return success/error code? */
- set->vt->cancel (set->state, ntohl (msg->request_id));
+ found = GNUNET_NO;
+ for (op = set->ops_head; NULL != op; op = op->next)
+ {
+ if (op->spec->client_request_id == msg->request_id)
+ {
+ found = GNUNET_YES;
+ break;
+ }
+ }
+
+ if (GNUNET_NO == found)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+
+ _GSS_operation_destroy (op);
}
@@ -899,20 +1025,22 @@
const struct GNUNET_MessageHeader *mh)
{
struct Set *set;
- struct Incoming *incoming;
struct GNUNET_SET_AcceptRejectMessage *msg = (struct
GNUNET_SET_AcceptRejectMessage *) mh;
+ struct Operation *op;
- incoming = get_incoming (ntohl (msg->accept_reject_id));
+ op = get_incoming (ntohl (msg->accept_reject_id));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl
(msg->accept_reject_id));
-
- if (NULL == incoming)
+ if (NULL == op)
{
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl
(msg->accept_reject_id));
+
+ GNUNET_assert (GNUNET_YES == op->is_incoming);
+
set = set_get (client);
if (NULL == set)
@@ -922,13 +1050,21 @@
return;
}
- incoming->spec->set = set;
- incoming->spec->client_request_id = ntohl (msg->request_id);
- incoming->spec->result_mode = ntohs (msg->result_mode);
- set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc);
- /* tunnel ownership goes to operation */
- incoming->tunnel = NULL;
- incoming_destroy (incoming);
+ op->spec->set = set;
+
+ incoming_retire (op);
+
+ GNUNET_assert (NULL != op->spec->set);
+ GNUNET_assert (NULL != op->spec->set->vt);
+
+ GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
+
+ op->spec->client_request_id = ntohl (msg->request_id);
+ op->spec->result_mode = ntohs (msg->result_mode);
+ op->generation_created = set->current_generation++;
+ op->vt = op->spec->set->vt;
+ GNUNET_assert (NULL != op->vt->accept);
+ set->vt->accept (op);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -952,9 +1088,8 @@
while (NULL != sets_head)
set_destroy (sets_head);
-
- /* it's important to destroy mesh at the end, as tunnels
- * must be destroyed first! */
+ /* it's important to destroy mesh at the end, as all tunnels
+ * must be destroyed before the mesh handle! */
if (NULL != mesh)
{
GNUNET_MESH_disconnect (mesh);
@@ -966,7 +1101,8 @@
/**
- * Signature of the main function of a task.
+ * Handle an incoming peer timeout, that is, disconnect a peer if
+ * has not requested an operation for some amount of time.
*
* @param cls closure
* @param tc context information (why was this task triggered now)
@@ -975,8 +1111,10 @@
incoming_timeout_cb (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct Incoming *incoming = cls;
+ struct Operation *incoming = cls;
+ GNUNET_assert (GNUNET_YES == incoming->is_incoming);
+
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
@@ -986,13 +1124,12 @@
static void
-handle_incoming_disconnect (struct OperationState *op_state)
+handle_incoming_disconnect (struct Operation *op)
{
- struct Incoming *incoming = (struct Incoming *) op_state;
- if (NULL == incoming->tunnel)
+ if (NULL == op->tunnel)
return;
- incoming_destroy (incoming);
+ incoming_destroy (op);
}
@@ -1017,7 +1154,7 @@
const struct GNUNET_PeerIdentity *initiator,
uint32_t port)
{
- struct Incoming *incoming;
+ struct Operation *incoming;
static const struct SetVT incoming_vt = {
.msg_handler = handle_incoming_msg,
.peer_disconnect = handle_incoming_disconnect
@@ -1026,17 +1163,18 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
- incoming = GNUNET_new (struct Incoming);
- incoming->peer = *initiator;
+ incoming = GNUNET_new (struct Operation);
+ incoming->is_incoming = GNUNET_YES;
+ incoming->state = GNUNET_new (struct OperationState);
+ incoming->state->peer = *initiator;
incoming->tunnel = tunnel;
- incoming->tc = GNUNET_new (struct TunnelContext);;
- incoming->tc->vt = &incoming_vt;
- incoming->tc->op = (struct OperationState *) incoming;
- incoming->timeout_task =
+ incoming->mq = GNUNET_MESH_mq_create (incoming->tunnel);
+ incoming->vt = &incoming_vt;
+ incoming->state->timeout_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
incoming_timeout_cb, incoming);
GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
- return incoming->tc;
+ return incoming;
}
@@ -1055,9 +1193,14 @@
tunnel_end_cb (void *cls,
const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
{
- struct TunnelContext *ctx = tunnel_ctx;
+ struct Operation *op = tunnel_ctx;
- ctx->vt->peer_disconnect (ctx->op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel end cb called\n");
+
+ op->tunnel = NULL;
+
+ if (NULL != op->vt)
+ op->vt->peer_disconnect (op);
/* mesh will never call us with the context again! */
GNUNET_free (tunnel_ctx);
}
@@ -1085,14 +1228,14 @@
void **tunnel_ctx,
const struct GNUNET_MessageHeader *message)
{
- struct TunnelContext *tc = *tunnel_ctx;
+ struct Operation *op = *tunnel_ctx;
int ret;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
ntohs (message->type));
/* do this before the handler, as the handler might kill the tunnel */
GNUNET_MESH_receive_done (tunnel);
- ret = tc->vt->msg_handler (tc->op, message);
+ ret = op->vt->msg_handler (op, message);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
ntohs (message->type));
return ret;
Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/gnunet-service-set.h 2013-11-05 00:08:13 UTC (rev 30511)
@@ -55,8 +55,8 @@
/* forward declarations */
struct Set;
-struct TunnelContext;
struct ElementEntry;
+struct Operation;
/**
@@ -135,7 +135,7 @@
*
* @param op the set operation, contains implementation-specific data
*/
-typedef void (*PeerDisconnectImpl) (struct OperationState *op);
+typedef void (*PeerDisconnectImpl) (struct Operation *op);
/**
@@ -151,13 +151,9 @@
* Signature of functions that implement the creation of set operations
* (currently evaluate and accept).
*
- * @param spec specification of the set operation to be created
- * @param tunnel the tunnel with the other peer
- * @param tc tunnel context
+ * @param op operation that is created, should be initialized by the
implementation
*/
-typedef void (*OpCreateImpl) (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel,
- struct TunnelContext *tc);
+typedef void (*OpCreateImpl) (struct Operation *op);
/**
@@ -169,11 +165,10 @@
* @return GNUNET_OK on success, GNUNET_SYSERR to
* destroy the operation and the tunnel
*/
-typedef int (*MsgHandlerImpl) (struct OperationState *op,
+typedef int (*MsgHandlerImpl) (struct Operation *op,
const struct GNUNET_MessageHeader *msg);
-typedef void (*CancelImpl) (struct SetState *set,
- uint32_t request_id);
+typedef void (*CancelImpl) (struct Operation *op);
/**
@@ -263,6 +258,7 @@
/**
* Hash of the element.
+ * For set union:
* Will be used to derive the different IBF keys
* for different salts.
*/
@@ -294,6 +290,63 @@
};
+struct Operation
+{
+ /**
+ * V-Table for the operation belonging
+ * to the tunnel contest.
+ */
+ const struct SetVT *vt;
+
+ /**
+ * Tunnel to the peer.
+ */
+ struct GNUNET_MESH_Tunnel *tunnel;
+
+ /**
+ * Message queue for the tunnel.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * GNUNET_YES if this is not a "real" set operation yet, and we still
+ * need to wait for the other peer to give us more details.
+ */
+ int is_incoming;
+
+ /**
+ * Generation in which the operation handle
+ * was created.
+ */
+ unsigned int generation_created;
+
+ /**
+ * Detail information about the set operation,
+ * including the set to use.
+ * When 'spec' is NULL, the operation is not yet entirely
+ * initialized.
+ */
+ struct OperationSpecification *spec;
+
+ /**
+ * Operation-specific operation state.
+ */
+ struct OperationState *state;
+
+ /**
+ * Evaluate operations are held in
+ * a linked list.
+ */
+ struct Operation *next;
+
+ /**
+ * Evaluate operations are held in
+ * a linked list.
+ */
+ struct Operation *prev;
+};
+
+
/**
* A set that supports a specific operation
* with other peers.
@@ -353,28 +406,25 @@
* previously executed operations on this set
*/
unsigned int current_generation;
-};
-
-/**
- * Information about a tunnel we are connected to.
- * Used as tunnel context with mesh.
- */
-struct TunnelContext
-{
/**
- * V-Table for the operation belonging
- * to the tunnel contest.
+ * Evaluate operations are held in
+ * a linked list.
*/
- const struct SetVT *vt;
+ struct Operation *ops_head;
/**
- * Implementation-specific operation state.
+ * Evaluate operations are held in
+ * a linked list.
*/
- struct OperationState *op;
+ struct Operation *ops_tail;
};
+void
+_GSS_operation_destroy (struct Operation *op);
+
+
/**
* Get the table with implementing functions for
* set union.
@@ -382,6 +432,7 @@
const struct SetVT *
_GSS_union_vt (void);
+
/**
* Get the table with implementing functions for
* set intersection.
Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c 2013-11-04 17:04:28 UTC (rev
30510)
+++ gnunet/src/set/gnunet-service-set_union.c 2013-11-05 00:08:13 UTC (rev
30511)
@@ -112,12 +112,6 @@
struct GNUNET_MESH_Tunnel *tunnel;
/**
- * Detail information about the set operation,
- * including the set to use.
- */
- struct OperationSpecification *spec;
-
- /**
* Message queue for the peer.
*/
struct GNUNET_MQ_Handle *mq;
@@ -151,35 +145,16 @@
struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
/**
- * Current state of the operation.
+ * Iterator for sending elements on the key to element mapping to the client.
*/
- enum UnionOperationPhase phase;
+ struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
/**
- * Generation in which the operation handle
- * was created.
+ * Current state of the operation.
*/
- unsigned int generation_created;
+ enum UnionOperationPhase phase;
/**
- * Set state of the set that this operation
- * belongs to.
- */
- struct Set *set;
-
- /**
- * Evaluate operations are held in
- * a linked list.
- */
- struct OperationState *next;
-
- /**
- * Evaluate operations are held in
- * a linked list.
- */
- struct OperationState *prev;
-
- /**
* Did we send the client that we are done?
*/
int client_done_sent;
@@ -198,13 +173,13 @@
struct IBF_Key ibf_key;
/**
- * The actual element associated with the key
+ * The actual element associated with the key.
*/
struct ElementEntry *element;
/**
* Element that collides with this element
- * on the ibf key
+ * on the ibf key. All colliding entries must have the same ibf key.
*/
struct KeyEntry *next_colliding;
};
@@ -226,7 +201,7 @@
* Operation for which the elements
* should be sent.
*/
- struct OperationState *eo;
+ struct Operation *op;
};
@@ -242,18 +217,6 @@
* salt=0.
*/
struct StrataEstimator *se;
-
- /**
- * Evaluate operations are held in
- * a linked list.
- */
- struct OperationState *ops_head;
-
- /**
- * Evaluate operations are held in
- * a linked list.
- */
- struct OperationState *ops_tail;
};
@@ -263,9 +226,9 @@
* @param cls closure
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
+ * @return #GNUNET_YES if we should continue to
* iterate,
- * GNUNET_NO if not.
+ * #GNUNET_NO if not.
*/
static int
destroy_key_to_element_iter (void *cls,
@@ -290,65 +253,40 @@
/**
- * Destroy a union operation, and free all resources
- * associated with it.
- *
- * @param eo the union operation to destroy
+ * Destroy the union operation. Only things specific to the union operation
are destroyed.
+ *
+ * @param op union operation to destroy
*/
static void
-union_operation_destroy (struct OperationState *eo)
+union_op_cancel (struct Operation *op)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
- GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
- eo->set->state->ops_tail,
- eo);
- if (NULL != eo->mq)
+ /* check if the op was canceled twice */
+ GNUNET_assert (NULL != op->state);
+ if (NULL != op->state->remote_ibf)
{
- GNUNET_MQ_destroy (eo->mq);
- eo->mq = NULL;
+ ibf_destroy (op->state->remote_ibf);
+ op->state->remote_ibf = NULL;
}
- if (NULL != eo->tunnel)
+ if (NULL != op->state->local_ibf)
{
- struct GNUNET_MESH_Tunnel *t = eo->tunnel;
- eo->tunnel = NULL;
- GNUNET_MESH_tunnel_destroy (t);
+ ibf_destroy (op->state->local_ibf);
+ op->state->local_ibf = NULL;
}
- if (NULL != eo->remote_ibf)
+ if (NULL != op->state->se)
{
- ibf_destroy (eo->remote_ibf);
- eo->remote_ibf = NULL;
+ strata_estimator_destroy (op->state->se);
+ op->state->se = NULL;
}
- if (NULL != eo->local_ibf)
+ if (NULL != op->state->key_to_element)
{
- ibf_destroy (eo->local_ibf);
- eo->local_ibf = NULL;
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
destroy_key_to_element_iter, NULL);
+ GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
+ op->state->key_to_element = NULL;
}
- if (NULL != eo->se)
- {
- strata_estimator_destroy (eo->se);
- eo->se = NULL;
- }
- if (NULL != eo->key_to_element)
- {
- GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
destroy_key_to_element_iter, NULL);
- GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
- eo->key_to_element = NULL;
- }
- if (NULL != eo->spec)
- {
- if (NULL != eo->spec->context_msg)
- {
- GNUNET_free (eo->spec->context_msg);
- eo->spec->context_msg = NULL;
- }
- GNUNET_free (eo->spec);
- eo->spec = NULL;
- }
- GNUNET_free (eo);
-
+ GNUNET_free (op->state);
+ op->state = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
-
- /* FIXME: do a garbage collection of the set generations */
}
@@ -356,20 +294,22 @@
* Inform the client that the union operation has failed,
* and proceed to destroy the evaluate operation.
*
- * @param eo the union operation to fail
+ * @param op the union operation to fail
*/
static void
-fail_union_operation (struct OperationState *eo)
+fail_union_operation (struct Operation *op)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
+
ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
- msg->request_id = htonl (eo->spec->client_request_id);
+ msg->request_id = htonl (op->spec->client_request_id);
msg->element_type = htons (0);
- GNUNET_MQ_send (eo->spec->set->client_mq, ev);
- union_operation_destroy (eo);
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
+ _GSS_operation_destroy (op);
}
@@ -382,7 +322,7 @@
* @return the derived IBF key
*/
static struct IBF_Key
-get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
+get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
{
struct IBF_Key key;
@@ -398,40 +338,39 @@
/**
* Send a request for the evaluate operation to a remote peer
*
- * @param eo operation with the other peer
+ * @param op operation with the other peer
*/
static void
-send_operation_request (struct OperationState *eo)
+send_operation_request (struct Operation *op)
{
struct GNUNET_MQ_Envelope *ev;
struct OperationRequestMessage *msg;
ev = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- eo->spec->context_msg);
+ op->spec->context_msg);
if (NULL == ev)
{
/* the context message is too large */
GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (eo->spec->set->client);
+ GNUNET_SERVER_client_disconnect (op->spec->set->client);
return;
}
msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
- msg->app_id = eo->spec->app_id;
- msg->salt = htonl (eo->spec->salt);
- GNUNET_MQ_send (eo->mq, ev);
+ msg->app_id = op->spec->app_id;
+ msg->salt = htonl (op->spec->salt);
+ GNUNET_MQ_send (op->mq, ev);
- if (NULL != eo->spec->context_msg)
+ if (NULL != op->spec->context_msg)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context
message\n");
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context
message\n");
- if (NULL != eo->spec->context_msg)
+ if (NULL != op->spec->context_msg)
{
- GNUNET_free (eo->spec->context_msg);
- eo->spec->context_msg = NULL;
+ GNUNET_free (op->spec->context_msg);
+ op->spec->context_msg = NULL;
}
-
}
@@ -442,34 +381,89 @@
* @param cls closure
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
+ * @return #GNUNET_YES if we should continue to
* iterate,
- * GNUNET_NO if not.
+ * #GNUNET_NO if not.
*/
static int
op_register_element_iterator (void *cls,
- uint32_t key,
- void *value)
+ uint32_t key,
+ void *value)
{
struct KeyEntry *const new_k = cls;
struct KeyEntry *old_k = value;
GNUNET_assert (NULL != old_k);
- do
+ /* check if our ibf key collides with the ibf key in the existing entry */
+ if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
{
- if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
- {
- new_k->next_colliding = old_k->next_colliding;
- old_k->next_colliding = new_k;
+ /* insert the the new key in the collision chain */
+ new_k->next_colliding = old_k->next_colliding;
+ old_k->next_colliding = new_k;
+ /* signal to the caller that we were able to insert into a colliding
bucket */
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterator to create the mapping between ibf keys
+ * and element entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+op_has_element_iterator (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct GNUNET_HashCode *element_hash = cls;
+ struct KeyEntry *k = value;
+
+ GNUNET_assert (NULL != k);
+ while (NULL != k)
+ {
+ if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
return GNUNET_NO;
- }
- old_k = old_k->next_colliding;
- } while (NULL != old_k);
+ k = k->next_colliding;
+ }
return GNUNET_YES;
}
/**
+ * Determine whether the given element is already in the operation's element
+ * set.
+ *
+ * @param op operation that should be tested for 'element_hash'
+ * @param element_hash hash of the element to look for
+ * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
+ */
+static int
+op_has_element (struct Operation *op, const struct GNUNET_HashCode
*element_hash)
+{
+ int ret;
+ struct IBF_Key ibf_key;
+
+ ibf_key = get_ibf_key (element_hash, op->spec->salt);
+ ret = GNUNET_CONTAINER_multihashmap32_get_multiple
(op->state->key_to_element,
+ (uint32_t)
ibf_key.key_val,
+ op_has_element_iterator,
(void *) element_hash);
+
+ /* was the iteration aborted because we found the element? */
+ if (GNUNET_SYSERR == ret)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
* Insert an element into the union operation's
* key-to-element mapping. Takes ownership of 'ee'.
* Note that this does not insert the element in the set,
@@ -477,21 +471,21 @@
* This is done to speed up re-tried operations, if some elements
* were transmitted, and then the IBF fails to decode.
*
- * @param eo the union operation
+ * @param op the union operation
* @param ee the element entry
*/
static void
-op_register_element (struct OperationState *eo, struct ElementEntry *ee)
+op_register_element (struct Operation *op, struct ElementEntry *ee)
{
int ret;
struct IBF_Key ibf_key;
struct KeyEntry *k;
- ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
+ ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
k = GNUNET_new (struct KeyEntry);
k->element = ee;
k->ibf_key = ibf_key;
- ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
+ ret = GNUNET_CONTAINER_multihashmap32_get_multiple
(op->state->key_to_element,
(uint32_t)
ibf_key.key_val,
op_register_element_iterator, k);
@@ -499,7 +493,7 @@
if (GNUNET_SYSERR == ret)
return;
- GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t)
ibf_key.key_val, k,
+ GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t)
ibf_key.key_val, k,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
@@ -542,19 +536,19 @@
const struct GNUNET_HashCode *key,
void *value)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct ElementEntry *e = value;
/* make sure that the element belongs to the set at the time
* of creating the operation */
- if ( (e->generation_added > eo->generation_created) ||
+ if ( (e->generation_added > op->generation_created) ||
( (GNUNET_YES == e->removed) &&
- (e->generation_removed < eo->generation_created)))
+ (e->generation_removed < op->generation_created)))
return GNUNET_YES;
GNUNET_assert (GNUNET_NO == e->remote);
- op_register_element (eo, e);
+ op_register_element (op, e);
return GNUNET_YES;
}
@@ -563,45 +557,45 @@
* Create an ibf with the operation's elements
* of the specified size
*
- * @param eo the union operation
+ * @param op the union operation
* @param size size of the ibf to create
*/
static void
-prepare_ibf (struct OperationState *eo, uint16_t size)
+prepare_ibf (struct Operation *op, uint16_t size)
{
- if (NULL == eo->key_to_element)
+ if (NULL == op->state->key_to_element)
{
unsigned int len;
- len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
- eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
- GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
- init_key_to_element_iterator, eo);
+ len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
+ op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len +
1);
+ GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
+ init_key_to_element_iterator, op);
}
- if (NULL != eo->local_ibf)
- ibf_destroy (eo->local_ibf);
- eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
- GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
- prepare_ibf_iterator,
eo->local_ibf);
+ if (NULL != op->state->local_ibf)
+ ibf_destroy (op->state->local_ibf);
+ op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ prepare_ibf_iterator,
op->state->local_ibf);
}
/**
* Send an ibf of appropriate size.
*
- * @param eo the union operation
+ * @param op the union operation
* @param ibf_order order of the ibf to send, size=2^order
*/
static void
-send_ibf (struct OperationState *eo, uint16_t ibf_order)
+send_ibf (struct Operation *op, uint16_t ibf_order)
{
unsigned int buckets_sent = 0;
struct InvertibleBloomFilter *ibf;
- prepare_ibf (eo, 1<<ibf_order);
+ prepare_ibf (op, 1<<ibf_order);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n",
1<<ibf_order);
- ibf = eo->local_ibf;
+ ibf = op->state->local_ibf;
while (buckets_sent < (1 << ibf_order))
{
@@ -624,20 +618,20 @@
buckets_sent += buckets_in_message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
buckets_in_message, buckets_sent, 1<<ibf_order);
- GNUNET_MQ_send (eo->mq, ev);
+ GNUNET_MQ_send (op->mq, ev);
}
- eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
+ op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
}
/**
* Send a strata estimator to the remote peer.
*
- * @param eo the union operation with the remote peer
+ * @param op the union operation with the remote peer
*/
static void
-send_strata_estimator (struct OperationState *eo)
+send_strata_estimator (struct Operation *op)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *strata_msg;
@@ -645,9 +639,9 @@
ev = GNUNET_MQ_msg_header_extra (strata_msg,
SE_STRATA_COUNT * IBF_BUCKET_SIZE *
SE_IBF_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_SE);
- strata_estimator_write (eo->set->state->se, &strata_msg[1]);
- GNUNET_MQ_send (eo->mq, ev);
- eo->phase = PHASE_EXPECT_IBF;
+ strata_estimator_write (op->state->se, &strata_msg[1]);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_EXPECT_IBF;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
}
@@ -682,27 +676,27 @@
static void
handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct StrataEstimator *remote_se;
int diff;
- if (eo->phase != PHASE_EXPECT_SE)
+ if (op->state->phase != PHASE_EXPECT_SE)
{
- fail_union_operation (eo);
+ fail_union_operation (op);
GNUNET_break (0);
return;
}
remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
SE_IBF_HASH_NUM);
strata_estimator_read (&mh[1], remote_se);
- GNUNET_assert (NULL != eo->se);
- diff = strata_estimator_difference (remote_se, eo->se);
+ GNUNET_assert (NULL != op->state->se);
+ diff = strata_estimator_difference (remote_se, op->state->se);
strata_estimator_destroy (remote_se);
- strata_estimator_destroy (eo->se);
- eo->se = NULL;
+ strata_estimator_destroy (op->state->se);
+ op->state->se = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
diff, 1<<get_order_from_difference (diff));
- send_ibf (eo, get_order_from_difference (diff));
+ send_ibf (op, get_order_from_difference (diff));
}
@@ -721,7 +715,7 @@
{
struct SendElementClosure *sec = cls;
struct IBF_Key ibf_key = sec->ibf_key;
- struct OperationState *eo = sec->eo;
+ struct Operation *op = sec->op;
struct KeyEntry *ke = value;
if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -743,7 +737,7 @@
memcpy (&mh[1], element->data, element->size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
GNUNET_h2s (&ke->element->element_hash));
- GNUNET_MQ_send (eo->mq, ev);
+ GNUNET_MQ_send (op->mq, ev);
ke = ke->next_colliding;
}
return GNUNET_NO;
@@ -753,17 +747,17 @@
* Send all elements that have the specified IBF key
* to the remote peer of the union operation
*
- * @param eo union operation
+ * @param op union operation
* @param ibf_key IBF key of interest
*/
static void
-send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
+send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
{
struct SendElementClosure send_cls;
send_cls.ibf_key = ibf_key;
- send_cls.eo = eo;
- GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t)
ibf_key.key_val,
+ send_cls.op = op;
+ GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
(uint32_t) ibf_key.key_val,
&send_element_iterator,
&send_cls);
}
@@ -772,10 +766,10 @@
* Decode which elements are missing on each side, and
* send the appropriate elemens and requests
*
- * @param eo union operation
+ * @param op union operation
*/
static void
-decode_and_send (struct OperationState *eo)
+decode_and_send (struct Operation *op)
{
struct IBF_Key key;
struct IBF_Key last_key;
@@ -783,14 +777,14 @@
unsigned int num_decoded;
struct InvertibleBloomFilter *diff_ibf;
- GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
+ GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
- prepare_ibf (eo, eo->remote_ibf->size);
- diff_ibf = ibf_dup (eo->local_ibf);
- ibf_subtract (diff_ibf, eo->remote_ibf);
+ prepare_ibf (op, op->state->remote_ibf->size);
+ diff_ibf = ibf_dup (op->state->local_ibf);
+ ibf_subtract (diff_ibf, op->state->remote_ibf);
- ibf_destroy (eo->remote_ibf);
- eo->remote_ibf = NULL;
+ ibf_destroy (op->state->remote_ibf);
+ op->state->remote_ibf = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n",
diff_ibf->size);
@@ -829,7 +823,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"decoding failed, sending larger ibf (size %u)\n",
1<<next_order);
- send_ibf (eo, next_order);
+ send_ibf (op, next_order);
}
else
{
@@ -844,28 +838,26 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending
DONE\n");
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_send (eo->mq, ev);
+ GNUNET_MQ_send (op->mq, ev);
break;
}
if (1 == side)
{
- send_elements_for_key (eo, key);
+ send_elements_for_key (op, key);
}
else if (-1 == side)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *msg;
- /* FIXME: before sending the request, check if we may just have the
element */
- /* FIXME: merge multiple requests */
- /* FIXME: remember somewhere that we already requested the element,
- * so that we don't request it again with the next ibf if decoding fails
*/
+ /* It may be nice to merge multiple requests, but with mesh's corking it
is not worth
+ * the effort additional complexity. */
ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
*(struct IBF_Key *) &msg[1] = key;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
- GNUNET_MQ_send (eo->mq, ev);
+ GNUNET_MQ_send (op->mq, ev);
}
else
{
@@ -885,32 +877,32 @@
static void
handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct IBFMessage *msg = (struct IBFMessage *) mh;
unsigned int buckets_in_message;
- if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
- (eo->phase == PHASE_EXPECT_IBF) )
+ if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
+ (op->state->phase == PHASE_EXPECT_IBF) )
{
- eo->phase = PHASE_EXPECT_IBF_CONT;
- GNUNET_assert (NULL == eo->remote_ibf);
+ op->state->phase = PHASE_EXPECT_IBF_CONT;
+ GNUNET_assert (NULL == op->state->remote_ibf);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n",
1<<msg->order);
- eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
- eo->ibf_buckets_received = 0;
+ op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+ op->state->ibf_buckets_received = 0;
if (0 != ntohs (msg->offset))
{
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
}
- else if (eo->phase == PHASE_EXPECT_IBF_CONT)
+ else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
{
- if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
- (1<<msg->order != eo->remote_ibf->size) )
+ if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
+ (1<<msg->order != op->state->remote_ibf->size) )
{
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
}
@@ -920,25 +912,25 @@
if (0 == buckets_in_message)
{
GNUNET_break_op (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message *
IBF_BUCKET_SIZE)
{
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
- ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message,
eo->remote_ibf);
- eo->ibf_buckets_received += buckets_in_message;
+ ibf_read_slice (&msg[1], op->state->ibf_buckets_received,
buckets_in_message, op->state->remote_ibf);
+ op->state->ibf_buckets_received += buckets_in_message;
- if (eo->ibf_buckets_received == eo->remote_ibf->size)
+ if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
- eo->phase = PHASE_EXPECT_ELEMENTS;
- decode_and_send (eo);
+ op->state->phase = PHASE_EXPECT_ELEMENTS;
+ decode_and_send (op);
}
}
@@ -947,18 +939,18 @@
* Send a result message to the client indicating
* that there is a new element.
*
- * @param eo union operation
+ * @param op union operation
* @param element element to send
*/
static void
-send_client_element (struct OperationState *eo,
+send_client_element (struct Operation *op,
struct GNUNET_SET_Element *element)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to
client\n", element->size);
- GNUNET_assert (0 != eo->spec->client_request_id);
+ GNUNET_assert (0 != op->spec->client_request_id);
ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
if (NULL == ev)
{
@@ -967,42 +959,116 @@
return;
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
- rm->request_id = htonl (eo->spec->client_request_id);
+ rm->request_id = htonl (op->spec->client_request_id);
rm->element_type = element->type;
memcpy (&rm[1], element->data, element->size);
- GNUNET_MQ_send (eo->spec->set->client_mq, ev);
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
}
/**
- * Send a result message to the client indicating
- * that the operation is over.
- * After the result done message has been sent to the client,
- * destroy the evaluate operation.
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
*
- * @param eo union operation
+ * @param cls operation to destroy
*/
static void
-send_client_done_and_destroy (struct OperationState *eo)
+send_done_and_destroy (void *cls)
{
+ struct Operation *op = cls;
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
-
- GNUNET_assert (GNUNET_NO == eo->client_done_sent);
-
- eo->client_done_sent = GNUNET_YES;
-
ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
- rm->request_id = htonl (eo->spec->client_request_id);
+ rm->request_id = htonl (op->spec->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
rm->element_type = htons (0);
- GNUNET_MQ_send (eo->spec->set->client_mq, ev);
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
+ _GSS_operation_destroy (op);
+}
- union_operation_destroy (eo);
+
+/**
+ * Send all remaining elements in the full result iterator.
+ *
+ * @param cls operation
+ */
+static void
+send_remaining_elements (void *cls)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke;
+ int res;
+
+ res = GNUNET_CONTAINER_multihashmap32_iterator_next
(op->state->full_result_iter, NULL, (const void **) &ke);
+ res = GNUNET_NO;
+ if (GNUNET_NO == res)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because
iterator ran out\n");
+ send_done_and_destroy (op);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
+
+ while (1)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_ResultMessage *rm;
+ struct GNUNET_SET_Element *element;
+ element = &ke->element->element;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client
(full set)\n", element->size);
+ GNUNET_assert (0 != op->spec->client_request_id);
+ ev = GNUNET_MQ_msg_extra (rm, element->size,
GNUNET_MESSAGE_TYPE_SET_RESULT);
+ if (NULL == ev)
+ {
+ GNUNET_MQ_discard (ev);
+ GNUNET_break (0);
+ continue;
+ }
+ rm->result_status = htons (GNUNET_SET_STATUS_OK);
+ rm->request_id = htonl (op->spec->client_request_id);
+ rm->element_type = element->type;
+ memcpy (&rm[1], element->data, element->size);
+ if (ke->next_colliding == NULL)
+ {
+ GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
+ break;
+ }
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
+ ke = ke->next_colliding;
+ }
}
/**
+ * Send a result message to the client indicating
+ * that the operation is over.
+ * After the result done message has been sent to the client,
+ * destroy the evaluate operation.
+ *
+ * @param op union operation
+ */
+static void
+finish_and_destroy (struct Operation *op)
+{
+ GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
+
+ if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
+ GNUNET_assert (NULL == op->state->full_result_iter);
+ op->state->full_result_iter =
+ GNUNET_CONTAINER_multihashmap32_iterator_create
(op->state->key_to_element);
+ send_remaining_elements (op);
+ return;
+ }
+ send_done_and_destroy (op);
+}
+
+
+/**
* Handle an element message from a remote peer.
*
* @param cls the union operation
@@ -1011,16 +1077,16 @@
static void
handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct ElementEntry *ee;
uint16_t element_size;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
- if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
- (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
+ if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
+ (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
{
- fail_union_operation (eo);
+ fail_union_operation (op);
GNUNET_break (0);
return;
}
@@ -1032,12 +1098,17 @@
ee->remote = GNUNET_YES;
GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
- /* FIXME: see if the element has already been inserted! */
+ if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
+ GNUNET_free (ee);
+ return;
+ }
- op_register_element (eo, ee);
+ op_register_element (op, ee);
/* only send results immediately if the client wants it */
- if (GNUNET_SET_RESULT_ADDED == eo->spec->result_mode)
- send_client_element (eo, &ee->element);
+ if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
+ send_client_element (op, &ee->element);
}
@@ -1050,15 +1121,15 @@
static void
handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct IBF_Key *ibf_key;
unsigned int num_keys;
/* look up elements and send them */
- if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+ if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
@@ -1067,14 +1138,14 @@
if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
{
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
return;
}
ibf_key = (struct IBF_Key *) &mh[1];
while (0 != num_keys--)
{
- send_elements_for_key (eo, *ibf_key);
+ send_elements_for_key (op, *ibf_key);
ibf_key++;
}
}
@@ -1089,28 +1160,28 @@
static void
handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct OperationState *eo = cls;
+ struct Operation *op = cls;
struct GNUNET_MQ_Envelope *ev;
- if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+ if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
/* we got all requests, but still have to send our elements as response */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after
elements\n");
- eo->phase = PHASE_FINISHED;
+ op->state->phase = PHASE_FINISHED;
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_send (eo->mq, ev);
+ GNUNET_MQ_send (op->mq, ev);
return;
}
- if (eo->phase == PHASE_EXPECT_ELEMENTS)
+ if (op->state->phase == PHASE_EXPECT_ELEMENTS)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
- eo->phase = PHASE_FINISHED;
- send_client_done_and_destroy (eo);
+ op->state->phase = PHASE_FINISHED;
+ finish_and_destroy (op);
return;
}
GNUNET_break (0);
- fail_union_operation (eo);
+ fail_union_operation (op);
}
@@ -1118,78 +1189,34 @@
* Evaluate a union operation with
* a remote peer.
*
- * @param spec specification of the operation the evaluate
- * @param tunnel tunnel already connected to the partner peer
- * @param tc tunnel context, passed here so all new incoming
- * messages are directly going to the union operations
- * @return a handle to the operation
+ * @param op operation to evaluate
*/
static void
-union_evaluate (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel,
- struct TunnelContext *tc)
+union_evaluate (struct Operation *op)
{
- struct OperationState *eo;
-
- eo = GNUNET_new (struct OperationState);
- tc->vt = _GSS_union_vt ();
- tc->op = eo;
- eo->se = strata_estimator_dup (spec->set->state->se);
- eo->generation_created = spec->set->current_generation++;
- eo->set = spec->set;
- eo->spec = spec;
- eo->tunnel = tunnel;
- eo->tunnel = tunnel;
- eo->mq = GNUNET_MESH_mq_create (tunnel);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "evaluating union operation, (app %s)\n",
- GNUNET_h2s (&eo->spec->app_id));
-
+ op->state = GNUNET_new (struct OperationState);
+ op->state->se = strata_estimator_dup (op->spec->set->state->se);
/* we started the operation, thus we have to send the operation request */
- eo->phase = PHASE_EXPECT_SE;
-
- GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
- eo->set->state->ops_tail,
- eo);
-
- send_operation_request (eo);
+ op->state->phase = PHASE_EXPECT_SE;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation");
+ send_operation_request (op);
}
/**
- * Accept an union operation request from a remote peer
+ * Accept an union operation request from a remote peer.
+ * Only initializes the private operation state.
*
- * @param spec all necessary information about the operation
- * @param tunnel open tunnel to the partner's peer
- * @param tc tunnel context, passed here so all new incoming
- * messages are directly going to the union operations
- * @return operation
+ * @param op operation that will be accepted as a union operation
*/
static void
-union_accept (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel,
- struct TunnelContext *tc)
+union_accept (struct Operation *op)
{
- struct OperationState *eo;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
-
- eo = GNUNET_new (struct OperationState);
- tc->vt = _GSS_union_vt ();
- tc->op = eo;
- eo->set = spec->set;
- eo->generation_created = eo->set->current_generation++;
- eo->spec = spec;
- eo->tunnel = tunnel;
- eo->mq = GNUNET_MESH_mq_create (tunnel);
- eo->se = strata_estimator_dup (eo->set->state->se);
- /* transfer ownership of mq and socket from incoming to eo */
- GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
- eo->set->state->ops_tail,
- eo);
+ op->state = GNUNET_new (struct OperationState);
+ op->state->se = strata_estimator_dup (op->spec->set->state->se);
/* kick off the operation */
- send_strata_estimator (eo);
+ send_strata_estimator (op);
}
@@ -1240,17 +1267,13 @@
/**
- * Destroy a set that supports the union operation
+ * Destroy a set that supports the union operation.
*
* @param set_state the set to destroy
*/
static void
union_set_destroy (struct SetState *set_state)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
- /* important to destroy operations before the rest of the set */
- while (NULL != set_state->ops_head)
- union_operation_destroy (set_state->ops_head);
if (NULL != set_state->se)
{
strata_estimator_destroy (set_state->se);
@@ -1263,13 +1286,13 @@
/**
* Dispatch messages for a union operation.
*
- * @param eo the state of the union evaluate operation
+ * @param op the state of the union evaluate operation
* @param mh the received message
* @return GNUNET_SYSERR if the tunnel should be disconnected,
* GNUNET_OK otherwise
*/
int
-union_handle_p2p_message (struct OperationState *eo,
+union_handle_p2p_message (struct Operation *op,
const struct GNUNET_MessageHeader *mh)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
@@ -1277,19 +1300,19 @@
switch (ntohs (mh->type))
{
case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
- handle_p2p_ibf (eo, mh);
+ handle_p2p_ibf (op, mh);
break;
case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
- handle_p2p_strata_estimator (eo, mh);
+ handle_p2p_strata_estimator (op, mh);
break;
case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
- handle_p2p_elements (eo, mh);
+ handle_p2p_elements (op, mh);
break;
case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
- handle_p2p_element_requests (eo, mh);
+ handle_p2p_element_requests (op, mh);
break;
case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
- handle_p2p_done (eo, mh);
+ handle_p2p_done (op, mh);
break;
default:
/* something wrong with mesh's message handlers? */
@@ -1300,19 +1323,10 @@
static void
-union_peer_disconnect (struct OperationState *op)
+union_peer_disconnect (struct Operation *op)
{
- /* Are we already disconnected? */
- if (NULL == op->tunnel)
- return;
- op->tunnel = NULL;
- if (NULL != op->mq)
+ if (PHASE_FINISHED != op->state->phase)
{
- GNUNET_MQ_destroy (op->mq);
- op->mq = NULL;
- }
- if (PHASE_FINISHED != op->phase)
- {
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *msg;
@@ -1322,37 +1336,15 @@
msg->element_type = htons (0);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected
prematurely\n");
- union_operation_destroy (op);
+ _GSS_operation_destroy (op);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
- if (GNUNET_NO == op->client_done_sent)
- send_client_done_and_destroy (op);
+ if (GNUNET_NO == op->state->client_done_sent)
+ finish_and_destroy (op);
}
-static void
-union_op_cancel (struct SetState *set_state, uint32_t op_id)
-{
- struct OperationState *op_state;
- int found = GNUNET_NO;
- for (op_state = set_state->ops_head; NULL != op_state; op_state =
op_state->next)
- {
- if (op_state->spec->client_request_id == op_id)
- {
- found = GNUNET_YES;
- break;
- }
- }
- if (GNUNET_NO == found)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "canceling non-existing
operation\n");
- return;
- }
- union_operation_destroy (op_state);
-}
-
-
const struct SetVT *
_GSS_union_vt ()
{
Modified: gnunet/src/set/ibf.h
===================================================================
--- gnunet/src/set/ibf.h 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/ibf.h 2013-11-05 00:08:13 UTC (rev 30511)
@@ -39,27 +39,40 @@
#endif
+/**
+ * Keys that can be inserted into and removed from an IBF.
+ */
struct IBF_Key
{
uint64_t key_val;
};
+
+/**
+ * Hash of an IBF key.
+ */
struct IBF_KeyHash
{
uint32_t key_hash_val;
};
+
+/**
+ * Type of the count field of IBF buckets.
+ */
struct IBF_Count
{
int8_t count_val;
};
+
/**
* Size of one ibf bucket in bytes
*/
#define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) +
\
sizeof (struct IBF_KeyHash))
+
/**
* Invertible bloom filter (IBF).
*
@@ -212,6 +225,7 @@
struct InvertibleBloomFilter *
ibf_dup (const struct InvertibleBloomFilter *ibf);
+
/**
* Destroy all resources associated with the invertible bloom filter.
* No more ibf_*-functions may be called on ibf after calling destroy.
Modified: gnunet/src/set/strata_estimator.h
===================================================================
--- gnunet/src/set/strata_estimator.h 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/strata_estimator.h 2013-11-05 00:08:13 UTC (rev 30511)
@@ -40,6 +40,9 @@
#endif
+/**
+ * A handle to a strata estimator.
+ */
struct StrataEstimator
{
struct InvertibleBloomFilter **strata;
@@ -48,31 +51,77 @@
};
+/**
+ * Write the given strata estimator to the buffer.
+ *
+ * @param se strata estimator to serialize
+ * @param buf buffer to write to, must be of appropriate size
+ */
void
strata_estimator_write (const struct StrataEstimator *se, void *buf);
+/**
+ * Read strata from the buffer into the given strata
+ * estimator. The strata estimator must already be allocated.
+ *
+ * @param buf buffer to read from
+ * @param se strata estimator to write to
+ */
void
strata_estimator_read (const void *buf, struct StrataEstimator *se);
+/**
+ * Create a new strata estimator with the given parameters.
+ *
+ * @param strata_count number of stratas, that is, number of ibfs in the
estimator
+ * @param ibf_size size of each ibf stratum
+ * @param ibf_hashnum hashnum parameter of each ibf
+ * @return a freshly allocated, empty strata estimator
+ */
struct StrataEstimator *
strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t
ibf_hashnum);
+/**
+ * Get an estimation of the symmetric difference of the elements
+ * contained in both strata estimators.
+ *
+ * @param se1 first strata estimator
+ * @param se2 second strata estimator
+ * @return abs(|se1| - |se2|)
+ */
unsigned int
strata_estimator_difference (const struct StrataEstimator *se1,
const struct StrataEstimator *se2);
+/**
+ * Add a key to the strata estimator.
+ *
+ * @param se strata estimator to add the key to
+ * @param key key to add
+ */
void
strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key);
+/**
+ * Remove a key from the strata estimator.
+ *
+ * @param se strata estimator to remove the key from
+ * @param key key to remove
+ */
void
strata_estimator_remove (struct StrataEstimator *se, struct IBF_Key key);
+/**
+ * Destroy a strata estimator, free all of its resources.
+ *
+ * @param se strata estimator to destroy.
+ */
void
strata_estimator_destroy (struct StrataEstimator *se);
Modified: gnunet/src/set/test_set.conf
===================================================================
--- gnunet/src/set/test_set.conf 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/set/test_set.conf 2013-11-05 00:08:13 UTC (rev 30511)
@@ -8,6 +8,7 @@
HOSTNAME = localhost
BINARY = gnunet-service-set
#PREFIX = valgrind
+#PREFIX = valgrind -v --leak-check=full
#PREFIX = gdbserver :1234
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
Added: gnunet/src/set/test_set_union_result_full.c
===================================================================
--- gnunet/src/set/test_set_union_result_full.c (rev 0)
+++ gnunet/src/set/test_set_union_result_full.c 2013-11-05 00:08:13 UTC (rev
30511)
@@ -0,0 +1,255 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/test_set_union_result_full.c
+ * @brief testcase for full result mode of the union set operation
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_set_service.h"
+
+
+static struct GNUNET_PeerIdentity local_id;
+
+static struct GNUNET_HashCode app_id;
+static struct GNUNET_SET_Handle *set1;
+static struct GNUNET_SET_Handle *set2;
+static struct GNUNET_SET_ListenHandle *listen_handle;
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+static int iter_count;
+
+
+static void
+result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ printf ("set 1: got element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ printf ("set 1: failure\n");
+ break;
+ case GNUNET_SET_STATUS_DONE:
+ printf ("set 1: done\n");
+ GNUNET_SET_destroy (set1);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
+result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ printf ("set 2: got element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ printf ("set 2: failure\n");
+ break;
+ case GNUNET_SET_STATUS_DONE:
+ printf ("set 2: done\n");
+ GNUNET_SET_destroy (set2);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
+listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SET_Request *request)
+{
+ struct GNUNET_SET_OperationHandle *oh;
+
+ GNUNET_assert (NULL != context_msg);
+
+ GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_TEST);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
+ GNUNET_SET_listen_cancel (listen_handle);
+
+ oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_FULL, result_cb_set2,
NULL);
+ GNUNET_SET_commit (oh, set2);
+}
+
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
+static void
+start (void *cls)
+{
+ struct GNUNET_SET_OperationHandle *oh;
+ struct GNUNET_MessageHeader context_msg;
+
+ context_msg.size = htons (sizeof context_msg);
+ context_msg.type = htons (GNUNET_MESSAGE_TYPE_TEST);
+
+ listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
+ &app_id, listen_cb, NULL);
+ oh = GNUNET_SET_prepare (&local_id, &app_id, &context_msg, 42,
+ GNUNET_SET_RESULT_FULL,
+ result_cb_set1, NULL);
+ GNUNET_SET_commit (oh, set1);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
+{
+ struct GNUNET_SET_Element element;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
+
+ element.type = 0;
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, NULL, NULL);
+ element.data = "quux";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, NULL, NULL);
+ element.data = "baz";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+ struct GNUNET_SET_Element element;
+
+ element.type = 0;
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, NULL, NULL);
+ element.data = "bar";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, init_set2, NULL);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
+}
+
+
+static int
+iter_cb (void *cls,
+ const struct GNUNET_SET_Element *element)
+{
+ if (NULL == element)
+ {
+ GNUNET_assert (iter_count == 3);
+ GNUNET_SET_destroy (cls);
+ return GNUNET_YES;
+ }
+ printf ("iter: got element\n");
+ iter_count++;
+ return GNUNET_YES;
+}
+
+
+static void
+test_iter ()
+{
+ struct GNUNET_SET_Element element;
+ struct GNUNET_SET_Handle *iter_set;
+
+ iter_set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+
+ element.type = 0;
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+ element.data = "bar";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+ element.data = "quux";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+
+ GNUNET_SET_iterate (iter_set, iter_cb, iter_set);
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ config = cfg;
+ GNUNET_CRYPTO_get_peer_identity (cfg, &local_id);
+ printf ("my id (from CRYPTO): %s\n", GNUNET_i2s (&local_id));
+ GNUNET_TESTING_peer_get_identity (peer, &local_id);
+ printf ("my id (from TESTING): %s\n", GNUNET_i2s (&local_id));
+
+ test_iter ();
+
+ set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
+
+ /* test the real set reconciliation */
+ init_set1 ();
+}
+
+int
+main (int argc, char **argv)
+{
+ int ret;
+
+ ret = GNUNET_TESTING_peer_run ("test_set_api",
+ "test_set.conf",
+ &run, NULL);
+ return ret;
+}
+
Modified: gnunet/src/util/container_multihashmap32.c
===================================================================
--- gnunet/src/util/container_multihashmap32.c 2013-11-04 17:04:28 UTC (rev
30510)
+++ gnunet/src/util/container_multihashmap32.c 2013-11-05 00:08:13 UTC (rev
30511)
@@ -73,10 +73,45 @@
* Length of the "map" array.
*/
unsigned int map_length;
+
+ /**
+ * Counts the destructive modifications (grow, remove)
+ * to the map, so that iterators can check if they are still valid.
+ */
+ unsigned int modification_counter;
};
/**
+ * Cursor into a multihashmap.
+ * Allows to enumerate elements asynchronously.
+ */
+struct GNUNET_CONTAINER_MultiHashMap32Iterator
+{
+ /**
+ * Position in the bucket 'idx'
+ */
+ struct MapEntry *me;
+
+ /**
+ * Current bucket index.
+ */
+ unsigned int idx;
+
+ /**
+ * Modification counter as observed on the map when the iterator
+ * was created.
+ */
+ unsigned int modification_counter;
+
+ /**
+ * Map that we are iterating over.
+ */
+ const struct GNUNET_CONTAINER_MultiHashMap32 *map;
+};
+
+
+/**
* Create a multi hash map.
*
* @param len initial size (map will grow as needed)
@@ -239,6 +274,8 @@
struct MapEntry *p;
unsigned int i;
+ map->modification_counter++;
+
i = idx_of (map, key);
p = NULL;
e = map->map[i];
@@ -280,6 +317,8 @@
unsigned int i;
int ret;
+ map->modification_counter++;
+
ret = 0;
i = idx_of (map, key);
p = NULL;
@@ -383,6 +422,8 @@
unsigned int idx;
unsigned int i;
+ map->modification_counter++;
+
old_map = map->map;
old_len = map->map_length;
new_len = old_len * 2;
@@ -492,4 +533,84 @@
}
+/**
+ * Create an iterator for a multihashmap.
+ * The iterator can be used to retrieve all the elements in the multihashmap
+ * one by one, without having to handle all elements at once (in contrast to
+ * GNUNET_CONTAINER_multihashmap_iterate()). Note that the iterator can not be
+ * used anymore if elements have been removed from 'map' after the creation of
+ * the iterator, or 'map' has been destroyed. Adding elements to 'map' may
+ * result in skipped or repeated elements.
+ *
+ * @param map the map to create an iterator for
+ * @return an iterator over the given multihashmap 'map'
+ */
+struct GNUNET_CONTAINER_MultiHashMap32Iterator *
+GNUNET_CONTAINER_multihashmap32_iterator_create (const struct
GNUNET_CONTAINER_MultiHashMap32 *map)
+{
+ struct GNUNET_CONTAINER_MultiHashMap32Iterator *iter;
+
+ iter = GNUNET_new (struct GNUNET_CONTAINER_MultiHashMap32Iterator);
+ iter->map = map;
+ iter->modification_counter = map->modification_counter;
+ iter->me = map->map[0];
+ return iter;
+}
+
+
+/**
+ * Retrieve the next element from the hash map at the iterator's position.
+ * If there are no elements left, GNUNET_NO is returned, and 'key' and 'value'
+ * are not modified.
+ * This operation is only allowed if no elements have been removed from the
+ * multihashmap since the creation of 'iter', and the map has not been
destroyed.
+ * Adding elements may result in repeating or skipping elements.
+ *
+ * @param iter the iterator to get the next element from
+ * @param key pointer to store the key in, can be NULL
+ * @param value pointer to store the value in, can be NULL
+ * @return #GNUNET_YES we returned an element,
+ * #GNUNET_NO if we are out of elements
+ */
+int
+GNUNET_CONTAINER_multihashmap32_iterator_next (struct
GNUNET_CONTAINER_MultiHashMap32Iterator *iter,
+ uint32_t *key,
+ const void **value)
+{
+ /* make sure the map has not been modified */
+ GNUNET_assert (iter->modification_counter ==
iter->map->modification_counter);
+
+ /* look for the next entry, skipping empty buckets */
+ while (1)
+ {
+ if (iter->idx >= iter->map->map_length)
+ return GNUNET_NO;
+ if (NULL != iter->me)
+ {
+ if (NULL != key)
+ *key = iter->me->key;
+ if (NULL != value)
+ *value = iter->me->value;
+ iter->me = iter->me->next;
+ return GNUNET_YES;
+ }
+ iter->idx += 1;
+ if (iter->idx < iter->map->map_length)
+ iter->me = iter->map->map[iter->idx];
+ }
+}
+
+
+/**
+ * Destroy a multihashmap iterator.
+ *
+ * @param iter the iterator to destroy
+ */
+void
+GNUNET_CONTAINER_multihashmap32_iterator_destroy (struct
GNUNET_CONTAINER_MultiHashMapIterator *iter)
+{
+ GNUNET_free (iter);
+}
+
+
/* end of container_multihashmap.c */
Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c 2013-11-04 17:04:28 UTC (rev 30510)
+++ gnunet/src/util/mq.c 2013-11-05 00:08:13 UTC (rev 30511)
@@ -603,6 +603,7 @@
GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs
(msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_NO,
&connection_client_transmit_queued,
mq);
+ GNUNET_assert (NULL != state->th);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r30511 - in gnunet/src: include set util,
gnunet <=