[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28145 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28145 - gnunet/src/fs |
Date: |
Thu, 18 Jul 2013 13:31:34 +0200 |
Author: grothoff
Date: 2013-07-18 13:31:34 +0200 (Thu, 18 Jul 2013)
New Revision: 28145
Added:
gnunet/src/fs/gnunet-service-fs_mesh_client.c
Modified:
gnunet/src/fs/gnunet-service-fs_mesh_server.c
Log:
-missing file, more cleanup
Added: gnunet/src/fs/gnunet-service-fs_mesh_client.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_client.c
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_mesh_client.c 2013-07-18 11:31:34 UTC
(rev 28145)
@@ -0,0 +1,743 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_mesh_client.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - PORT is set to old application type, unsure if we should keep
+ * it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_mesh.h"
+
+
+/**
+ * After how long do we reset connections without replies?
+ */
+#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 30)
+
+
+/**
+ * Handle for a mesh to another peer.
+ */
+struct MeshHandle;
+
+
+/**
+ * Handle for a request that is going out via mesh API.
+ */
+struct GSF_MeshRequest
+{
+
+ /**
+ * DLL.
+ */
+ struct GSF_MeshRequest *next;
+
+ /**
+ * DLL.
+ */
+ struct GSF_MeshRequest *prev;
+
+ /**
+ * Which mesh is this request associated with?
+ */
+ struct MeshHandle *mh;
+
+ /**
+ * Function to call with the result.
+ */
+ GSF_MeshReplyProcessor proc;
+
+ /**
+ * Closure for 'proc'
+ */
+ void *proc_cls;
+
+ /**
+ * Query to transmit to the other peer.
+ */
+ struct GNUNET_HashCode query;
+
+ /**
+ * Desired type for the reply.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we transmit this request already? YES if we are
+ * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
+ */
+ int was_transmitted;
+};
+
+
+/**
+ * Handle for a mesh to another peer.
+ */
+struct MeshHandle
+{
+ /**
+ * Head of DLL of pending requests on this mesh.
+ */
+ struct GSF_MeshRequest *pending_head;
+
+ /**
+ * Tail of DLL of pending requests on this mesh.
+ */
+ struct GSF_MeshRequest *pending_tail;
+
+ /**
+ * Map from query to 'struct GSF_MeshRequest's waiting for
+ * a reply.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
+
+ /**
+ * Tunnel to the other peer.
+ */
+ struct GNUNET_MESH_Tunnel *tunnel;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_MESH_TransmitHandle *wh;
+
+ /**
+ * Which peer does this mesh go to?
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Task to kill inactive meshs (we keep them around for
+ * a few seconds to give the application a chance to give
+ * us another query).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Task to reset meshs that had errors (asynchronously,
+ * as we may not be able to do it immediately during a
+ * callback from the mesh API).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+};
+
+
+/**
+ * Mesh tunnel for creating outbound tunnels.
+ */
+static struct GNUNET_MESH_Handle *mesh_tunnel;
+
+/**
+ * Map from peer identities to 'struct MeshHandles' with mesh
+ * tunnels to those peers.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
+
+
+/* ********************* client-side code ************************* */
+
+
+/**
+ * Transmit pending requests via the mesh.
+ *
+ * @param mh mesh to process
+ */
+static void
+transmit_pending (struct MeshHandle *mh);
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct MeshHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshHandle *mh = cls;
+ struct GSF_MeshRequest *sr = value;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
+ key,
+ value));
+ GNUNET_CONTAINER_DLL_insert (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ sr->was_transmitted = GNUNET_NO;
+ return GNUNET_YES;
+}
+
+
+/**
+ * We had a serious error, tear down and re-create mesh from scratch.
+ *
+ * @param mh mesh to reset
+ */
+static void
+reset_mesh (struct MeshHandle *mh)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting mesh tunnel to %s\n",
+ GNUNET_i2s (&mh->target));
+ GNUNET_MESH_tunnel_destroy (mh->tunnel);
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &move_to_pending,
+ mh);
+ mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
+ mh,
+ &mh->target,
+
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ GNUNET_YES,
+ GNUNET_YES);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive mesh tunnel.
+ *
+ * @param cls the 'struct MeshHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+mesh_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MeshHandle *mh = cls;
+ struct GNUNET_MESH_Tunnel *tun;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on mesh tunnel to %s\n",
+ GNUNET_i2s (&mh->target));
+ mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ tun = mh->tunnel;
+ mh->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an mesh.
+ *
+ * @param cls the 'struct MeshHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_mesh_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MeshHandle *mh = cls;
+
+ mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+ reset_mesh (mh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create mesh from scratch,
+ * but do so asynchronously.
+ *
+ * @param mh mesh to reset
+ */
+static void
+reset_mesh_async (struct MeshHandle *mh)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
+ GNUNET_SCHEDULER_cancel (mh->reset_task);
+ mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
+ mh);
+}
+
+
+/**
+ * Functions of this signature are called whenever we are ready to transmit
+ * query via a mesh.
+ *
+ * @param cls the struct MeshHandle for which we did the write call
+ * @param size the number of bytes that can be written to 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
+ */
+static size_t
+transmit_sqm (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct MeshHandle *mh = cls;
+ struct MeshQueryMessage sqm;
+ struct GSF_MeshRequest *sr;
+
+ mh->wh = NULL;
+ if (NULL == buf)
+ {
+ reset_mesh (mh);
+ return 0;
+ }
+ sr = mh->pending_head;
+ if (NULL == sr)
+ return 0;
+ GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+ &sr->query,
+ sr,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query for %s via mesh to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&mh->target));
+ sr->was_transmitted = GNUNET_YES;
+ sqm.header.size = htons (sizeof (sqm));
+ sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
+ sqm.type = htonl (sr->type);
+ sqm.query = sr->query;
+ memcpy (buf, &sqm, sizeof (sqm));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully transmitted %u bytes via mesh to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&mh->target));
+ transmit_pending (mh);
+ return sizeof (sqm);
+}
+
+
+/**
+ * Transmit pending requests via the mesh.
+ *
+ * @param mh mesh to process
+ */
+static void
+transmit_pending (struct MeshHandle *mh)
+{
+ if (NULL != mh->wh)
+ return;
+ mh->wh = GNUNET_MESH_notify_transmit_ready (mh->tunnel, GNUNET_YES /* allow
cork */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct MeshQueryMessage),
+ &transmit_sqm, mh);
+}
+
+
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+ /**
+ * Reply payload.
+ */
+ const void *data;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Number of bytes in 'data'.
+ */
+ size_t data_size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we have a matching query?
+ */
+ int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct HandleReplyClosure *hrc = cls;
+ struct GSF_MeshRequest *sr = value;
+
+ sr->proc (sr->proc_cls,
+ hrc->type,
+ hrc->expiration,
+ hrc->data_size,
+ hrc->data);
+ GSF_mesh_query_cancel (sr);
+ hrc->found = GNUNET_YES;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Functions with this signature are called whenever a complete reply
+ * is received.
+ *
+ * @param cls closure with the 'struct MeshHandle'
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+reply_cb (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct MeshHandle *mh = *tunnel_ctx;
+ const struct MeshReplyMessage *srm;
+ struct HandleReplyClosure hrc;
+ uint16_t msize;
+ enum GNUNET_BLOCK_Type type;
+ struct GNUNET_HashCode query;
+
+ msize = ntohs (message->size);
+ if (sizeof (struct MeshReplyMessage) > msize)
+ {
+ GNUNET_break_op (0);
+ reset_mesh_async (mh);
+ return GNUNET_SYSERR;
+ }
+ srm = (const struct MeshReplyMessage *) message;
+ msize -= sizeof (struct MeshReplyMessage);
+ type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ &srm[1], msize, &query))
+ {
+ GNUNET_break_op (0);
+ reset_mesh_async (mh);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received reply `%s' via mesh from peer %s\n",
+ GNUNET_h2s (&query),
+ GNUNET_i2s (&mh->target));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via mesh"), 1,
+ GNUNET_NO);
+ hrc.data = &srm[1];
+ hrc.data_size = msize;
+ hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+ hrc.type = type;
+ hrc.found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
+ &query,
+ &handle_reply,
+ &hrc);
+ if (GNUNET_NO == hrc.found)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via mesh
dropped"), 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Get (or create) a mesh to talk to the given peer.
+ *
+ * @param target peer we want to communicate with
+ */
+static struct MeshHandle *
+get_mesh (const struct GNUNET_PeerIdentity *target)
+{
+ struct MeshHandle *mh;
+
+ mh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
+ &target->hashPubKey);
+ if (NULL != mh)
+ {
+ if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (mh->timeout_task);
+ mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ return mh;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating mesh tunnel to %s\n",
+ GNUNET_i2s (target));
+ mh = GNUNET_new (struct MeshHandle);
+ mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+ &reset_mesh_task,
+ mh);
+ mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+ mh->target = *target;
+ mh->tunnel = GNUNET_MESH_tunnel_create (mesh_tunnel,
+ mh,
+ &mh->target,
+
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ GNUNET_NO,
+ GNUNET_YES);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (mesh_map,
+ &mh->target.hashPubKey,
+ mh,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ return mh;
+}
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_MeshRequest *
+GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_MeshReplyProcessor proc, void *proc_cls)
+{
+ struct MeshHandle *mh;
+ struct GSF_MeshRequest *sr;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to send query for %s via mesh to %s\n",
+ GNUNET_h2s (query),
+ GNUNET_i2s (target));
+ mh = get_mesh (target);
+ sr = GNUNET_new (struct GSF_MeshRequest);
+ sr->mh = mh;
+ sr->proc = proc;
+ sr->proc_cls = proc_cls;
+ sr->type = type;
+ sr->query = *query;
+ GNUNET_CONTAINER_DLL_insert (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ transmit_pending (mh);
+ return sr;
+}
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was calld.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
+{
+ struct MeshHandle *mh = sr->mh;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelled query for %s via mesh to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&sr->mh->target));
+ if (GNUNET_YES == sr->was_transmitted)
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
+ &sr->query,
+ sr));
+ else
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ GNUNET_free (sr);
+ if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
+ (NULL == mh->pending_head) )
+ mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &mesh_timeout,
+ mh);
+}
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct MeshHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_MeshRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_MeshRequest *sr = value;
+
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_mesh_query_cancel (sr);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function called by mesh when a client disconnects.
+ * Cleans up our 'struct MeshClient' of that tunnel.
+ *
+ * @param cls NULL
+ * @param tunnel tunnel of the disconnecting client
+ * @param tunnel_ctx our 'struct MeshClient'
+ */
+static void
+cleaner_cb (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct MeshHandle *mh = tunnel_ctx;
+ struct GSF_MeshRequest *sr;
+
+ mh->tunnel = NULL;
+ while (NULL != (sr = mh->pending_head))
+ {
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_mesh_query_cancel (sr);
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &free_waiting_entry,
+ mh);
+ if (NULL != mh->wh)
+ GNUNET_MESH_notify_transmit_ready_cancel (mh->wh);
+ if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+ GNUNET_SCHEDULER_cancel (mh->timeout_task);
+ if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
+ GNUNET_SCHEDULER_cancel (mh->reset_task);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (mesh_map,
+ &mh->target.hashPubKey,
+ mh));
+ GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+ GNUNET_free (mh);
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_start_client ()
+{
+ static const struct GNUNET_MESH_MessageHandler handlers[] = {
+ { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
+ { NULL, 0, 0 }
+ };
+ static const uint32_t ports[] = {
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ 0
+ };
+
+ mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+ mesh_tunnel = GNUNET_MESH_connect (GSF_cfg,
+ NULL,
+ NULL,
+ &cleaner_cb,
+ handlers,
+ ports);
+}
+
+
+/**
+ * Function called on each active meshs to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the 'struct MeshHandle' to destroy
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+release_meshs (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct MeshHandle *mh = value;
+ struct GNUNET_MESH_Tunnel *tun;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on mesh tunnel to %s\n",
+ GNUNET_i2s (&mh->target));
+ tun = mh->tunnel;
+ mh->tunnel = NULL;
+ if (NULL != tun)
+ GNUNET_MESH_tunnel_destroy (tun);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_mesh_stop_client ()
+{
+ GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
+ &release_meshs,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
+ mesh_map = NULL;
+ if (NULL != mesh_tunnel)
+ {
+ GNUNET_MESH_disconnect (mesh_tunnel);
+ mesh_tunnel = NULL;
+ }
+}
+
+
+/* end of gnunet-service-fs_mesh_client.c */
Modified: gnunet/src/fs/gnunet-service-fs_mesh_server.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_server.c 2013-07-18 11:19:51 UTC
(rev 28144)
+++ gnunet/src/fs/gnunet-service-fs_mesh_server.c 2013-07-18 11:31:34 UTC
(rev 28145)
@@ -24,16 +24,6 @@
* @author Christian Grothoff
*
* TODO:
- * - MESH2 API doesn't allow flow control for server yet (needed!)
- * - likely need to register clean up handler with mesh to handle
- * client disconnect (likely leaky right now)
- * - server is optional, currently client code will NPE if we have
- * no server, again MESH2 API requirement forcing this for now
- * - message handlers are symmetric for client/server, should be
- * separated (currently clients can get requests and servers can
- * handle answers, not good)
- * - code is entirely untested
- * - might have overlooked a few possible simplifications
* - PORT is set to old application type, unsure if we should keep
* it that way (fine for now)
*/
@@ -91,9 +81,9 @@
struct MeshClient *prev;
/**
- * Socket for communication.
+ * Tunnel for communication.
*/
- struct GNUNET_MESH_Tunnel *socket;
+ struct GNUNET_MESH_Tunnel *tunnel;
/**
* Handle for active write operation, or NULL.
@@ -134,9 +124,9 @@
/**
- * Listen socket for incoming requests.
+ * Listen tunnel for incoming requests.
*/
-static struct GNUNET_MESH_Handle *listen_socket;
+static struct GNUNET_MESH_Handle *listen_tunnel;
/**
* Head of DLL of mesh clients.
@@ -160,49 +150,7 @@
-/* ********************* server-side code ************************* */
-
-
/**
- * We're done with a particular client, clean up.
- *
- * @param sc client to clean up
- */
-static void
-terminate_mesh (struct MeshClient *sc)
-{
- struct WriteQueueItem *wqi;
-
- fprintf (stderr,
- "terminate mesh called for %p\n",
- sc);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# mesh connections active"), -1,
- GNUNET_NO);
- if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
- GNUNET_SCHEDULER_cancel (sc->terminate_task);
- if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
- GNUNET_SCHEDULER_cancel (sc->timeout_task);
- if (NULL != sc->wh)
- GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
- if (NULL != sc->qe)
- GNUNET_DATASTORE_cancel (sc->qe);
- while (NULL != (wqi = sc->wqi_head))
- {
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_free (wqi);
- }
- GNUNET_CONTAINER_DLL_remove (sc_head,
- sc_tail,
- sc);
- sc_count--;
- GNUNET_free (sc);
-}
-
-
-/**
* Task run to asynchronously terminate the mesh due to timeout.
*
* @param cls the 'struct MeshClient'
@@ -216,8 +164,8 @@
struct GNUNET_MESH_Tunnel *tun;
sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- tun = sc->socket;
- sc->socket = NULL;
+ tun = sc->tunnel;
+ sc->tunnel = NULL;
GNUNET_MESH_tunnel_destroy (tun);
}
@@ -247,7 +195,7 @@
continue_reading (struct MeshClient *sc)
{
refresh_timeout_task (sc);
- GNUNET_MESH_receive_done (sc->socket);
+ GNUNET_MESH_receive_done (sc->tunnel);
}
@@ -274,6 +222,7 @@
void *buf)
{
struct MeshClient *sc = cls;
+ struct GNUNET_MESH_Tunnel *tun;
struct WriteQueueItem *wqi;
size_t ret;
@@ -288,7 +237,9 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmission of reply failed, terminating mesh\n");
- terminate_mesh (sc);
+ tun = sc->tunnel;
+ sc->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (tun);
return 0;
}
GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
@@ -316,6 +267,7 @@
continue_writing (struct MeshClient *sc)
{
struct WriteQueueItem *wqi;
+ struct GNUNET_MESH_Tunnel *tun;
if (NULL != sc->wh)
{
@@ -330,7 +282,7 @@
continue_reading (sc);
return;
}
- sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+ sc->wh = GNUNET_MESH_notify_transmit_ready (sc->tunnel, GNUNET_NO,
GNUNET_TIME_UNIT_FOREVER_REL,
wqi->msize,
&write_continuation,
@@ -339,7 +291,9 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Write failed; terminating mesh\n");
- terminate_mesh (sc);
+ tun = sc->tunnel;
+ sc->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (tun);
return;
}
}
@@ -361,7 +315,7 @@
*/
static void
handle_datastore_reply (void *cls,
- const struct GNUNET_HashCode * key,
+ const struct GNUNET_HashCode *key,
size_t size, const void *data,
enum GNUNET_BLOCK_Type type,
uint32_t priority,
@@ -378,7 +332,8 @@
if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Performing on-demand encoding\n");
+ "Performing on-demand encoding for query %s\n",
+ GNUNET_h2s (key));
if (GNUNET_OK !=
GNUNET_FS_handle_on_demand_block (key,
size, data, type,
@@ -440,13 +395,11 @@
struct MeshClient *sc = *tunnel_ctx;
const struct MeshQueryMessage *sqm;
- fprintf (stderr,
- "Request gets %p\n",
- sc);
sqm = (const struct MeshQueryMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received query for `%s' via mesh\n",
- GNUNET_h2s (&sqm->query));
+ "Received query for `%s' via mesh from client %p\n",
+ GNUNET_h2s (&sqm->query),
+ sc);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# queries received via mesh"), 1,
GNUNET_NO);
@@ -473,7 +426,7 @@
* Functions of this type are called upon new mesh connection from other peers.
*
* @param cls the closure from GNUNET_MESH_connect
- * @param socket the socket representing the mesh
+ * @param tunnel the tunnel representing the mesh
* @param initiator the identity of the peer who wants to establish a mesh
* with us; NULL on binding error
* @param port mesh port used for the incoming connection
@@ -481,37 +434,35 @@
*/
static void *
accept_cb (void *cls,
- struct GNUNET_MESH_Tunnel *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
const struct GNUNET_PeerIdentity *initiator,
uint32_t port)
{
struct MeshClient *sc;
- GNUNET_assert (NULL != socket);
+ GNUNET_assert (NULL != tunnel);
if (sc_count >= sc_count_max)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# mesh client connections
rejected"), 1,
GNUNET_NO);
- GNUNET_MESH_tunnel_destroy (socket);
+ GNUNET_MESH_tunnel_destroy (tunnel);
return NULL;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Accepting inbound mesh connection from `%s'\n",
- GNUNET_i2s (initiator));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# mesh connections active"), 1,
GNUNET_NO);
sc = GNUNET_new (struct MeshClient);
- sc->socket = socket;
+ sc->tunnel = tunnel;
GNUNET_CONTAINER_DLL_insert (sc_head,
sc_tail,
sc);
sc_count++;
refresh_timeout_task (sc);
- fprintf (stderr,
- "Accept returns %p\n",
- sc);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Accepting inbound mesh connection from `%s' as client %p\n",
+ GNUNET_i2s (initiator),
+ sc);
return sc;
}
@@ -530,12 +481,37 @@
void *tunnel_ctx)
{
struct MeshClient *sc = tunnel_ctx;
+ struct WriteQueueItem *wqi;
- fprintf (stderr,
- "Cleaner called with %p\n",
- sc);
- if (NULL != sc)
- terminate_mesh (sc);
+ if (NULL == sc)
+ return;
+ sc->tunnel = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Terminating mesh connection with client %p\n",
+ sc);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# mesh connections active"), -1,
+ GNUNET_NO);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+ GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ if (NULL != sc->wh)
+ GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
+ if (NULL != sc->qe)
+ GNUNET_DATASTORE_cancel (sc->qe);
+ while (NULL != (wqi = sc->wqi_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ GNUNET_free (wqi);
+ }
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ sc);
+ sc_count--;
+ GNUNET_free (sc);
}
@@ -560,7 +536,7 @@
"MAX_MESH_CLIENTS",
&sc_count_max))
return;
- listen_socket = GNUNET_MESH_connect (GSF_cfg,
+ listen_tunnel = GNUNET_MESH_connect (GSF_cfg,
NULL,
&accept_cb,
&cleaner_cb,
@@ -575,15 +551,13 @@
void
GSF_mesh_stop_server ()
{
- struct MeshClient *sc;
-
- while (NULL != (sc = sc_head))
- terminate_mesh (sc);
- if (NULL != listen_socket)
+ if (NULL != listen_tunnel)
{
- GNUNET_MESH_disconnect (listen_socket);
- listen_socket = NULL;
+ GNUNET_MESH_disconnect (listen_tunnel);
+ listen_tunnel = NULL;
}
+ GNUNET_assert (NULL == sc_head);
+ GNUNET_assert (0 == sc_count);
}
/* end of gnunet-service-fs_mesh.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28145 - gnunet/src/fs,
gnunet <=