[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 04/04: starting with another CADET testcase (unfin
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] 04/04: starting with another CADET testcase (unfinished) |
Date: |
Wed, 23 May 2018 00:01:04 +0200 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository gnunet.
commit 8d7ab16402048793275b6feebd744e5e17af63fb
Author: Christian Grothoff <address@hidden>
AuthorDate: Wed May 23 00:00:58 2018 +0200
starting with another CADET testcase (unfinished)
---
src/cadet/cadet_test_lib.c | 65 +--
src/cadet/test_cadet_flow.c | 888 ++++++++++++++++++++++++++++++++++++++++
src/cadet/test_cadet_local_mq.c | 6 +-
3 files changed, 928 insertions(+), 31 deletions(-)
diff --git a/src/cadet/cadet_test_lib.c b/src/cadet/cadet_test_lib.c
index 20ef028b2..0efb81ab4 100644
--- a/src/cadet/cadet_test_lib.c
+++ b/src/cadet/cadet_test_lib.c
@@ -110,7 +110,7 @@ struct GNUNET_CADET_TEST_AdapterContext
* Port handlers for open ports.
*/
struct GNUNET_CADET_Port **ports;
-
+
/**
* General context.
*/
@@ -135,14 +135,13 @@ cadet_connect_adapter (void *cls,
struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
struct GNUNET_CADET_TEST_Context *ctx = actx->ctx;
struct GNUNET_CADET_Handle *h;
- unsigned int i;
h = GNUNET_CADET_connect (cfg);
if (NULL == ctx->ports)
return h;
-
- actx->ports = GNUNET_new_array (ctx->port_count, struct GNUNET_CADET_Port *);
- for (i = 0; i < ctx->port_count; i++)
+ actx->ports = GNUNET_new_array (ctx->port_count,
+ struct GNUNET_CADET_Port *);
+ for (unsigned int i = 0; i < ctx->port_count; i++)
{
actx->ports[i] = GNUNET_CADET_open_port (h,
ctx->ports[i],
@@ -165,14 +164,14 @@ cadet_connect_adapter (void *cls,
*/
static void
cadet_disconnect_adapter (void *cls,
- void *op_result)
+ void *op_result)
{
struct GNUNET_CADET_Handle *cadet = op_result;
struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
if (NULL != actx->ports)
{
- for (int i = 0; i < actx->ctx->port_count; i++)
+ for (unsigned int i = 0; i < actx->ctx->port_count; i++)
{
GNUNET_CADET_close_port (actx->ports[i]);
actx->ports[i] = NULL;
@@ -201,22 +200,24 @@ cadet_connect_cb (void *cls,
const char *emsg)
{
struct GNUNET_CADET_TEST_Context *ctx = cls;
- unsigned int i;
if (NULL != emsg)
{
- fprintf (stderr, "Failed to connect to CADET service: %s\n",
+ fprintf (stderr,
+ "Failed to connect to CADET service: %s\n",
emsg);
GNUNET_SCHEDULER_shutdown ();
return;
}
- for (i = 0; i < ctx->num_peers; i++)
+ for (unsigned int i = 0; i < ctx->num_peers; i++)
if (op == ctx->ops[i])
{
ctx->cadets[i] = ca_result;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "...cadet %u connected\n", i);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "...cadet %u connected\n",
+ i);
}
- for (i = 0; i < ctx->num_peers; i++)
+ for (unsigned int i = 0; i < ctx->num_peers; i++)
if (NULL == ctx->cadets[i])
return; /* still some CADET connections missing */
/* all CADET connections ready! */
@@ -231,9 +232,7 @@ cadet_connect_cb (void *cls,
void
GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx)
{
- unsigned int i;
-
- for (i = 0; i < ctx->num_peers; i++)
+ for (unsigned int i = 0; i < ctx->num_peers; i++)
{
GNUNET_assert (NULL != ctx->ops[i]);
GNUNET_TESTBED_operation_done (ctx->ops[i]);
@@ -269,33 +268,37 @@ cadet_test_run (void *cls,
unsigned int links_failed)
{
struct GNUNET_CADET_TEST_Context *ctx = cls;
- unsigned int i;
if (0 != links_failed)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Some links failed (%u), ending\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Some links failed (%u), ending\n",
links_failed);
exit (2);
}
-
if (num_peers != ctx->num_peers)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peers started %u/%u, ending\n",
- num_peers, ctx->num_peers);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Peers started %u/%u, ending\n",
+ num_peers,
+ ctx->num_peers);
exit (1);
}
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Testbed up, %u peers and %u links\n",
- num_peers, links_succeeded);
+ num_peers,
+ links_succeeded);
ctx->peers = peers;
- for (i = 0; i < num_peers; i++)
+ for (unsigned int i = 0; i < num_peers; i++)
{
struct GNUNET_CADET_TEST_AdapterContext *newctx;
+
newctx = GNUNET_new (struct GNUNET_CADET_TEST_AdapterContext);
newctx->peer = i;
newctx->ctx = ctx;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to cadet %u\n", i);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Connecting to cadet %u\n",
+ i);
ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx,
peers[i],
"cadet",
@@ -304,7 +307,9 @@ cadet_test_run (void *cls,
&cadet_connect_adapter,
&cadet_disconnect_adapter,
newctx);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "op handle %p\n", ctx->ops[i]);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "op handle %p\n",
+ ctx->ops[i]);
}
}
@@ -340,8 +345,10 @@ GNUNET_CADET_TEST_ruN (const char *testname,
ctx = GNUNET_new (struct GNUNET_CADET_TEST_Context);
ctx->num_peers = num_peers;
- ctx->ops = GNUNET_new_array (num_peers, struct GNUNET_TESTBED_Operation *);
- ctx->cadets = GNUNET_new_array (num_peers, struct GNUNET_CADET_Handle *);
+ ctx->ops = GNUNET_new_array (num_peers,
+ struct GNUNET_TESTBED_Operation *);
+ ctx->cadets = GNUNET_new_array (num_peers,
+ struct GNUNET_CADET_Handle *);
ctx->app_main = tmain;
ctx->app_main_cls = tmain_cls;
ctx->connects = connects;
@@ -352,12 +359,12 @@ GNUNET_CADET_TEST_ruN (const char *testname,
ctx->port_count = 0;
while (NULL != ctx->ports[ctx->port_count])
ctx->port_count++;
-
GNUNET_TESTBED_test_run (testname,
cfgfile,
num_peers,
0LL, NULL, NULL,
- &cadet_test_run, ctx);
+ &cadet_test_run,
+ ctx);
}
/* end of cadet_test_lib.c */
diff --git a/src/cadet/test_cadet_flow.c b/src/cadet/test_cadet_flow.c
new file mode 100644
index 000000000..554ee1d85
--- /dev/null
+++ b/src/cadet/test_cadet_flow.c
@@ -0,0 +1,888 @@
+/*
+ This file is part of GNUnet.
+ Copyright (C) 2011, 2017 GNUnet e.V.
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
+*/
+/**
+ * @file cadet/test_cadet_flow.c
+ * @author Bart Polot
+ * @author Christian Grothoff
+ * @brief Test for flow control of CADET service
+ */
+#include <stdio.h>
+#include "platform.h"
+#include "cadet_test_lib.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_statistics_service.h"
+#include <gauger.h>
+
+
+/**
+ * Ugly workaround to unify data handlers on incoming and outgoing channels.
+ */
+struct CadetTestChannelWrapper
+{
+ /**
+ * Channel pointer.
+ */
+ struct GNUNET_CADET_Channel *ch;
+};
+
+/**
+ * How many messages to send by default.
+ */
+#define TOTAL_PACKETS_DEFAULT 500
+
+/**
+ * How long until we give up on connecting the peers?
+ */
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
+
+/**
+ * Time to wait by default for stuff that should be rather fast.
+ */
+#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
+
+/**
+ * How fast do we send messages?
+ */
+#define SEND_INTERVAL GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_MILLISECONDS, 10)
+
+
+/**
+ * How many packets to send.
+ */
+static unsigned int total_packets = TOTAL_PACKETS_DEFAULT;
+
+/**
+ * Time to wait for fast operations.
+ */
+static struct GNUNET_TIME_Relative short_time;
+
+/**
+ * Size of each test packet's payload
+ */
+static size_t size_payload = sizeof (uint32_t);
+
+/**
+ * Operation to get peer ids.
+ */
+static struct GNUNET_TESTBED_Operation *t_op[2];
+
+/**
+ * Peer ids.
+ */
+static struct GNUNET_PeerIdentity *p_id[2];
+
+/**
+ * Port ID
+ */
+static struct GNUNET_HashCode port;
+
+/**
+ * Peer ids counter.
+ */
+static unsigned int p_ids;
+
+/**
+ * Is the setup initialized?
+ */
+static int initialized;
+
+/**
+ * Number of payload packes sent.
+ */
+static int data_sent;
+
+/**
+ * Number of payload packets received.
+ */
+static int data_received;
+
+/**
+ * Number of payload packed acknowledgements sent.
+ */
+static int ack_sent;
+
+/**
+ * Number of payload packed explicitly (app level) acknowledged.
+ */
+static int ack_received;
+
+/**
+ * Total number of peers asked to run.
+ */
+static unsigned int peers_requested = 2;
+
+/**
+ * Number of currently running peers (should be same as @c peers_requested).
+ */
+static unsigned int peers_running;
+
+/**
+ * Test context (to shut down).
+ */
+struct GNUNET_CADET_TEST_Context *test_ctx;
+
+/**
+ * Task called to disconnect peers.
+ */
+static struct GNUNET_SCHEDULER_Task *disconnect_task;
+
+/**
+ * Task To perform tests
+ */
+static struct GNUNET_SCHEDULER_Task *test_task;
+
+/**
+ * Task runnining #send_next_msg().
+ */
+static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
+
+/**
+ * Cadet handle for the root peer
+ */
+static struct GNUNET_CADET_Handle *h1;
+
+/**
+ * Cadet handle for the first leaf peer
+ */
+static struct GNUNET_CADET_Handle *h2;
+
+/**
+ * Channel handle for the root peer
+ */
+static struct GNUNET_CADET_Channel *outgoing_ch;
+
+/**
+ * Channel handle for the dest peer
+ */
+static struct GNUNET_CADET_Channel *incoming_ch;
+
+/**
+ * Time we started the data transmission (after channel has been established
+ * and initilized).
+ */
+static struct GNUNET_TIME_Absolute start_time;
+
+/**
+ * Peers handle.
+ */
+static struct GNUNET_TESTBED_Peer **testbed_peers;
+
+/**
+ * Statistics operation handle.
+ */
+static struct GNUNET_TESTBED_Operation *stats_op;
+
+/**
+ * Keepalives sent.
+ */
+static unsigned int ka_sent;
+
+/**
+ * Keepalives received.
+ */
+static unsigned int ka_received;
+
+/**
+ * How many messages were dropped by CADET because of full buffers?
+ */
+static unsigned int msg_dropped;
+
+
+/**
+ * Show the results of the test (banwidth acheived) and log them to GAUGER
+ */
+static void
+show_end_data (void)
+{
+ static struct GNUNET_TIME_Absolute end_time;
+ static struct GNUNET_TIME_Relative total_time;
+
+ end_time = GNUNET_TIME_absolute_get ();
+ total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
+ FPRINTF (stderr,
+ "\nResults of test \"%s\"\n",
+ test_name);
+ FPRINTF (stderr,
+ "Test time %s\n",
+ GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
+ FPRINTF (stderr,
+ "Test bandwidth: %f kb/s\n",
+ 4 * total_packets * 1.0 / (total_time.rel_value_us / 1000)); //
4bytes * ms
+ FPRINTF (stderr,
+ "Test throughput: %f packets/s\n\n",
+ total_packets * 1000.0 / (total_time.rel_value_us / 1000)); //
packets * ms
+ GAUGER ("CADET",
+ test_name,
+ total_packets * 1000.0 / (total_time.rel_value_us / 1000),
+ "packets/s");
+}
+
+
+/**
+ * Shut down peergroup, clean up.
+ *
+ * @param cls Closure (unused).
+ * @param tc Task Context.
+ */
+static void
+shutdown_task (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ending test.\n");
+ if (NULL != send_next_msg_task)
+ {
+ GNUNET_SCHEDULER_cancel (send_next_msg_task);
+ send_next_msg_task = NULL;
+ }
+ if (NULL != test_task)
+ {
+ GNUNET_SCHEDULER_cancel (test_task);
+ test_task = NULL;
+ }
+ for (unsigned int i = 0; i < 2; i++)
+ GNUNET_TESTBED_operation_done (t_op[i]);
+ if (NULL != outgoing_ch)
+ {
+ GNUNET_CADET_channel_destroy (outgoing_ch);
+ outgoing_ch = NULL;
+ }
+ if (NULL != incoming_ch)
+ {
+ GNUNET_CADET_channel_destroy (incoming_ch);
+ incoming_ch = NULL;
+ }
+ GNUNET_CADET_TEST_cleanup (test_ctx);
+}
+
+
+/**
+ * Stats callback. Finish the stats testbed operation and when all stats have
+ * been iterated, shutdown the test.
+ *
+ * @param cls Closure (line number from which termination was requested).
+ * @param op the operation that has been finished
+ * @param emsg error message in case the operation has failed; will be NULL if
+ * operation has executed successfully.
+ */
+static void
+stats_cont (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ const char *emsg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "KA sent: %u, KA received: %u\n",
+ ka_sent,
+ ka_received);
+ if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
+ {
+ GNUNET_break (0);
+ ok--;
+ }
+ GNUNET_TESTBED_operation_done (stats_op);
+
+ if (NULL != disconnect_task)
+ GNUNET_SCHEDULER_cancel (disconnect_task);
+ disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+ cls);
+}
+
+
+/**
+ * Process statistic values.
+ *
+ * @param cls closure (line number, unused)
+ * @param peer the peer the statistic belong to
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if
not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+static int
+stats_iterator (void *cls,
+ const struct GNUNET_TESTBED_Peer *peer,
+ const char *subsystem,
+ const char *name,
+ uint64_t value,
+ int is_persistent)
+{
+ static const char *s_sent = "# keepalives sent";
+ static const char *s_recv = "# keepalives received";
+ static const char *rdrops = "# messages dropped due to full buffer";
+ static const char *cdrops = "# messages dropped due to slow client";
+ uint32_t i;
+
+ i = GNUNET_TESTBED_get_index (peer);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
+ subsystem, name, (unsigned long long) value);
+ if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
+ ka_sent = value;
+ if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
+ ka_received = value;
+ if (0 == strncmp (rdrops, name, strlen (rdrops)))
+ msg_dropped += value;
+ if (0 == strncmp (cdrops, name, strlen (cdrops)))
+ msg_dropped += value;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Task to gather all statistics.
+ *
+ * @param cls Closure (line from which the task was scheduled).
+ */
+static void
+gather_stats_and_exit (void *cls)
+{
+ long l = (long) cls;
+
+ disconnect_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "gathering statistics from line %ld\n",
+ l);
+ if (NULL != outgoing_ch)
+ {
+ GNUNET_CADET_channel_destroy (outgoing_ch);
+ outgoing_ch = NULL;
+ }
+ stats_op = GNUNET_TESTBED_get_statistics (peers_running,
+ testbed_peers,
+ "cadet",
+ NULL,
+ &stats_iterator,
+ stats_cont,
+ cls);
+}
+
+
+/**
+ * Abort test: schedule disconnect and shutdown immediately
+ *
+ * @param line Line in the code the abort is requested from (__LINE__).
+ */
+static void
+abort_test (long line)
+{
+ if (NULL != disconnect_task)
+ {
+ GNUNET_SCHEDULER_cancel (disconnect_task);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Aborting test from %ld\n",
+ line);
+ disconnect_task =
+ GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+ (void *) line);
+ }
+}
+
+
+/**
+ * Send a message on the channel with the appropriate size and payload.
+ *
+ * Update the appropriate *_sent counter.
+ *
+ * @param channel Channel to send the message on.
+ */
+static void
+send_test_message (struct GNUNET_CADET_Channel *channel)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *msg;
+ uint32_t *data;
+ int payload;
+ int size;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending test message on channel %p\n",
+ channel);
+ size = size_payload;
+ if (GNUNET_NO == initialized)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
+ size += 1000;
+ payload = data_sent;
+ if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
+ data_sent++;
+ }
+ else if (SPEED == test || SPEED_ACK == test)
+ {
+ if (get_target_channel() == channel)
+ {
+ payload = ack_sent;
+ size += ack_sent;
+ ack_sent++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending ACK %u [%d bytes]\n",
+ payload, size);
+ }
+ else
+ {
+ payload = data_sent;
+ size += data_sent;
+ data_sent++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending DATA %u [%d bytes]\n",
+ data_sent, size);
+ }
+ }
+ else if (FORWARD == test)
+ {
+ payload = ack_sent;
+ }
+ else if (P2P_SIGNAL == test)
+ {
+ payload = data_sent;
+ }
+ else
+ {
+ GNUNET_assert (0);
+ }
+ env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
+
+ data = (uint32_t *) &msg[1];
+ *data = htonl (payload);
+ GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
+}
+
+
+/**
+ * Task to request a new data transmission in a SPEED test, without waiting
+ * for previous messages to be sent/arrrive.
+ *
+ * @param cls Closure (unused).
+ */
+static void
+send_next_msg (void *cls)
+{
+ struct GNUNET_CADET_Channel *channel;
+
+ send_next_msg_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending next message: %d\n",
+ data_sent);
+
+ channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
+ GNUNET_assert (NULL != channel);
+ GNUNET_assert (SPEED == test);
+ send_test_message (channel);
+ if (data_sent < total_packets)
+ {
+ /* SPEED test: Send all messages as soon as possible */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Scheduling message %d\n",
+ data_sent + 1);
+ send_next_msg_task =
+ GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
+ &send_next_msg,
+ NULL);
+ }
+}
+
+
+/**
+ * Check if payload is sane (size contains payload).
+ *
+ * @param cls should match #ch
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ * #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+check_data (void *cls,
+ const struct GNUNET_MessageHeader *message)
+{
+ return GNUNET_OK; /* all is well-formed */
+}
+
+
+/**
+ * Function is called whenever a message is received.
+ *
+ * @param cls closure (set from GNUNET_CADET_connect(), peer number)
+ * @param message the actual message
+ */
+static void
+handle_data (void *cls,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct CadetTestChannelWrapper *ch = cls;
+ struct GNUNET_CADET_Channel *channel = ch->ch;
+ uint32_t *data;
+ uint32_t payload;
+ int *counter;
+
+ GNUNET_CADET_receive_done (channel);
+ counter = get_target_channel () == channel ? &data_received : &ack_received;
+ if (channel == outgoing_ch)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Root client got a message.\n");
+ }
+ else if (channel == incoming_ch)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Leaf client got a message.\n");
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Unknown channel %p.\n",
+ channel);
+ GNUNET_assert (0);
+ }
+
+ data = (uint32_t *) &message[1];
+ payload = ntohl (*data);
+ if (payload == *counter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Payload as expected: %u\n",
+ payload);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Received payload %u, expected: %u\n",
+ payload, *counter);
+ }
+ (*counter)++;
+ if (get_target_channel () == channel) /* Got "data" */
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ " received data %u\n",
+ data_received);
+ if (data_received < total_packets)
+ return;
+ }
+ else /* Got "ack" */
+ {
+ if (SPEED_ACK == test || SPEED == test)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
+ /* Send more data */
+ send_test_message (channel);
+ if (ack_received < total_packets && SPEED != test)
+ return;
+ if (ok == 2 && SPEED == test)
+ return;
+ show_end_data ();
+ }
+ if (test == P2P_SIGNAL)
+ {
+ GNUNET_CADET_channel_destroy (incoming_ch);
+ incoming_ch = NULL;
+ }
+ else
+ {
+ GNUNET_CADET_channel_destroy (outgoing_ch);
+ outgoing_ch = NULL;
+ }
+ }
+}
+
+
+/**
+ * Method called whenever a peer connects to a port in MQ-based CADET.
+ *
+ * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
+ * @param channel New handle to the channel.
+ * @param source Peer that started this channel.
+ * @return Closure for the incoming @a channel. It's given to:
+ * - The #GNUNET_CADET_DisconnectEventHandler (given to
+ * #GNUNET_CADET_open_port) when the channel dies.
+ * - Each the #GNUNET_MQ_MessageCallback handlers for each message
+ * received on the @a channel.
+ */
+static void *
+connect_handler (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ const struct GNUNET_PeerIdentity *source)
+{
+ struct CadetTestChannelWrapper *ch;
+ long peer = (long) cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Incoming channel from %s to %ld: %p\n",
+ GNUNET_i2s (source),
+ peer,
+ channel);
+ if (peer == peers_requested - 1)
+ {
+ if (NULL != incoming_ch)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Duplicate incoming channel for client %lu\n",
+ (long) cls);
+ GNUNET_assert (0);
+ }
+ incoming_ch = channel;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Incoming channel for unexpected peer #%lu\n",
+ (long) cls);
+ GNUNET_assert (0);
+ }
+ ch = GNUNET_new (struct CadetTestChannelWrapper);
+ ch->ch = channel;
+
+ return ch;
+}
+
+
+/**
+ * Function called whenever an MQ-channel is destroyed, even if the destruction
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
+ *
+ * It should clean up any associated state, including cancelling any pending
+ * transmission on this channel.
+ *
+ * @param cls Channel closure (channel wrapper).
+ * @param channel Connection to the other end (henceforth invalid).
+ */
+static void
+disconnect_handler (void *cls,
+ const struct GNUNET_CADET_Channel *channel)
+{
+ struct CadetTestChannelWrapper *ch_w = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Channel disconnected at %d\n",
+ ok);
+ GNUNET_assert (ch_w->ch == channel);
+ if (channel == incoming_ch)
+ incoming_ch = NULL;
+ else if (outgoing_ch == channel)
+ outgoing_ch = NULL;
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Disconnect on unknown channel %p\n",
+ channel);
+ if (NULL != disconnect_task)
+ GNUNET_SCHEDULER_cancel (disconnect_task);
+ disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
+ (void *) __LINE__);
+ GNUNET_free (ch_w);
+}
+
+
+/**
+ * Start the testcase, we know the peers and have handles to CADET.
+ *
+ * Testcase continues when the root receives confirmation of connected peers,
+ * on callback function ch.
+ *
+ * @param cls Closure (unused).
+ */
+static void
+start_test (void *cls)
+{
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (data,
+ GNUNET_MESSAGE_TYPE_DUMMY,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_handler_end ()
+ };
+ struct CadetTestChannelWrapper *ch;
+ enum GNUNET_CADET_ChannelOption flags;
+
+ test_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "In start_test\n");
+ start_time = GNUNET_TIME_absolute_get ();
+ ch = GNUNET_new (struct CadetTestChannelWrapper);
+ outgoing_ch = GNUNET_CADET_channel_create (h1,
+ ch,
+ p_id[1],
+ &port,
+ flags,
+ NULL,
+ &disconnect_handler,
+ handlers);
+ ch->ch = outgoing_ch;
+ GNUNET_assert (NULL == disconnect_task);
+ disconnect_task
+ = GNUNET_SCHEDULER_add_delayed (short_time,
+ &gather_stats_and_exit,
+ (void *) __LINE__);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending data initializer on channel %p...\n",
+ outgoing_ch);
+ send_test_message (outgoing_ch);
+}
+
+
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cls the closure from GNUNET_TESTBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed;
+ * NULL if the operation is successfull
+ */
+static void
+pi_cb (void *cls,
+ struct GNUNET_TESTBED_Operation *op,
+ const struct GNUNET_TESTBED_PeerInformation *pinfo,
+ const char *emsg)
+{
+ long i = (long) cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "ID callback for %ld\n",
+ i);
+ if ( (NULL == pinfo) ||
+ (NULL != emsg) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "pi_cb: %s\n",
+ emsg);
+ abort_test (__LINE__);
+ return;
+ }
+ p_id[i] = pinfo->result.id;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "id: %s\n",
+ GNUNET_i2s (p_id[i]));
+ p_ids++;
+ if (p_ids < 2)
+ return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Got all IDs, starting test\n");
+ test_task = GNUNET_SCHEDULER_add_now (&start_test,
+ NULL);
+}
+
+
+/**
+ * test main: start test when all peers are connected
+ *
+ * @param cls Closure.
+ * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
+ * @param num_peers Number of peers that are running.
+ * @param peers Array of peers.
+ * @param cadets Handle to each of the CADETs of the peers.
+ */
+static void
+tmain (void *cls,
+ struct GNUNET_CADET_TEST_Context *ctx,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers,
+ struct GNUNET_CADET_Handle **cadets)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "test main\n");
+ test_ctx = ctx;
+ peers_running = num_peers;
+ GNUNET_assert (peers_running == peers_requested);
+ testbed_peers = peers;
+ h1 = cadets[0];
+ h2 = cadets[num_peers - 1];
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
+ p_ids = 0;
+ t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &pi_cb,
+ (void *) 0L);
+ t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &pi_cb,
+ (void *) 1L);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "requested peer ids\n");
+}
+
+
+/**
+ * Main: start test
+ */
+int
+main (int argc,
+ char *argv[])
+{
+ static const struct GNUNET_HashCode *ports[2];
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (data,
+ GNUNET_MESSAGE_TYPE_DUMMY,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_handler_end ()
+ };
+ const char *config_file = "test_cadet.conf";
+ char port_id[] = "test port";
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_relative_time ('t',
+ "time",
+ "short_time",
+ gettext_noop ("set short timeout"),
+ &short_time),
+ GNUNET_GETOPT_option_uint ('m',
+ "messages",
+ "NUM_MESSAGES",
+ gettext_noop ("set number of messages to send"),
+ &total_packets),
+ GNUNET_GETOPT_option_uint ('p',
+ "peers",
+ "NUM_PEERS",
+ gettext_noop ("number of peers to launch"),
+ &peers_requested),
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ GNUNET_log_setup ("test-cadet-flow",
+ "DEBUG",
+ NULL);
+ total_packets = TOTAL_PACKETS;
+ short_time = SHORT_TIME;
+ if (-1 == GNUNET_GETOPT_run (argv[0],
+ options,
+ argc,
+ argv))
+ {
+ FPRINTF (stderr,
+ "test failed: problem with CLI parameters\n");
+ return 1;
+ }
+ GNUNET_CRYPTO_hash (port_id,
+ sizeof (port_id),
+ &port);
+ ports[0] = &port;
+ ports[1] = NULL;
+ GNUNET_CADET_TEST_ruN ("test_cadet_flow",
+ config_file,
+ peers_requested,
+ &tmain,
+ NULL, /* tmain cls */
+ &connect_handler,
+ NULL,
+ &disconnect_handler,
+ handlers,
+ ports);
+ return 0;
+}
+
+/* end of test_cadet_flow.c */
diff --git a/src/cadet/test_cadet_local_mq.c b/src/cadet/test_cadet_local_mq.c
index eab35c217..2ea754743 100644
--- a/src/cadet/test_cadet_local_mq.c
+++ b/src/cadet/test_cadet_local_mq.c
@@ -150,7 +150,8 @@ connected (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"connected %s, cls: %p\n",
- GNUNET_i2s(source), cls);
+ GNUNET_i2s(source),
+ cls);
return channel;
}
@@ -208,7 +209,8 @@ handle_data_received (void *cls,
static void
message_sent (void *cls)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message sent\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "message sent\n");
}
--
To stop receiving notification emails like this one, please contact
address@hidden