[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r22063 - in gnunet/src: include stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r22063 - in gnunet/src: include stream |
Date: |
Sun, 17 Jun 2012 19:56:45 +0200 |
Author: harsha
Date: 2012-06-17 19:56:45 +0200 (Sun, 17 Jun 2012)
New Revision: 22063
Modified:
gnunet/src/include/gnunet_stream_lib.h
gnunet/src/stream/Makefile.am
gnunet/src/stream/stream_api.c
gnunet/src/stream/test_stream_2peers.c
gnunet/src/stream/test_stream_2peers_halfclose.c
gnunet/src/stream/test_stream_big.c
gnunet/src/stream/test_stream_local.c
gnunet/src/stream/test_stream_local.conf
gnunet/src/stream/test_stream_sequence_wraparound.c
Log:
-stream uses locks from lockmanager
Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h 2012-06-17 14:47:11 UTC (rev
22062)
+++ gnunet/src/include/gnunet_stream_lib.h 2012-06-17 17:56:45 UTC (rev
22063)
@@ -192,9 +192,9 @@
* Functions of this type are called upon new stream connection from other
peers
*
* @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream
+ * @param socket the socket representing the stream; NULL on binding error
* @param initiator the identity of the peer who wants to establish a stream
- * with us
+ * with us; NULL on binding error
* @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
* stream (the socket will be invalid after the call)
*/
@@ -213,7 +213,12 @@
* Listens for stream connections for a specific application ports
*
* @param cfg the configuration to use
- * @param app_port the application port for which new streams will be accepted
+ *
+ * @param app_port the application port for which new streams will be
+ * accepted. If another stream is already listening on the same port,
+ * listen_cb will be called (with a NULL socket) to signal the binding
+ * error, and the returned ListenSocket will be invalidated.
+ *
* @param listen_cb this function will be called when a peer tries to establish
* a stream with us
* @param listen_cb_cls closure for listen_cb
Modified: gnunet/src/stream/Makefile.am
===================================================================
--- gnunet/src/stream/Makefile.am 2012-06-17 14:47:11 UTC (rev 22062)
+++ gnunet/src/stream/Makefile.am 2012-06-17 17:56:45 UTC (rev 22063)
@@ -15,6 +15,7 @@
stream_api.c stream_protocol.h
libgnunetstream_la_LIBADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
$(top_builddir)/src/util/libgnunetutil.la $(XLIB)
libgnunetstream_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-06-17 14:47:11 UTC (rev 22062)
+++ gnunet/src/stream/stream_api.c 2012-06-17 17:56:45 UTC (rev 22063)
@@ -38,12 +38,16 @@
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
+#include "gnunet_lockmanager_service.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
#define LOG(kind,...) \
GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* The maximum packet size of a stream packet
*/
@@ -370,6 +374,21 @@
struct GNUNET_MESH_Handle *mesh;
/**
+ * Our configuration
+ */
+ struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Handle to the lock manager service
+ */
+ struct GNUNET_LOCKMANAGER_Handle *lockmanager;
+
+ /**
+ * The active LockingRequest from lockmanager
+ */
+ struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
+
+ /**
* The callback function which is called after successful opening socket
*/
GNUNET_STREAM_ListenCallback listen_cb;
@@ -381,9 +400,13 @@
/**
* The service port
- * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
+
+ /**
+ * The id of the lockmanager timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
/**
* The retransmit timeout
@@ -391,6 +414,11 @@
struct GNUNET_TIME_Relative retransmit_timeout;
/**
+ * Listen enabled?
+ */
+ int listening;
+
+ /**
* Whether testing mode is active or not
*/
int testing_active;
@@ -492,9 +520,14 @@
/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 10;
+static const unsigned int default_timeout = 10;
+/**
+ * The domain name for locks we use here
+ */
+static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
+
/**
* Callback function for sending queued message
*
@@ -2690,6 +2723,15 @@
/* FIXME: If a tunnel is already created, we should not accept new tunnels
from the same peer again until the socket is closed */
+ if (GNUNET_NO == lsocket->listening)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Destroying tunnel from peer %s as we don't have the lock\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_MESH_tunnel_destroy (tunnel);
+ return NULL;
+ }
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *initiator;
socket->tunnel = tunnel;
@@ -2771,6 +2813,71 @@
}
+/**
+ * Callback to signal timeout on lockmanager lock acquire
+ *
+ * @param cls the ListenSocket
+ * @param tc the scheduler task context
+ */
+static void
+lockmanager_acquire_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ GNUNET_STREAM_ListenCallback listen_cb;
+ void *listen_cb_cls;
+
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ listen_cb = lsocket->listen_cb;
+ listen_cb_cls = lsocket->listen_cb_cls;
+ GNUNET_STREAM_listen_close (lsocket);
+ if (NULL != listen_cb)
+ listen_cb (listen_cb_cls, NULL, NULL);
+}
+
+
+/**
+ * Callback to notify us of status changes of the app_port lock
+ *
+ * @param cls the ListenSocket
+ * @param domain_name the domain name of the lock
+ * @param lock the app_port
+ * @param status the current status of the lock
+ */
+static void
+lock_status_change_cb (void *cls, const char *domain_name, uint32_t lock,
+ enum GNUNET_LOCKMANAGER_Status status)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+
+ GNUNET_assert (lock == (uint32_t) lsocket->port);
+ if (GNUNET_LOCKMANAGER_SUCCESS == status)
+ {
+ lsocket->listening = GNUNET_YES;
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL == lsocket->mesh)
+ {
+ GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
+
+ lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
+ RECEIVE_BUFFER_SIZE, /* FIXME:
QUEUE size as parameter? */
+ lsocket, /* Closure */
+ &new_tunnel_notify,
+ &tunnel_cleaner,
+ server_message_handlers,
+ ports);
+ GNUNET_assert (NULL != lsocket->mesh);
+ }
+ }
+ if (GNUNET_LOCKMANAGER_RELEASE == status)
+ lsocket->listening = GNUNET_NO;
+}
+
+
/*****************/
/* API functions */
/*****************/
@@ -3070,11 +3177,19 @@
{
/* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
- GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
enum GNUNET_STREAM_Option option;
va_list vargs;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
+ lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
+ lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
+ if (NULL == lsocket->lockmanager)
+ {
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ GNUNET_free (lsocket);
+ return NULL;
+ }
+ lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port
*/
/* Set defaults */
lsocket->retransmit_timeout =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
@@ -3101,14 +3216,13 @@
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
- lsocket->mesh = GNUNET_MESH_connect (cfg,
- RECEIVE_BUFFER_SIZE, /* FIXME: QUEUE
size as parameter? */
- lsocket, /* Closure */
- &new_tunnel_notify,
- &tunnel_cleaner,
- server_message_handlers,
- ports);
- GNUNET_assert (NULL != lsocket->mesh);
+ lsocket->locking_request =
+ GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
+ (uint32_t) lsocket->port,
+ &lock_status_change_cb, lsocket);
+ lsocket->lockmanager_acquire_timeout_task =
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS (20),
+ &lockmanager_acquire_timeout, lsocket);
return lsocket;
}
@@ -3122,9 +3236,15 @@
GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
{
/* Close MESH connection */
- GNUNET_assert (NULL != lsocket->mesh);
- GNUNET_MESH_disconnect (lsocket->mesh);
-
+ if (NULL != lsocket->mesh)
+ GNUNET_MESH_disconnect (lsocket->mesh);
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ if (NULL != lsocket->locking_request)
+ GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
+ if (NULL != lsocket->lockmanager)
+ GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
GNUNET_free (lsocket);
}
Modified: gnunet/src/stream/test_stream_2peers.c
===================================================================
--- gnunet/src/stream/test_stream_2peers.c 2012-06-17 14:47:11 UTC (rev
22062)
+++ gnunet/src/stream/test_stream_2peers.c 2012-06-17 17:56:45 UTC (rev
22063)
@@ -39,6 +39,9 @@
*/
#define NUM_PEERS 2
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
@@ -433,6 +436,26 @@
/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls unused; this task is scheduled with a NULL closure
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer?
*/
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
* Callback to be called when testing peer group is ready
*
* @param cls NULL
@@ -477,15 +500,7 @@
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer?
*/
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL);
}
Modified: gnunet/src/stream/test_stream_2peers_halfclose.c
===================================================================
--- gnunet/src/stream/test_stream_2peers_halfclose.c 2012-06-17 14:47:11 UTC
(rev 22062)
+++ gnunet/src/stream/test_stream_2peers_halfclose.c 2012-06-17 17:56:45 UTC
(rev 22063)
@@ -40,6 +40,9 @@
*/
#define NUM_PEERS 2
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
@@ -659,6 +662,26 @@
/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls unused; this task is scheduled with a NULL closure
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer?
*/
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
* Callback to be called when testing peer group is ready
*
* @param cls NULL
@@ -702,15 +725,7 @@
&stream_listen_cb,
NULL);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer?
*/
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, NULL);
}
Modified: gnunet/src/stream/test_stream_big.c
===================================================================
--- gnunet/src/stream/test_stream_big.c 2012-06-17 14:47:11 UTC (rev 22062)
+++ gnunet/src/stream/test_stream_big.c 2012-06-17 17:56:45 UTC (rev 22063)
@@ -34,6 +34,10 @@
#define LOG(kind, ...) \
GNUNET_log (kind, __VA_ARGS__);
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
/**
* Structure for holding peer's sockets and IO Handles
*/
@@ -327,6 +331,32 @@
/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+ /* Connect to stream */
+ peer->socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer->socket);
+}
+
+
+/**
* Testing function
*
* @param cls NULL
@@ -348,15 +378,7 @@
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- (void *) &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
@@ -381,8 +403,7 @@
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL);
}
/**
Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c 2012-06-17 14:47:11 UTC (rev
22062)
+++ gnunet/src/stream/test_stream_local.c 2012-06-17 17:56:45 UTC (rev
22063)
@@ -34,6 +34,10 @@
#define VERBOSE 1
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
/**
* Structure for holding peer's sockets and IO Handles
*/
@@ -355,6 +359,32 @@
/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
+ &self));
+ /* Connect to stream library */
+ peer->socket = GNUNET_STREAM_open (config_peer1,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
* Testing function
*
* @param cls NULL
@@ -376,15 +406,7 @@
&peer2,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config_peer1,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
/**
@@ -417,7 +439,7 @@
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &test, NULL);
}
/**
Modified: gnunet/src/stream/test_stream_local.conf
===================================================================
--- gnunet/src/stream/test_stream_local.conf 2012-06-17 14:47:11 UTC (rev
22062)
+++ gnunet/src/stream/test_stream_local.conf 2012-06-17 17:56:45 UTC (rev
22063)
@@ -1,3 +1,8 @@
+[lockmanager]
+AUTOSTART = NO
+ACCEPT_FROM = 127.0.0.1;
+HOSTNAME = localhost
+
[fs]
AUTOSTART = NO
@@ -44,7 +49,7 @@
PORT = 12092
[arm]
-DEFAULTSERVICES = core
+DEFAULTSERVICES = core lockmanager
PORT = 12366
DEBUG = NO
@@ -74,4 +79,4 @@
AUTOSTART = NO
[nse]
-AUTOSTART = NO
+AUTOSTART = NO
\ No newline at end of file
Modified: gnunet/src/stream/test_stream_sequence_wraparound.c
===================================================================
--- gnunet/src/stream/test_stream_sequence_wraparound.c 2012-06-17 14:47:11 UTC
(rev 22062)
+++ gnunet/src/stream/test_stream_sequence_wraparound.c 2012-06-17 17:56:45 UTC
(rev 22063)
@@ -34,6 +34,9 @@
#define LOG(kind, ...) \
GNUNET_log (kind, __VA_ARGS__);
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
@@ -327,6 +330,35 @@
/**
+ * Task for connecting the peer to stream as client
+ *
+ * @param cls PeerData
+ * @param tc the TaskContext
+ */
+static void
+stream_connect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+ /* Connect to stream */
+ peer->socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+
GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
+ UINT32_MAX - GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK, 64),
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer->socket);
+}
+
+
+/**
* Testing function
*
* @param cls NULL
@@ -348,18 +380,7 @@
NULL,
GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config,
- &self, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- (void *) &peer1,
-
GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
- UINT32_MAX - GNUNET_CRYPTO_random_u32
- (GNUNET_CRYPTO_QUALITY_WEAK, 64),
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(2), &stream_connect, &peer1);
}
@@ -384,8 +405,7 @@
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ test_task = GNUNET_SCHEDULER_add_delayed (TIME_REL_SECS(3), &test, NULL);
}
/**
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r22063 - in gnunet/src: include stream,
gnunet <=