[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r27860 - in gnunet/src: consensus include set util
From: |
gnunet |
Subject: |
[GNUnet-SVN] r27860 - in gnunet/src: consensus include set util |
Date: |
Wed, 10 Jul 2013 03:31:13 +0200 |
Author: dold
Date: 2013-07-10 03:31:13 +0200 (Wed, 10 Jul 2013)
New Revision: 27860
Removed:
gnunet/src/consensus/gnunet-consensus-start-peers.c
Modified:
gnunet/src/consensus/Makefile.am
gnunet/src/consensus/consensus_api.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/include/gnunet_mq_lib.h
gnunet/src/include/gnunet_set_service.h
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/gnunet-service-set.h
gnunet/src/set/gnunet-service-set_union.c
gnunet/src/set/gnunet-set-profiler.c
gnunet/src/set/set.h
gnunet/src/set/set_api.c
gnunet/src/util/mq.c
Log:
- set service working
- set profiler
Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/consensus/Makefile.am 2013-07-10 01:31:13 UTC (rev 27860)
@@ -16,8 +16,7 @@
endif
bin_PROGRAMS = \
- gnunet-consensus \
- gnunet-consensus-start-peers
+ gnunet-consensus
libexec_PROGRAMS = \
gnunet-service-consensus
@@ -41,17 +40,6 @@
gnunet_consensus_DEPENDENCIES = \
libgnunetconsensus.la
-gnunet_consensus_start_peers_SOURCES = \
- gnunet-consensus-start-peers.c
-gnunet_consensus_start_peers_LDADD = \
- $(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testbed/libgnunettestbed.la \
- $(top_builddir)/src/consensus/libgnunetconsensus.la \
- $(GN_LIBINTL)
-gnunet_consensus_start_peers_DEPENDENCIES = \
- libgnunetconsensus.la
-
-
gnunet_service_consensus_SOURCES = \
gnunet-service-consensus.c
gnunet_service_consensus_LDADD = \
Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c 2013-07-10 01:26:04 UTC (rev
27859)
+++ gnunet/src/consensus/consensus_api.c 2013-07-10 01:31:13 UTC (rev
27860)
@@ -205,7 +205,7 @@
consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
- mq_handlers,
consensus);
+ mq_handlers, NULL,
consensus);
GNUNET_assert (consensus->client != NULL);
Deleted: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-07-10 01:26:04 UTC
(rev 27859)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-07-10 01:31:13 UTC
(rev 27860)
@@ -1,186 +0,0 @@
-
-/*
- This file is part of GNUnet
- (C) 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file consensus/gnunet-consensus-start-peers.c
- * @brief Starts peers with testebed on localhost,
- * prints their configuration files and waits for ^C.
- * @author Florian Dold
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testbed_service.h"
-
-
-static char *config_template_file;
-static unsigned int num_peers_requested = 2;
-static struct GNUNET_TESTBED_Peer **peers;
-
-
-/**
- * Callback to be called when the requested peer information is available
- *
- * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
- * @param op the operation this callback corresponds to
- * @param pinfo the result; will be NULL if the operation has failed
- * @param emsg error message if the operation has failed; will be NULL if the
- * operation is successfull
- */
-static void
-peer_info_cb (void *cb_cls,
- struct GNUNET_TESTBED_Operation
- *op,
- const struct
- GNUNET_TESTBED_PeerInformation
- *pinfo,
- const char *emsg)
-{
- GNUNET_assert (NULL == emsg);
- if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
- {
- struct GNUNET_CRYPTO_HashAsciiEncoded enc;
- GNUNET_CRYPTO_hash_to_enc (&pinfo->result.id->hashPubKey, &enc);
- printf("peer %td identity:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) -
&peers[0]);
- printf("%s\n", (char *)&enc);
- }
- else if (pinfo->pit == GNUNET_TESTBED_PIT_CONFIGURATION)
- {
- char *tmpfilename;
- if (NULL == (tmpfilename = GNUNET_DISK_mktemp ("gnunet-consensus")))
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (GNUNET_SYSERR ==
- GNUNET_CONFIGURATION_write (pinfo->result.cfg,
- tmpfilename))
- {
- GNUNET_break (0);
- return;
- }
- printf("peer %td config file:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls)
- &peers[0]);
- printf("%s\n", tmpfilename);
- }
- else
- {
- GNUNET_assert (0);
- }
-}
-
-
-
-/**
- * Signature of the event handler function called by the
- * respective event controller.
- *
- * @param cls closure
- * @param event information about the event
- */
-static void
-controller_cb(void *cls,
- const struct GNUNET_TESTBED_EventInformation *event)
-{
- GNUNET_assert (0);
-}
-
-
-
-
-/**
- * Signature of a main function for a testcase.
- *
- * @param cls closure
- * @param num_peers number of peers in 'peers'
- * @param started_peers handle to peers being run in the testbed. NULL upon
- * timeout (see GNUNET_TESTBED_test_run()).
- * @param links_succeeded the number of overlay link connection attempts that
- * succeeded
- * @param links_failed the number of overlay link connection attempts that
- * failed
- */
-static void
-test_master (void *cls,
- unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **started_peers,
- unsigned int links_succeeded,
- unsigned int links_failed)
-{
- int i;
-
- printf("started %d peers\n", num_peers);
- peers = started_peers;
-
- for (i = 0; i < num_peers; i++)
- {
- GNUNET_TESTBED_peer_get_information (peers[i],
- GNUNET_TESTBED_PIT_IDENTITY,
- peer_info_cb,
- &peers[i]);
- GNUNET_TESTBED_peer_get_information (peers[i],
- GNUNET_TESTBED_PIT_CONFIGURATION,
- peer_info_cb,
- &peers[i]);
- }
-}
-
-
-static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *config)
-{
- if (NULL == config_template_file)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no template file specified\n");
- return;
- }
-
- (void) GNUNET_TESTBED_test_run ("gnunet-consensus-start-peers",
- config_template_file,
- num_peers_requested,
- 0,
- controller_cb,
- NULL,
- test_master,
- NULL);
-}
-
-
-int
-main (int argc, char **argv)
-{
- static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- { 't', "config-template", "TEMPLATE",
- gettext_noop ("start peers with the given template configuration"),
- GNUNET_YES, &GNUNET_GETOPT_set_string, &config_template_file },
- { 'n', "num-peers", "NUM",
- gettext_noop ("number of peers to start"),
- GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers_requested },
- GNUNET_GETOPT_OPTION_END
- };
-
- /* run without scheduler, as test_run already does this */
- GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-start-peers",
- "help",
- options, &run, NULL, GNUNET_YES);
- return 0;
-}
-
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2013-07-10 01:26:04 UTC
(rev 27859)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2013-07-10 01:31:13 UTC
(rev 27860)
@@ -266,8 +266,6 @@
}
-
-
/**
* Destroy a session, free all resources associated with it.
*
Modified: gnunet/src/include/gnunet_mq_lib.h
===================================================================
--- gnunet/src/include/gnunet_mq_lib.h 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/include/gnunet_mq_lib.h 2013-07-10 01:31:13 UTC (rev 27860)
@@ -328,12 +328,14 @@
*
* @param connection the client connection
* @param handlers handlers for receiving messages
+ * @param error_handler error handler
* @param cls closure for the handlers
* @return the message queue
*/
struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection
*connection,
const struct GNUNET_MQ_MessageHandler
*handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
void *cls);
Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h 2013-07-10 01:26:04 UTC (rev
27859)
+++ gnunet/src/include/gnunet_set_service.h 2013-07-10 01:31:13 UTC (rev
27860)
@@ -74,9 +74,16 @@
enum GNUNET_SET_OperationType
{
/**
+ * A purely local set that does not support any
+ * operation.
+ */
+ GNUNET_SET_OPERATION_NONE,
+
+ /**
* Set intersection, only return elements that are in both sets.
*/
GNUNET_SET_OPERATION_INTERSECTION,
+
/**
* Set union, return all elements that are in at least one of the sets.
*/
@@ -116,6 +123,7 @@
GNUNET_SET_STATUS_DONE
};
+
/**
* The way results are given to the client.
*/
@@ -137,6 +145,7 @@
GNUNET_SET_RESULT_REMOVED
};
+
/**
* Element stored in a set.
*/
@@ -182,18 +191,19 @@
/**
* Called when another peer wants to do a set operation with the
- * local peer.
+ * local peer. If a listen error occurs, the 'request' is NULL.
*
* @param cls closure
* @param other_peer the other peer
* @param context_msg message with application specific information from
* the other peer
* @param request request from the other peer, use GNUNET_SET_accept
+ * Will be NULL if the listener failed.
* to accept it, otherwise the request will be refused
- * Note that we don't use a return value here, as it is also
- * necessary to specify the set we want to do the operation with,
- * whith sometimes can be derived from the context message.
- * Also necessary to specify the timeout.
+ * Note that we can't just return value from the listen callback,
+ * as it is also necessary to specify the set we want to do the
+ * operation with, whith sometimes can be derived from the context
+ * message. It's necessary to specify the timeout.
*/
typedef void
(*GNUNET_SET_ListenCallback) (void *cls,
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/gnunet-service-set.c 2013-07-10 01:31:13 UTC (rev 27860)
@@ -46,11 +46,14 @@
/**
* Detail information about the operation.
+ * NULL as long as we did not receive the operation
+ * request from the remote peer.
*/
struct OperationSpecification *spec;
/**
- * The identity of the requesting peer.
+ * The identity of the requesting peer. Needs to
+ * be stored here as the op spec might not have been created yet.
*/
struct GNUNET_PeerIdentity peer;
@@ -83,17 +86,55 @@
/**
+ * A listener is inhabited by a client, and
+ * waits for evaluation requests from remote peers.
+ */
+struct Listener
+{
+ /**
+ * Listeners are held in a doubly linked list.
+ */
+ struct Listener *next;
+
+ /**
+ * Listeners are held in a doubly linked list.
+ */
+ struct Listener *prev;
+
+ /**
+ * Client that owns the listener.
+ * Only one client may own a listener.
+ */
+ struct GNUNET_SERVER_Client *client;
+
+ /**
+ * Message queue for the client
+ */
+ struct GNUNET_MQ_Handle *client_mq;
+
+ /**
+ * The type of the operation.
+ */
+ enum GNUNET_SET_OperationType operation;
+
+ /**
+ * Application ID for the operation, used to distinguish
+ * multiple operations of the same type with the same peer.
+ */
+ struct GNUNET_HashCode app_id;
+};
+
+
+/**
* Configuration of our local peer.
- * (Not declared 'static' as also needed in gnunet-service-set_union.c)
*/
-const struct GNUNET_CONFIGURATION_Handle *configuration;
+static const struct GNUNET_CONFIGURATION_Handle *configuration;
/**
* Handle to the mesh service, used
* to listen for and connect to remote peers.
- * (Not declared 'static' as also needed in gnunet-service-set_union.c)
*/
-struct GNUNET_MESH_Handle *mesh;
+static struct GNUNET_MESH_Handle *mesh;
/**
* Sets are held in a doubly linked list.
@@ -204,8 +245,9 @@
* The client's destroy callback will destroy the listener again. */
if (NULL != listener->client)
{
- GNUNET_SERVER_client_disconnect (listener->client);
+ struct GNUNET_SERVER_Client *client = listener->client;
listener->client = NULL;
+ GNUNET_SERVER_client_disconnect (client);
return;
}
if (NULL != listener->client_mq)
@@ -230,22 +272,19 @@
* The client's destroy callback will destroy the set again. */
if (NULL != set->client)
{
- GNUNET_SERVER_client_disconnect (set->client);
+ struct GNUNET_SERVER_Client *client = set->client;
set->client = NULL;
+ GNUNET_SERVER_client_disconnect (client);
return;
}
- switch (set->operation)
+ if (NULL != set->client_mq)
{
- case GNUNET_SET_OPERATION_INTERSECTION:
- GNUNET_assert (0);
- break;
- case GNUNET_SET_OPERATION_UNION:
- _GSS_union_set_destroy (set);
- break;
- default:
- GNUNET_assert (0);
- break;
+ GNUNET_MQ_destroy (set->client_mq);
+ set->client_mq = NULL;
}
+ GNUNET_assert (NULL != set->state);
+ set->vt->destroy_set (set->state);
+ set->state = NULL;
GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
GNUNET_free (set);
}
@@ -264,6 +303,8 @@
struct Set *set;
struct Listener *listener;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n");
+
set = set_get (client);
if (NULL != set)
{
@@ -287,6 +328,7 @@
static void
incoming_destroy (struct Incoming *incoming)
{
+ GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
if (NULL != incoming->tunnel)
{
struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
@@ -294,7 +336,6 @@
GNUNET_MESH_tunnel_destroy (t);
return;
}
- GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
GNUNET_free (incoming);
}
@@ -338,7 +379,7 @@
mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
incoming->spec->context_msg);
GNUNET_assert (NULL != mqm);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id
%u\n", incoming->suggest_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id
%u\n", incoming->suggest_id);
cmsg->accept_id = htonl (incoming->suggest_id);
cmsg->peer_id = incoming->spec->peer;
GNUNET_MQ_send (listener->client_mq, mqm);
@@ -350,33 +391,28 @@
* Handle a request for a set operation from
* another peer.
*
- * @param cls the incoming socket
- * @param tunnel the tunnel that sent the message
- * @param tunnel_ctx the tunnel context
- * @param mh the message
+ * @param op the operation state
+ * @param mh the received message
+ * @return GNUNET_OK if the tunnel should be kept alive,
+ * GNUNET_SYSERR to destroy the tunnel
*/
static int
-handle_p2p_operation_request (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_MessageHeader *mh)
+handle_incoming_msg (struct OperationState *op,
+ const struct GNUNET_MessageHeader *mh)
{
- struct TunnelContext *tc = *tunnel_ctx;
- struct Incoming *incoming;
+ struct Incoming *incoming = (struct Incoming *) op;
const struct OperationRequestMessage *msg = (const struct
OperationRequestMessage *) mh;
struct Listener *listener;
struct OperationSpecification *spec;
- if (CONTEXT_INCOMING != tc->type)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got op request\n");
+
+ if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
{
- /* unexpected request */
GNUNET_break_op (0);
- /* kill the tunnel, cleaner will be called */
return GNUNET_SYSERR;
}
- incoming = tc->data.incoming;
-
if (NULL != incoming->spec)
{
/* double operation request */
@@ -385,8 +421,9 @@
}
spec = GNUNET_new (struct OperationSpecification);
- spec->context_msg =
- GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg));
+ spec->context_msg = GNUNET_MQ_extract_nested_mh (msg);
+ if (NULL != spec->context_msg)
+ spec->context_msg = GNUNET_copy_message (spec->context_msg);
spec->operation = ntohl (msg->operation);
spec->app_id = msg->app_id;
spec->salt = ntohl (msg->salt);
@@ -401,12 +438,12 @@
return GNUNET_SYSERR;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u,
app %s)\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u,
app %s)\n",
ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id);
if (NULL == listener)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"no listener matches incoming request, waiting with
timeout\n");
return GNUNET_OK;
}
@@ -430,7 +467,7 @@
struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
struct Set *set;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation
%u)\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client created new set (operation
%u)\n",
ntohs (msg->operation));
if (NULL != set_get (client))
@@ -441,14 +478,15 @@
}
set = NULL;
+ set = GNUNET_new (struct Set);
switch (ntohs (msg->operation))
{
case GNUNET_SET_OPERATION_INTERSECTION:
- //set = _GSS_intersection_set_create ();
+ // FIXME
break;
case GNUNET_SET_OPERATION_UNION:
- set = _GSS_union_set_create ();
+ set->vt = _GSS_union_vt ();
break;
default:
GNUNET_break (0);
@@ -456,8 +494,7 @@
return;
}
- GNUNET_assert (NULL != set);
-
+ set->state = set->vt->create ();
set->client = client;
set->client_mq = GNUNET_MQ_queue_for_server_client (client);
GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
@@ -493,7 +530,7 @@
listener->app_id = msg->app_id;
listener->operation = ntohs (msg->operation);
GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app
%s)\n",
listener->operation, GNUNET_h2s (&listener->app_id));
for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
{
@@ -511,46 +548,6 @@
/**
- * Called when a client wants to remove an element
- * from the set it inhabits.
- *
- * @param cls unused
- * @param client client that sent the message
- * @param m message sent by the client
- */
-static void
-handle_client_remove (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
-{
- struct Set *set;
-
- set = set_get (client);
- if (NULL == set)
- {
- GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (client);
- return;
- }
- switch (set->operation)
- {
- case GNUNET_SET_OPERATION_UNION:
- _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
- break;
- case GNUNET_SET_OPERATION_INTERSECTION:
- //_GSS_intersection_remove ((struct GNUNET_SET_ElementMessage *) m, set);
- break;
- default:
- GNUNET_assert (0);
- break;
- }
-
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-
-/**
* Called when the client wants to reject an operation
* request from another peer.
*
@@ -574,13 +571,12 @@
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
GNUNET_MESH_tunnel_destroy (incoming->tunnel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
-
/**
* Called when a client wants to add an element to a
* set it inhabits.
@@ -590,11 +586,13 @@
* @param m message sent by the client
*/
static void
-handle_client_add (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
+handle_client_add_remove (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
{
struct Set *set;
+ const struct GNUNET_SET_ElementMessage *msg;
+ struct GNUNET_SET_Element el;
set = set_get (client);
if (NULL == set)
@@ -603,19 +601,14 @@
GNUNET_SERVER_client_disconnect (client);
return;
}
- switch (set->operation)
- {
- case GNUNET_SET_OPERATION_UNION:
- _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
- break;
- case GNUNET_SET_OPERATION_INTERSECTION:
- //_GSS_intersection_add ((struct GNUNET_SET_ElementMessage *) m, set);
- break;
- default:
- GNUNET_assert (0);
- break;
- }
-
+ msg = (const struct GNUNET_SET_ElementMessage *) m;
+ el.size = ntohs (m->size) - sizeof *msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client ins/rem element of size %u\n",
el.size);
+ el.data = &msg[1];
+ if (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type))
+ set->vt->remove (set->state, &el);
+ else
+ set->vt->add (set->state, &el);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -653,27 +646,15 @@
spec->app_id = msg->app_id;
spec->salt = ntohl (msg->salt);
spec->peer = msg->target_peer;
+ spec->set = set;
+ spec->client_request_id = ntohl (msg->request_id);
tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
GNUNET_APPLICATION_TYPE_SET,
GNUNET_YES,
GNUNET_YES);
- switch (set->operation)
- {
- case GNUNET_SET_OPERATION_INTERSECTION:
- tc->type = CONTEXT_OPERATION_INTERSECTION;
- //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m,
set);
- break;
- case GNUNET_SET_OPERATION_UNION:
- tc->type = CONTEXT_OPERATION_UNION;
- tc->data.union_op =
- _GSS_union_evaluate (spec, tunnel);
- break;
- default:
- GNUNET_assert (0);
- break;
- }
+ set->vt->evaluate (spec, tunnel, tc);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -705,6 +686,35 @@
* @param mh the message
*/
static void
+handle_client_cancel (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *mh)
+{
+ const struct GNUNET_SET_CancelMessage *msg =
+ (const struct GNUNET_SET_CancelMessage *) mh;
+ struct Set *set;
+
+ set = set_get (client);
+ if (NULL == set)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (client);
+ return;
+ }
+ /* FIXME: maybe cancel should return success/error code? */
+ set->vt->cancel (set->state, ntohl (msg->request_id));
+}
+
+
+/**
+ * Handle a request from the client to accept
+ * a set operation that came from a remote peer.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param mh the message
+ */
+static void
handle_client_accept (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *mh)
@@ -712,12 +722,10 @@
struct Set *set;
struct Incoming *incoming;
struct GNUNET_SET_AcceptRejectMessage *msg = (struct
GNUNET_SET_AcceptRejectMessage *) mh;
- struct OperationSpecification *spec;
- struct TunnelContext *tc;
incoming = get_incoming (ntohl (msg->accept_reject_id));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl
(msg->accept_reject_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl
(msg->accept_reject_id));
if (NULL == incoming)
{
@@ -736,24 +744,9 @@
return;
}
- spec = GNUNET_new (struct OperationSpecification);
- tc = incoming->tc;
-
- switch (set->operation)
- {
- case GNUNET_SET_OPERATION_INTERSECTION:
- tc->type = CONTEXT_OPERATION_INTERSECTION;
- // _GSS_intersection_accept (msg, set, incoming);
- break;
- case GNUNET_SET_OPERATION_UNION:
- tc->type = CONTEXT_OPERATION_UNION;
- tc->data.union_op = _GSS_union_accept (spec, incoming->tunnel);
- break;
- default:
- GNUNET_assert (0);
- break;
- }
-
+ incoming->spec->set = set;
+ incoming->spec->client_request_id = ntohl (msg->request_id);
+ set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc);
/* tunnel ownership goes to operation */
incoming->tunnel = NULL;
incoming_destroy (incoming);
@@ -771,12 +764,6 @@
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- if (NULL != mesh)
- {
- GNUNET_MESH_disconnect (mesh);
- mesh = NULL;
- }
-
while (NULL != incoming_head)
incoming_destroy (incoming_head);
@@ -786,11 +773,19 @@
while (NULL != sets_head)
set_destroy (sets_head);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+
+ /* it's important to destroy mesh at the end, as tunnels
+ * must be destroyed first! */
+ if (NULL != mesh)
+ {
+ GNUNET_MESH_disconnect (mesh);
+ mesh = NULL;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
}
-
/**
* Signature of the main function of a task.
*
@@ -803,11 +798,22 @@
{
struct Incoming *incoming = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "remote peer timed out\n");
incoming_destroy (incoming);
}
+static void
+handle_incoming_disconnect (struct OperationState *op_state)
+{
+ struct Incoming *incoming = (struct Incoming *) op_state;
+ if (NULL == incoming->tunnel)
+ return;
+
+ incoming_destroy (incoming);
+}
+
+
/**
* Method called whenever another peer has added us to a tunnel
* the other peer initiated.
@@ -830,23 +836,25 @@
uint32_t port)
{
struct Incoming *incoming;
- struct TunnelContext *tc;
+ static const struct SetVT incoming_vt = {
+ .msg_handler = handle_incoming_msg,
+ .peer_disconnect = handle_incoming_disconnect
+ };
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new incoming tunnel\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
- tc = GNUNET_new (struct TunnelContext);
incoming = GNUNET_new (struct Incoming);
incoming->peer = *initiator;
incoming->tunnel = tunnel;
- incoming->tc = tc;
- tc->data.incoming = incoming;
- tc->type = CONTEXT_INCOMING;
+ incoming->tc = GNUNET_new (struct TunnelContext);;
+ incoming->tc->vt = &incoming_vt;
+ incoming->tc->op = (struct OperationState *) incoming;
incoming->timeout_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
incoming_timeout_cb, incoming);
GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
- return tc;
+ return incoming->tc;
}
@@ -867,27 +875,46 @@
{
struct TunnelContext *ctx = tunnel_ctx;
- switch (ctx->type)
- {
- case CONTEXT_INCOMING:
- incoming_destroy (ctx->data.incoming);
- break;
- case CONTEXT_OPERATION_UNION:
- _GSS_union_operation_destroy (ctx->data.union_op);
- break;
- case CONTEXT_OPERATION_INTERSECTION:
- GNUNET_assert (0);
- /* FIXME: cfuchs */
- break;
- default:
- GNUNET_assert (0);
- }
-
+ ctx->vt->peer_disconnect (ctx->op);
+ /* mesh will never call us with the context again! */
GNUNET_free (tunnel_ctx);
}
/**
+ * Functions with this signature are called whenever a message is
+ * received.
+ *
+ * Each time the function must call GNUNET_MESH_receive_done on the tunnel
+ * in order to receive the next message. This doesn't need to be immediate:
+ * can be delayed if some processing is done on the message.
+ *
+ * @param cls Closure (set from GNUNET_MESH_connect).
+ * @param tunnel Connection to the other end.
+ * @param tunnel_ctx Place to store local state associated with the tunnel.
+ * @param message The actual message.
+ *
+ * @return GNUNET_OK to keep the tunnel open,
+ * GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+dispatch_p2p_message (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct TunnelContext *tc = *tunnel_ctx;
+ int ret;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message\n");
+ ret = tc->vt->msg_handler (tc->op, message);
+ GNUNET_MESH_receive_done (tunnel);
+
+ return ret;
+}
+
+
+/**
* Function called by the service's run
* method to run service-specific setup code.
*
@@ -900,31 +927,29 @@
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
+ {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT,
+ sizeof (struct GNUNET_SET_AcceptRejectMessage)},
{handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
- {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
- {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+ {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+ {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE,
+ sizeof (struct GNUNET_SET_CreateMessage)},
{handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
- {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
- {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
- {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
+ {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN,
+ sizeof (struct GNUNET_SET_ListenMessage)},
+ {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT,
+ sizeof (struct GNUNET_SET_AcceptRejectMessage)},
+ {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
+ {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE,
+ sizeof (struct GNUNET_SET_CancelMessage)},
{NULL, NULL, 0, 0}
};
static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
- {handle_p2p_operation_request,
- GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
- /* messages for the union operation */
- {_GSS_union_handle_p2p_message,
- GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
- {_GSS_union_handle_p2p_message,
- GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
- {_GSS_union_handle_p2p_message,
- GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
- {_GSS_union_handle_p2p_message,
- GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
- {_GSS_union_handle_p2p_message,
- GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
- /* FIXME: messages for intersection operation */
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+ {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
{NULL, 0, 0}
};
static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
@@ -943,7 +968,7 @@
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "started\n");
}
Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/gnunet-service-set.h 2013-07-10 01:31:13 UTC (rev 27860)
@@ -38,69 +38,27 @@
#include "set.h"
-/* FIXME: cfuchs */
-struct IntersectionState;
-
-
-/* FIXME: cfuchs */
-struct IntersectionOperation;
-
-
/**
- * Extra state required for set union.
+ * Implementation-specific set state.
+ * Used as opaque pointer, and specified further
+ * in the respective implementation.
*/
-struct UnionState;
+struct SetState;
-/**
- * State of a union operation being evaluated.
- */
-struct UnionEvaluateOperation;
-
-
/**
- * A set that supports a specific operation
- * with other peers.
+ * Implementation-specific set operation.
+ * Used as opaque pointer, and specified further
+ * in the respective implementation.
*/
-struct Set
-{
- /**
- * Client that owns the set.
- * Only one client may own a set.
- */
- struct GNUNET_SERVER_Client *client;
+struct OperationState;
- /**
- * Message queue for the client
- */
- struct GNUNET_MQ_Handle *client_mq;
- /**
- * Type of operation supported for this set
- */
- uint32_t operation; // use enum from API
+/* forward declarations */
+struct Set;
+struct TunnelContext;
- /**
- * Sets are held in a doubly linked list.
- */
- struct Set *next;
- /**
- * Sets are held in a doubly linked list.
- */
- struct Set *prev;
-
- /**
- * Appropriate state for each type of
- * operation.
- */
- union {
- struct IntersectionState *i;
- struct UnionState *u;
- } state;
-};
-
-
/**
* Detail information about an operation.
*/
@@ -146,96 +104,169 @@
/**
- * A listener is inhabited by a client, and
- * waits for evaluation requests from remote peers.
+ * Signature of functions that create the implementation-specific
+ * state for a set supporting a specific operation.
+ *
+ * @return a set state specific to the supported operation
*/
-struct Listener
+typedef struct SetState *(*CreateImpl) (void);
+
+
+/**
+ * Signature of functions that implement the add/remove functionality
+ * for a set supporting a specific operation.
+ *
+ * @param set implementation-specific set state
+ * @param msg element message from the client
+ */
+typedef void (*AddRemoveImpl) (struct SetState *state, const struct
GNUNET_SET_Element *element);
+
+
+/**
+ * Signature of functions that handle disconnection
+ * of the remote peer.
+ *
+ * @param op the set operation, contains implementation-specific data
+ */
+typedef void (*PeerDisconnectImpl) (struct OperationState *op);
+
+
+/**
+ * Signature of functions that implement the destruction of the
+ * implementation-specific set state.
+ *
+ * @param state the set state, contains implementation-specific data
+ */
+typedef void (*DestroySetImpl) (struct SetState *state);
+
+
+/**
+ * Signature of functions that implement the creation of set operations
+ * (currently evaluate and accept).
+ *
+ * @param spec specification of the set operation to be created
+ * @param tunnel the tunnel with the other peer
+ * @param tc tunnel context
+ */
+typedef void (*OpCreateImpl) (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ struct TunnelContext *tc);
+
+
+/**
+ * Signature of functions that implement the message handling for
+ * the different set operations.
+ *
+ * @param op operation state
+ * @param msg received message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to
+ * destroy the operation and the tunnel
+ */
+typedef int (*MsgHandlerImpl) (struct OperationState *op,
+ const struct GNUNET_MessageHeader *msg);
+
+typedef void (*CancelImpl) (struct SetState *set,
+ uint32_t request_id);
+
+
+/**
+ * Dispatch table for a specific set operation.
+ * Every set operation has to implement the callback
+ * in this struct.
+ */
+struct SetVT
{
/**
- * Listeners are held in a doubly linked list.
+ * Callback for the set creation.
*/
- struct Listener *next;
+ CreateImpl create;
/**
- * Listeners are held in a doubly linked list.
+ * Callback for element insertion
*/
- struct Listener *prev;
+ AddRemoveImpl add;
/**
- * Client that owns the listener.
- * Only one client may own a listener.
+ * Callback for element removal.
*/
- struct GNUNET_SERVER_Client *client;
+ AddRemoveImpl remove;
/**
- * Message queue for the client
+ * Callback for accepting a set operation request
*/
- struct GNUNET_MQ_Handle *client_mq;
+ OpCreateImpl accept;
/**
- * The type of the operation.
+ * Callback for starting evaluation with a remote peer.
*/
- enum GNUNET_SET_OperationType operation;
+ OpCreateImpl evaluate;
/**
- * Application ID for the operation, used to distinguish
- * multiple operations of the same type with the same peer.
+ * Callback for destruction of the set state.
*/
- struct GNUNET_HashCode app_id;
-};
+ DestroySetImpl destroy_set;
+ /**
+ * Callback for handling operation-specific messages.
+ */
+ MsgHandlerImpl msg_handler;
-/**
- * Peer that has connected to us, but is not yet evaluating a set operation.
- * Once the peer has sent a request, and the client has
- * accepted or rejected it, this information will be deleted.
- */
-struct Incoming;
+ /**
+ * Callback for handling the remote peer's
+ * disconnect.
+ */
+ PeerDisconnectImpl peer_disconnect;
+ /**
+ * Callback for canceling an operation by
+ * its ID.
+ */
+ CancelImpl cancel;
+};
+
/**
- * Different types a tunnel can be.
+ * A set that supports a specific operation
+ * with other peers.
*/
-enum TunnelContextType
+struct Set
{
/**
- * Tunnel is waiting for a set request from the tunnel,
- * or for the ack/nack of the client for a received request.
+ * Client that owns the set.
+ * Only one client may own a set.
*/
- CONTEXT_INCOMING,
+ struct GNUNET_SERVER_Client *client;
/**
- * The tunnel performs a union operation.
+ * Message queue for the client
*/
- CONTEXT_OPERATION_UNION,
+ struct GNUNET_MQ_Handle *client_mq;
/**
- * The tunnel performs an intersection operation.
+ * Type of operation supported for this set
*/
- CONTEXT_OPERATION_INTERSECTION,
-};
+ enum GNUNET_SET_OperationType operation;
+ /**
+ * Virtual table for this set.
+ * Determined by the operation type of this set.
+ */
+ const struct SetVT *vt;
-/**
- * State associated with the tunnel, dependent on
- * tunnel type.
- */
-union TunnelContextData
-{
/**
- * Valid for tag 'CONTEXT_INCOMING'
+ * Sets are held in a doubly linked list.
*/
- struct Incoming *incoming;
+ struct Set *next;
/**
- * Valid for tag 'CONTEXT_OPERATION_UNION'
+ * Sets are held in a doubly linked list.
*/
- struct UnionEvaluateOperation *union_op;
+ struct Set *prev;
/**
- * Valid for tag 'CONTEXT_OPERATION_INTERSECTION'
+ * Implementation-specific state.
*/
- struct IntersectionEvaluateOperation *intersection_op;
+ struct SetState *state;
};
@@ -246,119 +277,24 @@
struct TunnelContext
{
/**
- * Type of the tunnel.
+ * V-Table for the operation belonging
+ * to the tunnel contest.
*/
- enum TunnelContextType type;
+ const struct SetVT *vt;
/**
- * State associated with the tunnel, dependent on
- * tunnel type.
+ * Implementation-specific operation state.
*/
- union TunnelContextData data;
+ struct OperationState *op;
};
-
/**
- * Configuration of the local peer.
+ * Get the table with implementing functions for
+ * set union.
*/
-extern const struct GNUNET_CONFIGURATION_Handle *configuration;
+const struct SetVT *
+_GSS_union_vt (void);
-/**
- * Handle to the mesh service.
- */
-extern struct GNUNET_MESH_Handle *mesh;
-
-/**
- * Create a new set supporting the union operation
- *
- * @return the newly created set
- */
-struct Set *
-_GSS_union_set_create (void);
-
-
-/**
- * Evaluate a union operation with
- * a remote peer.
- *
- * @param spec specification of the operation the evaluate
- * @param tunnel tunnel already connected to the partner peer
- * @return a handle to the operation
- */
-struct UnionEvaluateOperation *
-_GSS_union_evaluate (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel);
-
-
-/**
- * Add the element from the given element message to the set.
- *
- * @param m message with the element
- * @param set set to add the element to
- */
-void
-_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
-
-
-/**
- * Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still
exchange it.
- *
- * @param m message with the element
- * @param set set to remove the element from
- */
-void
-_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
-
-
-/**
- * Destroy a set that supports the union operation
- *
- * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
- */
-void
-_GSS_union_set_destroy (struct Set *set);
-
-
-/**
- * Accept an union operation request from a remote peer
- *
- * @param spec all necessary information about the operation
- * @param tunnel open tunnel to the partner's peer
- * @return operation
- */
-struct UnionEvaluateOperation *
-_GSS_union_accept (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel);
-
-
-/**
- * Destroy a union operation, and free all resources
- * associated with it.
- *
- * @param eo the union operation to destroy
- */
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
-
-
-/**
- * Dispatch messages for a union operation.
- *
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param mh message to process
- * @return GNUNET_SYSERR if the tunnel should be disconnected,
- * GNUNET_OK otherwise
- */
-int
-_GSS_union_handle_p2p_message (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_MessageHeader *mh);
-
-
#endif
Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c 2013-07-10 01:26:04 UTC (rev
27859)
+++ gnunet/src/set/gnunet-service-set_union.c 2013-07-10 01:31:13 UTC (rev
27860)
@@ -61,7 +61,7 @@
/**
- * Current phase we are in for a union operation
+ * Current phase we are in for a union operation.
*/
enum UnionOperationPhase
{
@@ -100,7 +100,7 @@
* State of an evaluate operation
* with another peer.
*/
-struct UnionEvaluateOperation
+struct OperationState
{
/**
* Tunnel to the remote peer.
@@ -154,23 +154,29 @@
* was created.
*/
unsigned int generation_created;
+
+ /**
+ * Set state of the set that this operation
+ * belongs to.
+ */
+ struct SetState *set_state;
/**
* Evaluate operations are held in
* a linked list.
*/
- struct UnionEvaluateOperation *next;
+ struct OperationState *next;
/**
- * Evaluate operations are held in
- * a linked list.
- */
- struct UnionEvaluateOperation *prev;
+ * Evaluate operations are held in
+ * a linked list.
+ */
+ struct OperationState *prev;
};
/**
- * Information about the element in a set.
+ * Information about an element element in the set.
* All elements are stored in a hash-table
* from their hash-code to their 'struct Element',
* so that the remove and add operations are reasonably
@@ -218,7 +224,8 @@
/**
- * Entries in the key-to-element map of the union set.
+ * The key entry is used to associate an ibf key with
+ * an element.
*/
struct KeyEntry
{
@@ -239,6 +246,7 @@
struct KeyEntry *next_colliding;
};
+
/**
* Used as a closure for sending elements
* with a specific IBF key.
@@ -255,14 +263,14 @@
* Operation for which the elements
* should be sent.
*/
- struct UnionEvaluateOperation *eo;
+ struct OperationState *eo;
};
/**
* Extra state required for efficient set union.
*/
-struct UnionState
+struct SetState
{
/**
* The strata estimator is only generated once for
@@ -281,13 +289,13 @@
* Evaluate operations are held in
* a linked list.
*/
- struct UnionEvaluateOperation *ops_head;
+ struct OperationState *ops_head;
/**
* Evaluate operations are held in
* a linked list.
*/
- struct UnionEvaluateOperation *ops_tail;
+ struct OperationState *ops_tail;
/**
* Current generation, that is, number of
@@ -321,23 +329,6 @@
/**
- * Destroy the elements belonging to a union set.
- *
- * @param us union state that contains the elements
- */
-static void
-destroy_elements (struct UnionState *us)
-{
- if (NULL == us->elements)
- return;
- GNUNET_CONTAINER_multihashmap_iterate (us->elements,
destroy_elements_iterator, NULL);
- GNUNET_CONTAINER_multihashmap_destroy (us->elements);
- us->elements = NULL;
-}
-
-
-
-/**
* Iterator over hash map entries.
*
* @param cls closure
@@ -358,6 +349,11 @@
{
struct KeyEntry *k_tmp = k;
k = k->next_colliding;
+ if (GNUNET_YES == k_tmp->element->remote)
+ {
+ GNUNET_free (k_tmp->element);
+ k_tmp->element = NULL;
+ }
GNUNET_free (k_tmp);
}
return GNUNET_YES;
@@ -370,20 +366,24 @@
*
* @param eo the union operation to destroy
*/
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
+static void
+union_operation_destroy (struct OperationState *eo)
{
- struct UnionState *st = eo->spec->set->state.u;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
-
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
+ GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
+ eo->set_state->ops_tail,
+ eo);
+ if (NULL != eo->mq)
+ {
+ GNUNET_MQ_destroy (eo->mq);
+ eo->mq = NULL;
+ }
if (NULL != eo->tunnel)
{
struct GNUNET_MESH_Tunnel *t = eo->tunnel;
eo->tunnel = NULL;
GNUNET_MESH_tunnel_destroy (t);
}
-
if (NULL != eo->remote_ibf)
{
ibf_destroy (eo->remote_ibf);
@@ -405,16 +405,20 @@
GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
eo->key_to_element = NULL;
}
-
- GNUNET_CONTAINER_DLL_remove (st->ops_head,
- st->ops_tail,
- eo);
+ if (NULL != eo->spec)
+ {
+ if (NULL != eo->spec->context_msg)
+ {
+ GNUNET_free (eo->spec->context_msg);
+ eo->spec->context_msg = NULL;
+ }
+ GNUNET_free (eo->spec);
+ eo->spec = NULL;
+ }
GNUNET_free (eo);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
-
-
/* FIXME: do a garbage collection of the set generations */
}
@@ -426,7 +430,7 @@
* @param eo the union operation to fail
*/
static void
-fail_union_operation (struct UnionEvaluateOperation *eo)
+fail_union_operation (struct OperationState *eo)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *msg;
@@ -434,8 +438,9 @@
ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
msg->request_id = htonl (eo->spec->client_request_id);
+ msg->element_type = htons (0);
GNUNET_MQ_send (eo->spec->set->client_mq, ev);
- _GSS_union_operation_destroy (eo);
+ union_operation_destroy (eo);
}
@@ -467,13 +472,13 @@
* @param eo operation with the other peer
*/
static void
-send_operation_request (struct UnionEvaluateOperation *eo)
+send_operation_request (struct OperationState *eo)
{
struct GNUNET_MQ_Envelope *ev;
struct OperationRequestMessage *msg;
ev = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- eo->spec->context_msg);
+ eo->spec->context_msg);
if (NULL == ev)
{
@@ -484,6 +489,7 @@
}
msg->operation = htons (GNUNET_SET_OPERATION_UNION);
msg->app_id = eo->spec->app_id;
+ msg->salt = htonl (eo->spec->salt);
GNUNET_MQ_send (eo->mq, ev);
if (NULL != eo->spec->context_msg)
@@ -492,7 +498,7 @@
eo->spec->context_msg = NULL;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
}
@@ -532,13 +538,13 @@
/**
* Insert an element into the union operation's
- * key-to-element mapping
+ * key-to-element mapping. Takes ownership of 'ee'.
*
* @param eo the union operation
* @param ee the element entry
*/
static void
-insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
+insert_element (struct OperationState *eo, struct ElementEntry *ee)
{
int ret;
struct IBF_Key ibf_key;
@@ -595,7 +601,7 @@
const struct GNUNET_HashCode *key,
void *value)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
struct ElementEntry *e = value;
/* make sure that the element belongs to the set at the time
@@ -605,6 +611,8 @@
(e->generation_removed < eo->generation_created)))
return GNUNET_YES;
+ e->remote = GNUNET_NO;
+
insert_element (eo, e);
return GNUNET_YES;
}
@@ -618,15 +626,15 @@
* @param size size of the ibf to create
*/
static void
-prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
+prepare_ibf (struct OperationState *eo, uint16_t size)
{
if (NULL == eo->key_to_element)
{
unsigned int len;
- len = GNUNET_CONTAINER_multihashmap_size
(eo->spec->set->state.u->elements);
+ len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
- GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements,
- init_key_to_element_iterator, eo);
+ GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
+ init_key_to_element_iterator, eo);
}
if (NULL != eo->local_ibf)
ibf_destroy (eo->local_ibf);
@@ -643,14 +651,14 @@
* @param ibf_order order of the ibf to send, size=2^order
*/
static void
-send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
+send_ibf (struct OperationState *eo, uint16_t ibf_order)
{
unsigned int buckets_sent = 0;
struct InvertibleBloomFilter *ibf;
prepare_ibf (eo, 1<<ibf_order);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n",
1<<ibf_order);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n",
1<<ibf_order);
ibf = eo->local_ibf;
@@ -667,11 +675,14 @@
ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
+ msg->reserved = 0;
msg->order = ibf_order;
msg->offset = htons (buckets_sent);
ibf_write_slice (ibf, buckets_sent,
buckets_in_message, &msg[1]);
buckets_sent += buckets_in_message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
+ buckets_in_message, buckets_sent, 1<<ibf_order);
GNUNET_MQ_send (eo->mq, ev);
}
@@ -685,18 +696,18 @@
* @param eo the union operation with the remote peer
*/
static void
-send_strata_estimator (struct UnionEvaluateOperation *eo)
+send_strata_estimator (struct OperationState *eo)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *strata_msg;
- struct UnionState *st = eo->spec->set->state.u;
ev = GNUNET_MQ_msg_header_extra (strata_msg,
- SE_STRATA_COUNT * IBF_BUCKET_SIZE *
SE_IBF_SIZE,
- GNUNET_MESSAGE_TYPE_SET_P2P_SE);
- strata_estimator_write (st->se, &strata_msg[1]);
+ SE_STRATA_COUNT * IBF_BUCKET_SIZE *
SE_IBF_SIZE,
+ GNUNET_MESSAGE_TYPE_SET_P2P_SE);
+ strata_estimator_write (eo->set_state->se, &strata_msg[1]);
GNUNET_MQ_send (eo->mq, ev);
eo->phase = PHASE_EXPECT_IBF;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
}
@@ -730,7 +741,7 @@
static void
handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
struct StrataEstimator *remote_se;
int diff;
@@ -746,7 +757,7 @@
strata_estimator_read (&mh[1], remote_se);
GNUNET_assert (NULL != eo->se);
diff = strata_estimator_difference (remote_se, eo->se);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, diff=%d\n", diff);
strata_estimator_destroy (remote_se);
strata_estimator_destroy (eo->se);
eo->se = NULL;
@@ -769,7 +780,7 @@
{
struct SendElementClosure *sec = cls;
struct IBF_Key ibf_key = sec->ibf_key;
- struct UnionEvaluateOperation *eo = sec->eo;
+ struct OperationState *eo = sec->eo;
struct KeyEntry *ke = value;
if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -789,7 +800,7 @@
continue;
}
memcpy (&mh[1], element->data, element->size);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to client\n");
GNUNET_MQ_send (eo->mq, ev);
ke = ke->next_colliding;
}
@@ -804,7 +815,7 @@
* @param ibf_key IBF key of interest
*/
static void
-send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key
ibf_key)
+send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
{
struct SendElementClosure send_cls;
@@ -822,7 +833,7 @@
* @param eo union operation
*/
static void
-decode_and_send (struct UnionEvaluateOperation *eo)
+decode_and_send (struct OperationState *eo)
{
struct IBF_Key key;
int side;
@@ -833,6 +844,9 @@
prepare_ibf (eo, eo->remote_ibf->size);
diff_ibf = ibf_dup (eo->local_ibf);
ibf_subtract (diff_ibf, eo->remote_ibf);
+
+ ibf_destroy (eo->remote_ibf);
+ eo->remote_ibf = NULL;
while (1)
{
@@ -864,7 +878,7 @@
{
struct GNUNET_MQ_Envelope *ev;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending
DONE\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending
DONE\n");
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
GNUNET_MQ_send (eo->mq, ev);
break;
@@ -899,7 +913,7 @@
static void
handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
struct IBFMessage *msg = (struct IBFMessage *) mh;
unsigned int buckets_in_message;
@@ -908,8 +922,9 @@
{
eo->phase = PHASE_EXPECT_IBF_CONT;
GNUNET_assert (NULL == eo->remote_ibf);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n",
1<<msg->order);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n",
1<<msg->order);
eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+ eo->ibf_buckets_received = 0;
if (0 != ntohs (msg->offset))
{
GNUNET_break (0);
@@ -929,6 +944,13 @@
buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) /
IBF_BUCKET_SIZE;
+ if (0 == buckets_in_message)
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (eo);
+ return;
+ }
+
if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message *
IBF_BUCKET_SIZE)
{
GNUNET_break (0);
@@ -942,7 +964,7 @@
if (eo->ibf_buckets_received == eo->remote_ibf->size)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
eo->phase = PHASE_EXPECT_ELEMENTS;
decode_and_send (eo);
}
@@ -957,12 +979,13 @@
* @param element element to send
*/
static void
-send_client_element (struct UnionEvaluateOperation *eo,
+send_client_element (struct OperationState *eo,
struct GNUNET_SET_Element *element)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n",
element->size);
GNUNET_assert (0 != eo->spec->client_request_id);
ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
if (NULL == ev)
@@ -973,6 +996,7 @@
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
rm->request_id = htonl (eo->spec->client_request_id);
+ rm->element_type = element->type;
memcpy (&rm[1], element->data, element->size);
GNUNET_MQ_send (eo->spec->set->client_mq, ev);
}
@@ -987,7 +1011,7 @@
* @param eo union operation
*/
static void
-send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
+send_client_done_and_destroy (struct OperationState *eo)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
@@ -995,6 +1019,7 @@
ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
rm->request_id = htonl (eo->spec->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
+ rm->element_type = htons (0);
GNUNET_MQ_send (eo->spec->set->client_mq, ev);
}
@@ -1009,11 +1034,11 @@
static void
handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
struct ElementEntry *ee;
uint16_t element_size;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
(eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
@@ -1025,13 +1050,12 @@
element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
ee = GNUNET_malloc (sizeof *eo + element_size);
memcpy (&ee[1], &mh[1], element_size);
+ ee->element.size = element_size;
ee->element.data = &ee[1];
ee->remote = GNUNET_YES;
insert_element (eo, ee);
send_client_element (eo, &ee->element);
-
- GNUNET_free (ee);
}
@@ -1044,7 +1068,7 @@
static void
handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
struct IBF_Key *ibf_key;
unsigned int num_keys;
@@ -1082,7 +1106,7 @@
static void
peer_done_sent_cb (void *cls)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
send_client_done_and_destroy (eo);
}
@@ -1097,14 +1121,14 @@
static void
handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct UnionEvaluateOperation *eo = cls;
+ struct OperationState *eo = cls;
if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
/* we got all requests, but still have to send our elements as response */
struct GNUNET_MQ_Envelope *ev;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after
elements\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after
elements\n");
eo->phase = PHASE_FINISHED;
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
@@ -1113,7 +1137,7 @@
}
if (eo->phase == PHASE_EXPECT_ELEMENTS)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
eo->phase = PHASE_FINISHED;
send_client_done_and_destroy (eo);
return;
@@ -1129,34 +1153,38 @@
*
* @param spec specification of the operation the evaluate
* @param tunnel tunnel already connected to the partner peer
+ * @param tc tunnel context, passed here so all new incoming
+ * messages are directly going to the union operations
* @return a handle to the operation
*/
-struct UnionEvaluateOperation *
-_GSS_union_evaluate (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel)
+static void
+union_evaluate (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ struct TunnelContext *tc)
{
- struct UnionEvaluateOperation *eo;
- struct UnionState *st = spec->set->state.u;
+ struct OperationState *eo;
- eo = GNUNET_new (struct UnionEvaluateOperation);
- eo->se = strata_estimator_dup (spec->set->state.u->se);
+ eo = GNUNET_new (struct OperationState);
+ tc->vt = _GSS_union_vt ();
+ tc->op = eo;
+ eo->se = strata_estimator_dup (spec->set->state->se);
+ eo->set_state = spec->set->state;
eo->spec = spec;
eo->tunnel = tunnel;
+ eo->mq = GNUNET_MESH_mq_create (tunnel);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"evaluating union operation, (app %s)\n",
GNUNET_h2s (&eo->spec->app_id));
/* we started the operation, thus we have to send the operation request */
eo->phase = PHASE_EXPECT_SE;
- GNUNET_CONTAINER_DLL_insert (st->ops_head,
- st->ops_tail,
+ GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
+ eo->set_state->ops_tail,
eo);
send_operation_request (eo);
-
- return eo;
}
@@ -1165,30 +1193,34 @@
*
* @param spec all necessary information about the operation
* @param tunnel open tunnel to the partner's peer
+ * @param tc tunnel context, passed here so all new incoming
+ * messages are directly going to the union operations
* @return operation
*/
-struct UnionEvaluateOperation *
-_GSS_union_accept (struct OperationSpecification *spec,
- struct GNUNET_MESH_Tunnel *tunnel)
+static void
+union_accept (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ struct TunnelContext *tc)
{
- struct UnionEvaluateOperation *eo;
- struct UnionState *st = spec->set->state.u;
+ struct OperationState *eo;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
- eo = GNUNET_new (struct UnionEvaluateOperation);
- eo->generation_created = st->current_generation++;
+ eo = GNUNET_new (struct OperationState);
+ tc->vt = _GSS_union_vt ();
+ tc->op = eo;
+ eo->set_state = spec->set->state;
+ eo->generation_created = eo->set_state->current_generation++;
eo->spec = spec;
eo->tunnel = tunnel;
- eo->se = strata_estimator_dup (st->se);
+ eo->mq = GNUNET_MESH_mq_create (tunnel);
+ eo->se = strata_estimator_dup (eo->set_state->se);
/* transfer ownership of mq and socket from incoming to eo */
- GNUNET_CONTAINER_DLL_insert (st->ops_head,
- st->ops_tail,
+ GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
+ eo->set_state->ops_tail,
eo);
/* kick off the operation */
send_strata_estimator (eo);
-
- return eo;
}
@@ -1197,111 +1229,101 @@
*
* @return the newly created set
*/
-struct Set *
-_GSS_union_set_create (void)
+static struct SetState *
+union_set_create (void)
{
- struct Set *set;
+ struct SetState *set_state;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
- set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
- set->state.u = (struct UnionState *) &set[1];
- set->operation = GNUNET_SET_OPERATION_UNION;
+ set_state = GNUNET_new (struct SetState);
/* keys of the hash map are stored in the element entrys, thus we do not
* want the hash map to copy them */
- set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1,
GNUNET_YES);
- set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
+ set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ set_state->se = strata_estimator_create (SE_STRATA_COUNT,
SE_IBF_SIZE, SE_IBF_HASH_NUM);
- return set;
+ return set_state;
}
/**
* Add the element from the given element message to the set.
*
- * @param m message with the element
- * @param set set to add the element to
+ * @param set_state state of the set want to add to
+ * @param element the element to add to the set
*/
-void
-_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_add (struct SetState *set_state, const struct GNUNET_SET_Element
*element)
{
struct ElementEntry *ee;
struct ElementEntry *ee_dup;
- uint16_t element_size;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n",
element->size);
- GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
- element_size = ntohs (m->header.size) - sizeof *m;
- ee = GNUNET_malloc (element_size + sizeof *ee);
- ee->element.size = element_size;
- memcpy (&ee[1], &m[1], element_size);
+ ee = GNUNET_malloc (element->size + sizeof *ee);
+ ee->element.size = element->size;
+ memcpy (&ee[1], element->data, element->size);
ee->element.data = &ee[1];
- ee->generation_added = set->state.u->current_generation;
- GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
- ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements,
&ee->element_hash);
+ ee->generation_added = set_state->current_generation;
+ ee->remote = GNUNET_NO;
+ GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
+ ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
+ &ee->element_hash);
if (NULL != ee_dup)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
GNUNET_free (ee);
return;
}
- GNUNET_CONTAINER_multihashmap_put (set->state.u->elements,
&ee->element_hash, ee,
+ GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash,
ee,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
- strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash,
0));
+ strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
}
/**
* Destroy a set that supports the union operation
*
- * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ * @param set_state the set to destroy
*/
-void
-_GSS_union_set_destroy (struct Set *set)
+static void
+union_set_destroy (struct SetState *set_state)
{
- GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
- if (NULL != set->client)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
+ /* important to destroy operations before the rest of the set */
+ while (NULL != set_state->ops_head)
+ union_operation_destroy (set_state->ops_head);
+ if (NULL != set_state->se)
{
- GNUNET_SERVER_client_drop (set->client);
- set->client = NULL;
+ strata_estimator_destroy (set_state->se);
+ set_state->se = NULL;
}
- if (NULL != set->client_mq)
+ if (NULL != set_state->elements)
{
- GNUNET_MQ_destroy (set->client_mq);
- set->client_mq = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
+ destroy_elements_iterator, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
+ set_state->elements = NULL;
}
- if (NULL != set->state.u->se)
- {
- strata_estimator_destroy (set->state.u->se);
- set->state.u->se = NULL;
- }
-
- destroy_elements (set->state.u);
-
- while (NULL != set->state.u->ops_head)
- {
- _GSS_union_operation_destroy (set->state.u->ops_head);
- }
+ GNUNET_free (set_state);
}
/**
* Remove the element given in the element message from the set.
* Only marks the element as removed, so that older set operations can still
exchange it.
*
- * @param m message with the element
- * @param set set to remove the element from
+ * @param set_state state of the set to remove from
+ * @param element set element to remove
*/
-void
-_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_remove (struct SetState *set_state, const struct GNUNET_SET_Element
*element)
{
struct GNUNET_HashCode hash;
struct ElementEntry *ee;
- GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
- GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
- ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
+ GNUNET_CRYPTO_hash (element->data, element->size, &hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
if (NULL == ee)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove
non-existing element\n");
@@ -1313,36 +1335,22 @@
return;
}
ee->removed = GNUNET_YES;
- ee->generation_removed = set->state.u->current_generation;
+ ee->generation_removed = set_state->current_generation;
}
/**
* Dispatch messages for a union operation.
*
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param mh message to process
+ * @param eo the state of the union evaluate operation
+ * @param mh the received message
* @return GNUNET_SYSERR if the tunnel should be disconnected,
* GNUNET_OK otherwise
*/
int
-_GSS_union_handle_p2p_message (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_MessageHeader *mh)
+union_handle_p2p_message (struct OperationState *eo,
+ const struct GNUNET_MessageHeader *mh)
{
- struct TunnelContext *tc = *tunnel_ctx;
- struct UnionEvaluateOperation *eo;
-
- if (CONTEXT_OPERATION_UNION != tc->type)
- {
- return GNUNET_SYSERR;
- }
-
- eo = tc->data.union_op;
-
switch (ntohs (mh->type))
{
case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1366,3 +1374,60 @@
}
return GNUNET_OK;
}
+
+
+static void
+union_peer_disconnect (struct OperationState *op)
+{
+ /* Are we already disconnected? */
+ if (NULL == op->tunnel)
+ return;
+ op->tunnel = NULL;
+ if (NULL != op->mq)
+ {
+ GNUNET_MQ_destroy (op->mq);
+ op->mq = NULL;
+ }
+ if (PHASE_FINISHED != op->phase)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_ResultMessage *msg;
+ ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ msg->request_id = htonl (op->spec->client_request_id);
+ msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
+ msg->element_type = htons (0);
+ GNUNET_MQ_send (op->spec->set->client_mq, ev);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected
prematurely\n");
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected
(finished)\n");
+ }
+ union_operation_destroy (op);
+}
+
+
+static void
+union_op_cancel (struct SetState *set_state, uint32_t op_id)
+{
+ /* FIXME: implement */
+}
+
+
+const struct SetVT *
+_GSS_union_vt (void)
+{
+ static const struct SetVT union_vt = {
+ .create = union_set_create,
+ .msg_handler = union_handle_p2p_message,
+ .add = union_add,
+ .remove = union_remove,
+ .destroy_set = union_set_destroy,
+ .evaluate = union_evaluate,
+ .accept = union_accept,
+ .peer_disconnect = union_peer_disconnect,
+ .cancel = union_op_cancel
+ };
+
+ return &union_vt;
+}
Modified: gnunet/src/set/gnunet-set-profiler.c
===================================================================
--- gnunet/src/set/gnunet-set-profiler.c 2013-07-10 01:26:04 UTC (rev
27859)
+++ gnunet/src/set/gnunet-set-profiler.c 2013-07-10 01:31:13 UTC (rev
27860)
@@ -16,7 +16,7 @@
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
- */
+*/
/**
* @file set/gnunet-set-profiler.c
@@ -42,41 +42,25 @@
const static struct GNUNET_CONFIGURATION_Handle *config;
-struct GNUNET_CONTAINER_MultiHashMap *map_a;
-struct GNUNET_CONTAINER_MultiHashMap *map_b;
-struct GNUNET_CONTAINER_MultiHashMap *map_c;
+struct SetInfo
+{
+ char *id;
+ struct GNUNET_SET_Handle *set;
+ struct GNUNET_SET_OperationHandle *oh;
+ struct GNUNET_CONTAINER_MultiHashMap *sent;
+ struct GNUNET_CONTAINER_MultiHashMap *received;
+ int done;
+} info1, info2;
+struct GNUNET_CONTAINER_MultiHashMap *common_sent;
-/**
- * Elements that set a received, should match map_c
- * in the end.
- */
-struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
-
-/**
- * Elements that set b received, should match map_c
- * in the end.
- */
-struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
-
-struct GNUNET_SET_Handle *set_a;
-struct GNUNET_SET_Handle *set_b;
-
struct GNUNET_HashCode app_id;
struct GNUNET_PeerIdentity local_peer;
struct GNUNET_SET_ListenHandle *set_listener;
-struct GNUNET_SET_OperationHandle *set_oh1;
-struct GNUNET_SET_OperationHandle *set_oh2;
-
-int a_done;
-int b_done;
-
-
-
static int
map_remove_iterator (void *cls,
const struct GNUNET_HashCode *key,
@@ -85,66 +69,69 @@
struct GNUNET_CONTAINER_MultiHashMap *m = cls;
int ret;
+ GNUNET_assert (NULL != key);
+
ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
- GNUNET_assert (GNUNET_OK == ret);
+ if (GNUNET_OK != ret)
+ printf ("spurious element\n");
return GNUNET_YES;
}
-
static void
-set_result_cb_1 (void *cls,
- const struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
+check_all_done (void)
{
- GNUNET_assert (GNUNET_NO == a_done);
- GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
- switch (status)
- {
- case GNUNET_SET_STATUS_DONE:
- case GNUNET_SET_STATUS_HALF_DONE:
- a_done = GNUNET_YES;
- GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator,
map_a_received);
- GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received));
- return;
- case GNUNET_SET_STATUS_FAILURE:
- GNUNET_assert (0);
- return;
- case GNUNET_SET_STATUS_OK:
- break;
- default:
- GNUNET_assert (0);
- }
- GNUNET_CONTAINER_multihashmap_put (map_a_received,
- element->data, NULL,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+ if (info1.done == GNUNET_NO || info2.done == GNUNET_NO)
+ return;
+
+ GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
info2.sent);
+ GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
info1.sent);
+
+ printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size
(info1.sent));
+ printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size
(info2.sent));
+
+ GNUNET_SCHEDULER_shutdown ();
}
static void
-set_result_cb_2 (void *cls,
+set_result_cb (void *cls,
const struct GNUNET_SET_Element *element,
enum GNUNET_SET_Status status)
{
- GNUNET_assert (GNUNET_NO == b_done);
- GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+ struct SetInfo *info = cls;
+
+ GNUNET_assert (GNUNET_NO == info->done);
switch (status)
{
case GNUNET_SET_STATUS_DONE:
case GNUNET_SET_STATUS_HALF_DONE:
- b_done = GNUNET_YES;
- GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator,
map_b_received);
- GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received));
+ info->done = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
+ check_all_done ();
+ info->oh = NULL;
return;
case GNUNET_SET_STATUS_FAILURE:
- GNUNET_assert (0);
+ info->oh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
+ GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_SET_STATUS_OK:
break;
default:
GNUNET_assert (0);
}
- GNUNET_CONTAINER_multihashmap_put (map_b_received,
+
+ if (element->size != sizeof (struct GNUNET_HashCode))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n",
element->size);
+ GNUNET_assert (0);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
+ info->id, GNUNET_h2s (element->data));
+ GNUNET_assert (NULL != element->data);
+ GNUNET_CONTAINER_multihashmap_put (info->received,
element->data, NULL,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
}
@@ -156,11 +143,16 @@
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SET_Request *request)
{
- GNUNET_assert (NULL == set_oh2);
+ if (NULL == request)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "listener failed\n");
+ return;
+ }
+ GNUNET_assert (NULL == info2.oh);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n");
- set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
- set_result_cb_2, NULL);
- GNUNET_SET_commit (set_oh2, set_b);
+ info2.oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+ set_result_cb, &info2);
+ GNUNET_SET_commit (info2.oh, info2.set);
}
@@ -185,6 +177,37 @@
static void
+handle_shutdown (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != set_listener)
+ {
+ GNUNET_SET_listen_cancel (set_listener);
+ set_listener = NULL;
+ }
+ if (NULL != info1.oh)
+ {
+ GNUNET_SET_operation_cancel (info1.oh);
+ info1.oh = NULL;
+ }
+ if (NULL != info2.oh)
+ {
+ GNUNET_SET_operation_cancel (info2.oh);
+ info2.oh = NULL;
+ }
+ if (NULL != info1.set)
+ {
+ GNUNET_SET_destroy (info1.set);
+ info1.set = NULL;
+ }
+ if (NULL != info2.set)
+ {
+ GNUNET_SET_destroy (info2.set);
+ info2.set = NULL;
+ }
+}
+
+static void
run (void *cls, char *const *args, const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
@@ -195,63 +218,41 @@
if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
{
- GNUNET_assert (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+ ret = 0;
return;
}
+
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, handle_shutdown,
NULL);
+
+ info1.id = "a";
+ info2.id = "b";
- map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
- map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
- map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
+ info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+ info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+ common_sent = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
+ info1.received = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+ info2.received = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+
for (i = 0; i < num_a; i++)
{
GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
- {
- i--;
- continue;
- }
- GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
}
for (i = 0; i < num_b; i++)
{
GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
- {
- i--;
- continue;
- }
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
- {
- i--;
- continue;
- }
- GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
+ GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
}
for (i = 0; i < num_c; i++)
{
GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
- {
- i--;
- continue;
- }
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
- {
- i--;
- continue;
- }
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
- {
- i--;
- continue;
- }
- GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
- GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
+ GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
}
@@ -259,20 +260,22 @@
app_id = hash;
/* FIXME: also implement intersection etc. */
- set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
- set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+ info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+ info2.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
- GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a);
- GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b);
- GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a);
- GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b);
+ GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator,
info1.set);
+ GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator,
info2.set);
+ GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
info1.set);
+ GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator,
info2.set);
set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
&app_id, set_listen_cb, NULL);
- set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt,
GNUNET_SET_RESULT_ADDED,
- set_result_cb_1, NULL);
- GNUNET_SET_commit (set_oh1, set_a);
+ info1.oh = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt,
GNUNET_SET_RESULT_ADDED,
+ set_result_cb, &info1);
+ GNUNET_SET_commit (info1.oh, info1.set);
+ GNUNET_SET_destroy (info1.set);
+ info1.set = NULL;
}
Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/set.h 2013-07-10 01:31:13 UTC (rev 27860)
@@ -194,6 +194,24 @@
};
+/**
+ * Sent to the service by the client
+ * in order to cancel a set operation.
+ */
+struct GNUNET_SET_CancelMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * ID of the request we want to cancel.
+ */
+ uint32_t request_id GNUNET_PACKED;
+};
+
+
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/set_api.c 2013-07-10 01:31:13 UTC (rev 27860)
@@ -33,7 +33,6 @@
#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
-
/**
* Opaque handle to a set.
*/
@@ -42,20 +41,47 @@
struct GNUNET_CLIENT_Connection *client;
struct GNUNET_MQ_Handle *mq;
unsigned int messages_since_ack;
+ struct GNUNET_SET_OperationHandle *ops_head;
+ struct GNUNET_SET_OperationHandle *ops_tail;
+ int destroy_requested;
};
+
/**
* Opaque handle to a set operation request from another peer.
*/
struct GNUNET_SET_Request
{
+ /**
+ * Id of the request, used to identify the request when
+ * accepting/rejecting it.
+ */
uint32_t accept_id;
+
+ /**
+ * Has the request been accepted already?
+ * GNUNET_YES/GNUNET_NO
+ */
int accepted;
};
+
+/**
+ * Handle to an operation.
+ * Only known to the service after commiting
+ * the handle with a set.
+ */
struct GNUNET_SET_OperationHandle
{
+ /**
+ * Function to be called when we have a result,
+ * or an error.
+ */
GNUNET_SET_ResultIterator result_cb;
+
+ /**
+ * Closure for result_cb.
+ */
void *result_cls;
/**
@@ -80,6 +106,17 @@
* used to patch the request id into the message when the set is known.
*/
uint32_t *request_id_addr;
+
+ /**
+ * Handles are kept in a linked list.
+ */
+ struct GNUNET_SET_OperationHandle *prev;
+
+ /**
+ * Handles are kept in a linked list.
+ */
+ struct GNUNET_SET_OperationHandle *next;
+
};
@@ -88,9 +125,25 @@
*/
struct GNUNET_SET_ListenHandle
{
+ /**
+ * Connection to the service.
+ */
struct GNUNET_CLIENT_Connection *client;
+
+ /**
+ * Message queue for the client.
+ */
struct GNUNET_MQ_Handle* mq;
+
+ /**
+ * Function to call on a new incoming request,
+ * or on error.
+ */
GNUNET_SET_ListenCallback listen_cb;
+
+ /**
+ * Closure for listen_cb.
+ */
void *listen_cls;
};
@@ -108,11 +161,13 @@
struct GNUNET_SET_Handle *set = cls;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_Element e;
+ enum GNUNET_SET_Status result_status;
-
GNUNET_assert (NULL != set);
GNUNET_assert (NULL != set->mq);
+ result_status = ntohs (msg->result_status);
+
if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
{
struct GNUNET_MQ_Envelope *mqm;
@@ -123,11 +178,14 @@
GNUNET_assert (NULL != oh);
/* status is not STATUS_OK => there's no attached element,
* and this is the last result message we get */
- if (htons (msg->result_status) != GNUNET_SET_STATUS_OK)
+ if (GNUNET_SET_STATUS_OK != result_status)
{
GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
+ GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
+ if (GNUNET_YES == oh->set->destroy_requested)
+ GNUNET_SET_destroy (oh->set);
if (NULL != oh->result_cb)
- oh->result_cb (oh->result_cls, NULL, htons (msg->result_status));
+ oh->result_cb (oh->result_cls, NULL, result_status);
GNUNET_free (oh);
return;
}
@@ -136,7 +194,7 @@
e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
e.type = msg->element_type;
if (NULL != oh->result_cb)
- oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
+ oh->result_cb (oh->result_cls, &e, result_status);
}
/**
@@ -153,7 +211,7 @@
struct GNUNET_SET_Request *req;
struct GNUNET_MessageHeader *context_msg;
- LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "processing operation request\n");
req = GNUNET_new (struct GNUNET_SET_Request);
req->accept_id = ntohl (msg->accept_id);
context_msg = GNUNET_MQ_extract_nested_mh (msg);
@@ -171,16 +229,42 @@
amsg->accept_reject_id = msg->accept_id;
GNUNET_MQ_send (lh->mq, mqm);
GNUNET_free (req);
- LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
}
- LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
/* the accept-case is handled in GNUNET_SET_accept,
* as we have the accept message available there */
}
+static void
+handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_SET_ListenHandle *lh = cls;
+
+ lh->listen_cb (lh->listen_cls, NULL, NULL, NULL);
+}
+
+
+static void
+handle_client_set_error (void *cls, enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_SET_Handle *set = cls;
+
+ while (NULL != set->ops_head)
+ {
+ if (NULL != set->ops_head->result_cb)
+ set->ops_head->result_cb (set->ops_head->result_cls, NULL,
+ GNUNET_SET_STATUS_FAILURE);
+ GNUNET_SET_operation_cancel (set->ops_head);
+ }
+
+ /* FIXME: there should be a set error handler */
+}
+
+
/**
* Create an empty set, supporting the specified operation.
*
@@ -200,15 +284,16 @@
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_CreateMessage *msg;
static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
+ {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
GNUNET_MQ_HANDLERS_END
};
set = GNUNET_new (struct GNUNET_SET_Handle);
set->client = GNUNET_CLIENT_connect ("set", cfg);
- LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n");
GNUNET_assert (NULL != set->client);
- set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
set);
+ set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
+ handle_client_set_error,
set);
GNUNET_assert (NULL != set->mq);
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
msg->operation = htons (op);
@@ -279,6 +364,11 @@
void
GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
{
+ if (NULL != set->ops_head)
+ {
+ set->destroy_requested = GNUNET_YES;
+ return;
+ }
GNUNET_CLIENT_disconnect (set->client);
set->client = NULL;
GNUNET_MQ_destroy (set->mq);
@@ -332,7 +422,6 @@
return oh;
}
-
/**
* Wait for set operation requests for the given application id
*
@@ -365,7 +454,8 @@
lh->listen_cb = listen_cb;
lh->listen_cls = listen_cls;
GNUNET_assert (NULL != lh->client);
- lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, lh);
+ lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
+
handle_client_listener_error, lh);
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
msg->operation = htons (operation);
msg->app_id = *app_id;
@@ -413,6 +503,7 @@
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_AcceptRejectMessage *msg;
+ GNUNET_assert (NULL != request);
GNUNET_assert (GNUNET_NO == request->accepted);
request->accepted = GNUNET_YES;
@@ -432,6 +523,9 @@
/**
* Cancel the given set operation.
+ * We need to send an explicit cancel message, as
+ * all operations communicate with the set's client
+ * handle.
*
* @param oh set operation to cancel
*/
@@ -441,17 +535,20 @@
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_OperationHandle *h_assoc;
- if (NULL != oh->set)
- {
- h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
- GNUNET_assert (h_assoc == oh);
- mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
- GNUNET_MQ_send (oh->set->mq, mqm);
- }
+ GNUNET_assert (NULL != oh->set);
+ GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
+ h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
+ GNUNET_assert (h_assoc == oh);
+ mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+ GNUNET_MQ_send (oh->set->mq, mqm);
+
if (NULL != oh->conclude_mqm)
GNUNET_MQ_discard (oh->conclude_mqm);
+ if (GNUNET_YES == oh->set->destroy_requested)
+ GNUNET_SET_destroy (oh->set);
+
GNUNET_free (oh);
}
@@ -469,14 +566,15 @@
*/
void
GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
- struct GNUNET_SET_Handle *set)
+ struct GNUNET_SET_Handle *set)
{
GNUNET_assert (NULL == oh->set);
GNUNET_assert (NULL != oh->conclude_mqm);
oh->set = set;
- oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh);
+ GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, oh);
+ oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
*oh->request_id_addr = htonl (oh->request_id);
- GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
+ GNUNET_MQ_send (set->mq, oh->conclude_mqm);
oh->conclude_mqm = NULL;
oh->request_id_addr = NULL;
}
Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/util/mq.c 2013-07-10 01:31:13 UTC (rev 27860)
@@ -612,6 +612,7 @@
struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection
*connection,
const struct GNUNET_MQ_MessageHandler
*handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
void *cls)
{
struct GNUNET_MQ_Handle *mq;
@@ -621,6 +622,7 @@
mq = GNUNET_new (struct GNUNET_MQ_Handle);
mq->handlers = handlers;
+ mq->error_handler = error_handler;
mq->handlers_cls = cls;
state = GNUNET_new (struct ClientConnectionState);
state->connection = connection;
@@ -708,18 +710,29 @@
void
GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
{
- /* FIXME: destroy all pending messages in the queue */
-
if (NULL != mq->destroy_impl)
{
mq->destroy_impl (mq, mq->impl_state);
}
+ while (NULL != mq->envelope_head)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ ev = mq->envelope_head;
+ GNUNET_MQ_discard (ev);
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+ }
+
+ if (NULL != mq->current_envelope)
+ {
+ GNUNET_MQ_discard (mq->current_envelope);
+ mq->current_envelope = NULL;
+ }
+
GNUNET_free (mq);
}
-
struct GNUNET_MessageHeader *
GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t
base_size)
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r27860 - in gnunet/src: consensus include set util,
gnunet <=