[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] branch master updated: get FS test with CADET to finally pass again
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] branch master updated: get FS test with CADET to finally pass again |
Date: |
Fri, 17 Feb 2017 14:30:36 +0100 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 181c039d1 get FS test with CADET to finally pass again
181c039d1 is described below
commit 181c039d12aa2aa99920d14070e7b64c018e8be7
Author: Christian Grothoff <address@hidden>
AuthorDate: Fri Feb 17 14:31:38 2017 +0100
get FS test with CADET to finally pass again
---
src/cadet/cadet_api_new.c | 5 +-
src/fs/Makefile.am | 2 +-
src/fs/gnunet-service-fs_cadet_client.c | 358 +++++++++++++++-----------------
src/fs/gnunet-service-fs_cadet_server.c | 10 +-
4 files changed, 170 insertions(+), 205 deletions(-)
diff --git a/src/cadet/cadet_api_new.c b/src/cadet/cadet_api_new.c
index 8f482aa28..eb8bc2549 100644
--- a/src/cadet/cadet_api_new.c
+++ b/src/cadet/cadet_api_new.c
@@ -711,11 +711,10 @@ handle_local_data (void *cls,
type = ntohs (payload->type);
fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got a %s data on channel %s [%X] of type %s (%u)\n",
- GC_f2s (fwd),
+ "Got a %s data on channel %s [%X] of type %u\n",
+ fwd ? "FWD" : "BWD",
GNUNET_i2s (&ch->peer),
ntohl (message->ccn.channel_of_client),
- GC_m2s (type),
type);
GNUNET_MQ_inject_message (ch->mq,
payload);
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index 344eb5a74..75451c7f6 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -202,7 +202,7 @@ gnunet_service_fs_LDADD = \
$(top_builddir)/src/block/libgnunetblock.la \
$(top_builddir)/src/datastore/libgnunetdatastore.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
- $(top_builddir)/src/cadet/libgnunetcadet.la \
+ $(top_builddir)/src/cadet/libgnunetcadetnew.la \
$(top_builddir)/src/ats/libgnunetats.la \
$(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/util/libgnunetutil.la \
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 193fe2263..55e0cbc24 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -77,7 +77,7 @@ struct GSF_CadetRequest
GSF_CadetReplyProcessor proc;
/**
- * Closure for 'proc'
+ * Closure for @e proc
*/
void *proc_cls;
@@ -126,11 +126,6 @@ struct CadetHandle
struct GNUNET_CADET_Channel *channel;
/**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
- /**
* Which peer does this cadet go to?
*/
struct GNUNET_PeerIdentity target;
@@ -140,14 +135,14 @@ struct CadetHandle
* a few seconds to give the application a chance to give
* us another query).
*/
- struct GNUNET_SCHEDULER_Task * timeout_task;
+ struct GNUNET_SCHEDULER_Task *timeout_task;
/**
* Task to reset cadets that had errors (asynchronously,
* as we may not be able to do it immediately during a
* callback from the cadet API).
*/
- struct GNUNET_SCHEDULER_Task * reset_task;
+ struct GNUNET_SCHEDULER_Task *reset_task;
};
@@ -170,10 +165,10 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
/**
* Transmit pending requests via the cadet.
*
- * @param mh cadet to process
+ * @param cls `struct CadetHandle` to process
*/
static void
-transmit_pending (struct CadetHandle *mh);
+transmit_pending (void *cls);
/**
@@ -206,65 +201,19 @@ move_to_pending (void *cls,
/**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
- struct GNUNET_CADET_Channel *channel = mh->channel;
- struct GNUNET_HashCode port;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->channel = NULL;
-
- if (NULL != channel)
- {
- /* Avoid loop */
- if (NULL != mh->wh)
- {
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- mh->wh = NULL;
- }
- GNUNET_CADET_channel_destroy (channel);
- }
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &move_to_pending,
- mh);
- GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
- strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
- &port);
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
- &port,
- GNUNET_CADET_OPTION_RELIABLE);
- transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
+ * Functions with this signature are called whenever a complete reply
+ * is received.
*
- * @param cls the `struct CadetHandle` to tear down
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
-static void
-cadet_timeout (void *cls)
+static int
+check_reply (void *cls,
+ const struct CadetReplyMessage *srm)
{
- struct CadetHandle *mh = cls;
- struct GNUNET_CADET_Channel *tun;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->timeout_task = NULL;
- tun = mh->channel;
- mh->channel = NULL;
- if(NULL != tun)
- GNUNET_CADET_channel_destroy (tun);
+ /* We check later... */
+ return GNUNET_OK;
}
@@ -274,13 +223,7 @@ cadet_timeout (void *cls)
* @param cls the `struct CadetHandle` to tear down
*/
static void
-reset_cadet_task (void *cls)
-{
- struct CadetHandle *mh = cls;
-
- mh->reset_task = NULL;
- reset_cadet (mh);
-}
+reset_cadet_task (void *cls);
/**
@@ -300,83 +243,6 @@ reset_cadet_async (struct CadetHandle *mh)
/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
- size_t size,
- void *buf)
-{
- struct CadetHandle *mh = cls;
- struct CadetQueryMessage sqm;
- struct GSF_CadetRequest *sr;
-
- mh->wh = NULL;
- if (NULL == buf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cadet channel to %s failed during transmission attempt, rebuilding\n",
- GNUNET_i2s (&mh->target));
- reset_cadet_async (mh);
- return 0;
- }
- sr = mh->pending_head;
- if (NULL == sr)
- return 0;
- GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
- &sr->query,
- sr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- sr->was_transmitted = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&mh->target));
- sqm.header.size = htons (sizeof (sqm));
- sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
- sqm.type = htonl (sr->type);
- sqm.query = sr->query;
- GNUNET_memcpy (buf, &sqm, sizeof (sqm));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via cadet to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&mh->target));
- transmit_pending (mh);
- return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
- if (NULL == mh->channel)
- return;
- if (NULL != mh->wh)
- return;
- mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct CadetQueryMessage),
- &transmit_sqm, mh);
-}
-
-
-/**
* Closure for handle_reply().
*/
struct HandleReplyClosure
@@ -393,7 +259,7 @@ struct HandleReplyClosure
struct GNUNET_TIME_Absolute expiration;
/**
- * Number of bytes in 'data'.
+ * Number of bytes in @e data.
*/
size_t data_size;
@@ -439,19 +305,24 @@ process_reply (void *cls,
/**
- * Functions with this signature are called whenever a complete reply
- * is received.
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
*
- * @param cls closure with the `struct CadetHandle`
- * @param srm the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
*/
static int
-check_reply (void *cls,
- const struct CadetReplyMessage *srm)
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
{
- /* We check later... */
- return GNUNET_OK;
+ struct GSF_CadetRequest *sr = value;
+
+ GSF_cadet_query_cancel (sr);
+ return GNUNET_YES;
}
@@ -517,28 +388,6 @@ handle_reply (void *cls,
/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_CadetRequest *sr = value;
-
- GSF_cadet_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
-/**
* Function called by cadet when a client disconnects.
* Cleans up our `struct CadetClient` of that channel.
*
@@ -569,8 +418,6 @@ disconnect_cb (void *cls,
GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
&free_waiting_entry,
mh);
- if (NULL != mh->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
if (NULL != mh->timeout_task)
GNUNET_SCHEDULER_cancel (mh->timeout_task);
if (NULL != mh->reset_task)
@@ -602,6 +449,133 @@ window_change_cb (void *cls,
int window_size)
{
/* FIXME: for flow control, implement? */
+#if 0
+ /* Something like this instead of the GNUNET_MQ_notify_sent() in
+ transmit_pending() might be good (once the window change CB works...) */
+ if (0 < window_size) /* test needed? */
+ transmit_pending (mh);
+#endif
+}
+
+
+/**
+ * We had a serious error, tear down and re-create cadet from scratch.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ GNUNET_CADET_channel_destroy (mh->channel);
+ mh->channel = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &move_to_pending,
+ mh);
+ {
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_HashCode port;
+
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
+ mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+ mh,
+ &mh->target,
+ &port,
+ GNUNET_CADET_OPTION_RELIABLE,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
+ }
+ transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+cadet_timeout (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_CADET_Channel *tun;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
+ mh->timeout_task = NULL;
+ tun = mh->channel;
+ mh->channel = NULL;
+ if (NULL != tun)
+ GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an cadet.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+reset_cadet_task (void *cls)
+{
+ struct CadetHandle *mh = cls;
+
+ mh->reset_task = NULL;
+ reset_cadet (mh);
+}
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param cls `struct CadetHandle` to process
+ */
+static void
+transmit_pending (void *cls)
+{
+ struct CadetHandle *mh = cls;
+ struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
+ struct GSF_CadetRequest *sr;
+ struct GNUNET_MQ_Envelope *env;
+ struct CadetQueryMessage *sqm;
+
+ if ( (0 != GNUNET_MQ_get_length (mq)) ||
+ (NULL == (sr = mh->pending_head)) )
+ return;
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+ mh->pending_tail,
+ sr);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ sr->was_transmitted = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query for %s via cadet to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&mh->target));
+ env = GNUNET_MQ_msg (sqm,
+ GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+ sqm->type = htonl (sr->type);
+ sqm->query = sr->query;
+ GNUNET_MQ_notify_sent (env,
+ &transmit_pending,
+ mh);
+ GNUNET_MQ_send (mq,
+ env);
}
@@ -614,7 +588,6 @@ static struct CadetHandle *
get_cadet (const struct GNUNET_PeerIdentity *target)
{
struct CadetHandle *mh;
- struct GNUNET_HashCode port;
mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
target);
@@ -641,10 +614,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
&mh->target,
mh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
- strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
- &port);
-
{
struct GNUNET_MQ_MessageHandler handlers[] = {
GNUNET_MQ_hd_var_size (reply,
@@ -653,7 +622,11 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
mh),
GNUNET_MQ_handler_end ()
};
+ struct GNUNET_HashCode port;
+ GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+ strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+ &port);
mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
mh,
&mh->target,
@@ -679,9 +652,10 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
*/
struct GSF_CadetRequest *
GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls)
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_CadetReplyProcessor proc,
+ void *proc_cls)
{
struct CadetHandle *mh;
struct GSF_CadetRequest *sr;
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index 0a72a8279..adbce1154 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2012, 2013 GNUnet e.V.
+ Copyright (C) 2012, 2013, 2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -86,11 +86,6 @@ struct CadetClient
struct GNUNET_CADET_Channel *channel;
/**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
- /**
* Head of write queue.
*/
struct WriteQueueItem *wqi_head;
@@ -439,8 +434,6 @@ disconnect_cb (void *cls,
GNUNET_SCHEDULER_cancel (sc->terminate_task);
if (NULL != sc->timeout_task)
GNUNET_SCHEDULER_cancel (sc->timeout_task);
- if (NULL != sc->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
if (NULL != sc->qe)
GNUNET_DATASTORE_cancel (sc->qe);
while (NULL != (wqi = sc->wqi_head))
@@ -458,7 +451,6 @@ disconnect_cb (void *cls,
}
-
/**
* Function called whenever an MQ-channel's transmission window size changes.
*
--
To stop receiving notification emails like this one, please contact
address@hidden
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] [gnunet] branch master updated: get FS test with CADET to finally pass again,
gnunet <=