[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30003 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r30003 - msh/src |
Date: |
Tue, 8 Oct 2013 19:33:28 +0200 |
Author: harsha
Date: 2013-10-08 19:33:28 +0200 (Tue, 08 Oct 2013)
New Revision: 30003
Added:
msh/src/mshd2.c
Modified:
msh/src/mshd-server.c
msh/src/mshd.c
msh/src/mshd_pmonitor.c
Log:
- fixes and new mshd2
Modified: msh/src/mshd-server.c
===================================================================
--- msh/src/mshd-server.c 2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd-server.c 2013-10-08 17:33:28 UTC (rev 30003)
@@ -118,10 +118,6 @@
*/
GNUNET_SCHEDULER_TaskIdentifier fin_task;
- /**
- * the file descriptor associated with the client connection
- */
- int conn_fd;
};
@@ -805,14 +801,12 @@
* @param conn the connection to derive the client from
*/
void
-daemon_server_add_connection (struct GNUNET_CONNECTION_Handle *conn,
- int conn_fd)
+daemon_server_add_connection (struct GNUNET_CONNECTION_Handle *conn)
{
struct ExecCtx *exec_ctx;
struct GNUNET_SERVER_Client *client;
exec_ctx = GNUNET_malloc (sizeof (struct ExecCtx));
- exec_ctx->conn_fd = conn_fd;
client = GNUNET_SERVER_connect_socket (daemon_serv, conn);
exec_ctx->client = client;
GNUNET_SERVER_client_set_user_context (client, exec_ctx);
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd.c 2013-10-08 17:33:28 UTC (rev 30003)
@@ -408,13 +408,11 @@
{
struct GNUNET_NETWORK_Handle *client_sock;
struct GNUNET_CONNECTION_Handle *client_conn;
- int client_sock_fd;
LOG_DEBUG ("Got a command execution connection\n");
client_sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
- client_sock_fd = GNUNET_NETWORK_get_fd (client_sock);
client_conn = GNUNET_CONNECTION_create_from_existing (client_sock);
- daemon_server_add_connection (client_conn, client_sock_fd);
+ daemon_server_add_connection (client_conn);
}
break;
default:
Copied: msh/src/mshd2.c (from rev 29987, msh/src/mshd.c)
===================================================================
--- msh/src/mshd2.c (rev 0)
+++ msh/src/mshd2.c 2013-10-08 17:33:28 UTC (rev 30003)
@@ -0,0 +1,1287 @@
+/**
+ * @file mshd.c
+ * @brief implementation of the MSH Daemon
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include <mpi.h>
+#include "util.h"
+#include "mtypes.h"
+#include "bitmap.h"
+#include "addressmap.h"
+
+#define LOG(kind,...) \
+ GNUNET_log (kind, __VA_ARGS__)
+
+#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
+
+#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
+
+#define LOG_STRERROR(kind,cmd) \
+ GNUNET_log_from_strerror (kind, "mshd", cmd)
+
+/**
+ * Polling interval for checking termination signal
+ */
+#define POLL_SHUTDOWN_INTERVAL \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
+
+/**
+ * Context for verifying addresses.  One context is allocated by
+ * address_iterator_cb() for every (ip, port) pair we try to connect to and is
+ * kept in the vactx_head/vactx_tail DLL until cleanup_verifyaddressctx()
+ * releases it.
+ */
+struct VerifyAddressesCtx
+{
+ /**
+ * The DLL next ptr
+ */
+ struct VerifyAddressesCtx *next;
+
+ /**
+ * The DLL prev ptr
+ */
+ struct VerifyAddressesCtx *prev;
+
+ /**
+ * The instance addresses this (ip, port) pair belongs to; not owned by the
+ * context (cleanup_verifyaddressctx() does not free it)
+ */
+ struct InstanceAddrInfo *iainfo;
+
+ /**
+ * The connection handle to the received instance address; NULL once the
+ * connection has been destroyed
+ */
+ struct GNUNET_CONNECTION_Handle *conn;
+
+ /**
+ * The transmit handle for the above connection; reset to NULL at the start
+ * of conn_write_cb()
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
+
+ /**
+ * task to close the connection (runs conn_close_task());
+ * GNUNET_SCHEDULER_NO_TASK when none is pending
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_task;
+
+ /**
+ * state for the context.  VERIFY_ADDRESS_CTX_WRITE: the next transmit
+ * callback writes our rank to the connection.  VERIFY_ADDRESS_CTX_CLOSE: the
+ * next transmit callback destroys the connection and schedules close_task.
+ */
+ enum {
+ VERIFY_ADDRESS_CTX_WRITE,
+
+ VERIFY_ADDRESS_CTX_CLOSE
+ } state;
+
+ /**
+ * the ip address; address_iterator_cb() applies htonl() before connecting,
+ * so this is kept in host byte order
+ */
+ in_addr_t ip;
+
+ /**
+ * the port number; address_iterator_cb() applies htons() before connecting,
+ * so this is kept in host byte order
+ */
+ uint16_t port;
+
+};
+
+
+/**
+ * Context information for reading from incoming connections.  Allocated by
+ * accept_task() in MODE_PROBE and released by conn_reader().
+ */
+struct ReadContext
+{
+ /**
+ * next pointer for DLL (list anchored at rhead/rtail)
+ */
+ struct ReadContext *next;
+
+ /**
+ * prev pointer for DLL (list anchored at rhead/rtail)
+ */
+ struct ReadContext *prev;
+
+ /**
+ * The connection; destroyed by conn_reader() together with this context
+ */
+ struct GNUNET_CONNECTION_Handle *conn;
+
+ /**
+ * are we waiting for a read on the above connection; set to GNUNET_YES when
+ * accept_task() queues the receive
+ */
+ int in_receive;
+};
+
+
+/**
+ * The mode of the current listen socket.  The enum definition also declares
+ * the global variable `mode' holding the current mode of this process.
+ */
+enum ListenMode
+{
+ /**
+ * Mode in which the listen socket accepts connections from other instances
+ * and closes them immediately after reading some data. The incoming
+ * connections are used to verify which IP addresses of this instance are
+ * reachable from other instances
+ */
+ MODE_PROBE,
+
+ /**
+ * In this mode the listen socket accepts requests for starting remote
+ * processes
+ */
+ MODE_SERV,
+
+ /**
+ * Simple worker mode. No listening is done.
+ */
+ MODE_WORKER,
+
+ /**
+ * Worker mode with protocol.
+ */
+ MODE_PROTOWORKER
+
+} mode;
+
+
+/**
+ * Mapping for instance addresses
+ */
+AddressMap *addrmap;
+
+/**
+ * Reverse mapping of the address map
+ */
+struct ReverseAddressMap *rmap;
+
+/**
+ * Rank of this process
+ */
+int rank;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+unsigned int rwidth;
+
+/**
+ * The number of total mshd processes
+ */
+int nproc;
+
+
+/****************************/
+/* static variables */
+/****************************/
+
+/**
+ * DLL head for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_head;
+
+/**
+ * DLL tail for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_tail;
+
+/**
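+ * Array of our IP addresses in host byte order (converted with ntohl() when
+ * discovered and with htonl() again before being sent to other instances)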
+ */
+static in_addr_t *s_addrs;
+
+/**
+ * network handle for the listen socket
+ */
+static struct GNUNET_NETWORK_Handle *listen_socket;
+
+/**
+ * The process handle of the process started by instance running with rank 0
+ */
+static struct GNUNET_OS_Process *proc;
+
+/**
+ * Task for running a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier rtask;
+
+/**
+ * Task for asynchronous accept on the socket
+ */
+static GNUNET_SCHEDULER_TaskIdentifier atask;
+
+/**
+ * Task for finalising a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
+
+/**
+ * Task for waiting for a shutdown signal
+ */
+static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
+
+/**
+ * Bitmap for checking which MPI processes have verified our addresses in the
+ * current round
+ */
+static struct BitMap *bitmap;
+
+/**
+ * Instances addresses learnt in the current round
+ */
+struct InstanceAddrInfo **riainfos;
+
+/**
+ * head for read context DLL
+ */
+static struct ReadContext *rhead;
+
+/**
+ * tail for read context DLL
+ */
+static struct ReadContext *rtail;
+
+/**
+ * arguments representing the command to run and its arguments
+ */
+static char **run_args;
+
+/**
+ * the process handle for the command to run
+ */
+static struct GNUNET_OS_Process *process;
+
+/**
+ * The path of the unix domain socket we use for communication with local MSH
clients
+ */
+static char *unixpath;
+
+/**
+ * The file where the addresses of available hosts are written to
+ */
+static char *hostsfile;
+
+/**
+ * shutdown task
+ */
+GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
+
+/**
+ * Shutdown polling task
+ */
+GNUNET_SCHEDULER_TaskIdentifier poll_shutdown_task;
+
+/**
+ * Random hashcode for authentication
+ */
+struct GNUNET_HashCode shash;
+
+/**
+ * Number of IP addresses
+ */
+static unsigned int nips;
+
+/**
+ * Current IP verification round
+ */
+static unsigned int current_round;
+
+/**
+ * Do we have to create a pty
+ */
+static int need_pty;
+
+/**
+ * The port number of our local socket
+ */
+uint16_t listen_port;
+
+
+/**
+ * Frees the given NULL terminated arguments.  Declared here because
+ * do_shutdown() uses it before its definition further below.
+ *
+ * @param argv the NULL terminated list of arguments
+ */
+static void
+free_argv (char **argv);
+
+
+/**
+ * Perform cleanup for shutdown: stops the mode specific services, cancels the
+ * pending accept task, closes the listen socket and releases the global
+ * address bookkeeping, the copied command line and the temporary files.
+ * Every released global is reset so that the function may safely be called
+ * more than once.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context (unused; may be NULL when called directly)
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+  switch (mode)
+  {
+  case MODE_PROBE:
+    break;
+  case MODE_SERV:
+    shutdown_local_server ();
+    MSH_pmonitor_shutdown ();
+    break;
+  case MODE_WORKER:
+    break;
+  case MODE_PROTOWORKER:
+    shutdown_daemon_server ();
+    break;
+  }
+  /* `atask' is the identifier of the pending accept task; accept_task is the
+     task function itself and cannot be cancelled */
+  if (GNUNET_SCHEDULER_NO_TASK != atask)
+  {
+    GNUNET_SCHEDULER_cancel (atask);
+    atask = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL != listen_socket)
+  {
+    GNUNET_NETWORK_socket_close (listen_socket);
+    listen_socket = NULL;
+  }
+  if (NULL != bitmap)
+  {
+    bitmap_destroy (bitmap);
+    bitmap = NULL;
+  }
+  if (NULL != addrmap)
+  {
+    addressmap_destroy (addrmap);
+    addrmap = NULL;
+  }
+  if (NULL != rmap)
+  {
+    reverse_map_destroy (rmap);
+    rmap = NULL;
+  }
+  GNUNET_free_non_null (s_addrs);
+  s_addrs = NULL;
+  if (NULL != run_args)
+  {
+    free_argv (run_args);
+    run_args = NULL;
+  }
+  GNUNET_free_non_null (unixpath);
+  unixpath = NULL;
+  if (NULL != hostsfile)
+  {
+    /* best effort: the hosts file is temporary, ignore unlink failures */
+    (void) unlink (hostsfile);
+    GNUNET_free (hostsfile);
+    hostsfile = NULL;
+  }
+}
+
+
+/**
+ * Callback function invoked for each interface found.  Every non-loopback
+ * IPv4 address is appended to s_addrs (in host byte order) and registered in
+ * the address map for our own rank together with the listen port.
+ *
+ * @param cls closure
+ * @param name name of the interface (can be NULL for unknown)
+ * @param isDefault is this presumably the default interface
+ * @param addr address of this interface (can be NULL for unknown or
+ *          unassigned)
+ * @param broadcast_addr the broadcast address (can be NULL for unknown or
+ *          unassigned)
+ * @param netmask the network mask (can be NULL for unknown or unassigned)
+ * @param addrlen length of the address
+ * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort; this
+ *           implementation always returns GNUNET_OK
+ */
+static int
+net_if_processor (void *cls, const char *name,
+                  int isDefault,
+                  const struct sockaddr *addr,
+                  const struct sockaddr *broadcast_addr,
+                  const struct sockaddr *netmask,
+                  socklen_t addrlen)
+{
+  in_addr_t ip;
+  const struct sockaddr_in *inaddr;
+
+  if (NULL == addr)
+    return GNUNET_OK;           /* interface without an address */
+  if (sizeof (struct sockaddr_in) != addrlen)
+    return GNUNET_OK;           /* Only consider IPv4 for now */
+  inaddr = (const struct sockaddr_in *) addr;
+  ip = ntohl (inaddr->sin_addr.s_addr);
+  if (127 == ip >> 24)          /* ignore loopback addresses */
+    return GNUNET_OK;
+  GNUNET_array_append (s_addrs, nips, ip);
+  LOG_DEBUG ("%d: Found IP: %s\n", rank, ip2str (ip));
+  addressmap_add (addrmap, rank, listen_port, ip);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Callback function for data received from the network. Note that
+ * both "available" and "err" would be 0 if the read simply timed out.
+ *
+ * The peer is expected to send its rank as a 32-bit integer in network byte
+ * order.  The value is only logged; afterwards the connection and its read
+ * context are released in every case.
+ *
+ * @param cls the read context
+ * @param buf pointer to received data
+ * @param available number of bytes available in "buf",
+ *        possibly 0 (on errors)
+ * @param addr address of the sender
+ * @param addrlen size of addr
+ * @param errCode value of errno (on receiving errors)
+ */
+static void
+conn_reader (void *cls, const void *buf, size_t available,
+             const struct sockaddr *addr, socklen_t addrlen, int errCode)
+{
+  struct ReadContext *rc = cls;
+  uint32_t cid;
+
+  /* never read more than we were given: a short read must not be decoded */
+  if ((NULL == buf) || (available < sizeof (uint32_t)))
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  (void) memcpy (&cid, buf, sizeof (uint32_t));
+  cid = ntohl (cid);
+  LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
+
+ clo_ret:
+  GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
+  GNUNET_CONNECTION_destroy (rc->conn);
+  GNUNET_free (rc);
+}
+
+
+/**
+ * Fork a worker process. This process sets up a PTY if needed, forks a child
+ * which exec's the binary to start and manages the communication between the
+ * binary and network if given a network connection.
+ *
+ * In the parent this function only forks and returns.  In the child the
+ * daemon state inherited from the parent is torn down with do_shutdown() and
+ * the global mode is switched to MODE_WORKER or MODE_PROTOWORKER.
+ *
+ * @param do_protocol if non-zero the child accepts one connection from the
+ *          listen socket (before the socket is closed by do_shutdown()) and
+ *          hands it to the daemon server; must only be used in MODE_SERV
+ * @return in the parent: the result of fork(), i.e. the child's pid or -1 on
+ *           failure; in the child: 0
+ */
+static pid_t
+spawn_worker (int do_protocol)
+{
+ struct GNUNET_NETWORK_Handle *sock;
+ struct GNUNET_CONNECTION_Handle *conn;
+ pid_t ret;
+
+ ret = fork ();
+ if (0 != ret)
+ return ret;
+ /* Child process continues here */
+ if (do_protocol)
+ {
+ GNUNET_assert (MODE_SERV == mode);
+ GNUNET_assert (NULL != listen_socket);
+ sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL); /* NOTE(review): result is not checked for NULL */
+ conn = GNUNET_CONNECTION_create_from_existing (sock);
+ }
+ GNUNET_SCHEDULER_cancel (shutdown_task); /* do_shutdown() is invoked directly below instead */
+ shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+ do_shutdown (NULL, NULL);
+ mode = MODE_WORKER;
+ if (do_protocol)
+ {
+ mode = MODE_PROTOWORKER;
+ init_daemon_server ();
+ daemon_server_add_connection (conn);
+ }
+ return 0;
+}
+
+
+/**
+ * Task run when the listen socket becomes readable (or upon shutdown).
+ *
+ * In MODE_PROBE the incoming connection is accepted and a read for the
+ * peer's rank is queued; conn_reader() closes the connection afterwards.  In
+ * MODE_SERV a worker process is forked which accepts the pending connection
+ * and serves it.  Unless an error occurs the task re-schedules itself.
+ *
+ * @param cls NULL
+ * @param tc the scheduler task context
+ */
+static void
+accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct ReadContext *rctx;
+  struct GNUNET_CONNECTION_Handle *conn;
+  pid_t pid;
+
+  atask = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    goto clo_ret;
+  switch (mode)
+  {
+  case MODE_PROBE:
+    LOG_DEBUG ("%d: Got a probe connect\n", rank);
+    conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
+    if (NULL == conn)
+    {
+      GNUNET_break (0);
+      goto clo_ret;
+    }
+    rctx = GNUNET_malloc (sizeof (struct ReadContext));
+    rctx->conn = conn;
+    rctx->in_receive = GNUNET_YES;
+    GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
+                               GNUNET_TIME_UNIT_FOREVER_REL, conn_reader,
+                               rctx);
+    GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
+    break;
+  case MODE_SERV:
+    /* the worker has to speak the protocol: it accepts the connection that
+       made the listen socket readable.  NOTE(review): the parent re-arms the
+       read task before the child has accepted -- confirm this cannot spawn
+       superfluous workers */
+    pid = spawn_worker (GNUNET_YES);
+    if (-1 == pid)
+    {
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_shutdown ();
+      goto clo_ret;
+    }
+    if (0 == pid)               /* state is cleared and hence we return */
+      return;
+    break;
+  case MODE_WORKER:
+  case MODE_PROTOWORKER:
+    GNUNET_assert (0);
+  }
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+  return;
+
+ clo_ret:
+  /* the socket may already have been closed by do_shutdown() */
+  if (NULL != listen_socket)
+  {
+    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+    listen_socket = NULL;
+  }
+}
+
+
+/**
+ * Task to check if we received a shutdown signal through MPI message from
+ * instance 0.  The task re-schedules itself every POLL_SHUTDOWN_INTERVAL
+ * until a message with tag MSH_MTYPE_SHUTDOWN is pending or the scheduler is
+ * shutting down.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+poll_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int flag;
+
+  poll_shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+    return;
+  flag = 0;
+  if (MPI_SUCCESS != MPI_Iprobe (0, MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD, &flag,
+                                 MPI_STATUS_IGNORE))
+  {
+    GNUNET_break (0);
+    flag = 0;                   /* treat a failed probe as "nothing pending" */
+  }
+  if (0 != flag)
+  {
+    LOG_DEBUG ("Got termination signal. Shutting down\n");
+    GNUNET_SCHEDULER_shutdown ();       /* We terminate */
+    return;
+  }
+  poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+                                                     &poll_shutdown, NULL);
+}
+
+
+/**
+ * Sends termination signal to all other instances through MPI messaging.
+ * One message with tag MSH_MTYPE_SHUTDOWN is sent to every rank from 1 to
+ * nproc - 1; the function returns once all sends have completed.
+ */
+static void
+send_term_signal ()
+{
+  MPI_Request *req;
+  int payload;
+  int cnt;
+
+  if (nproc < 2)
+    return;                     /* nobody to notify */
+  /* The send buffer must stay untouched until the requests complete, so a
+     dedicated variable is used instead of the loop counter.  Receivers only
+     probe for the tag; the payload value carries no meaning. */
+  payload = 0;
+  /* We broadcast the termination signal. Can't use MPI_Bcast here... */
+  req = GNUNET_malloc (sizeof (MPI_Request) * (nproc - 1));
+  for (cnt = 1; cnt < nproc; cnt++)
+  {
+    GNUNET_assert (MPI_SUCCESS ==
+                   MPI_Isend (&payload, 1, MPI_INT, cnt, MSH_MTYPE_SHUTDOWN,
+                              MPI_COMM_WORLD, &req[cnt - 1]));
+  }
+  GNUNET_assert (MPI_SUCCESS == MPI_Waitall (nproc - 1, req,
+                                             MPI_STATUSES_IGNORE));
+  GNUNET_free (req);
+}
+
+
+/**
+ * Callbacks of this type can be supplied to MSH_monitor_process() to be
+ * notified when the corresponding processes exits.
+ *
+ * Invoked when the main process terminates: releases its process handle (if
+ * there is one), initiates our own shutdown and tells all other instances to
+ * terminate as well.
+ *
+ * @param cls the closure passed to MSH_monitor_process()
+ * @param type the process status type
+ * @param code the return/exit code of the process
+ */
+static void
+proc_exit_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, int code)
+{
+  /* when the process was registered by pid only, no handle exists */
+  if (NULL != proc)
+  {
+    GNUNET_OS_process_destroy (proc);
+    proc = NULL;
+  }
+  LOG (GNUNET_ERROR_TYPE_INFO, "Main process died. Exiting.\n");
+  GNUNET_SCHEDULER_shutdown ();
+  send_term_signal ();
+}
+
+
+/**
+ * Task for running a round
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedules next round.  If all the rounds are completed, moves on to the
+ * next phase: synchronises with the other instances, reduces and prints the
+ * address map, exports the local socket path and the hosts file through the
+ * environment, starts the local server and the process monitor and switches
+ * to MODE_SERV.  The instance with rank 0 additionally forks the worker which
+ * runs the requested command.
+ */
+static void
+schedule_next_round ()
+{
+  pid_t pid;
+  unsigned int total_rounds;
+
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
+  /* Number of rounds required to contact all processes except ourselves
+     (rwidth in parallel in each round) */
+  total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (current_round < total_rounds)
+  {
+    rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
+    return;
+  }
+  if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+  {
+    GNUNET_break (0);
+    return;
+  }
+  LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
+  GNUNET_break (GNUNET_OK == reduce_ntree ());
+  addressmap_print (addrmap);
+  rmap = addressmap_create_reverse_mapping (addrmap);
+  pid = getpid ();
+  /* %ju expects an uintmax_t argument */
+  GNUNET_assert (0 < asprintf (&unixpath, "%ju.sock", (uintmax_t) pid));
+  setenv (MSHD_SOCK_NAME, unixpath, 1);
+  hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
+  if ( (NULL == hostsfile) ||
+       (GNUNET_OK != addressmap_write_hosts (addrmap, hostsfile)) )
+  {
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  setenv (MSHD_HOSTSFILE, hostsfile, 1);
+  init_local_server (unixpath);
+  MSH_pmonitor_init ();
+  mode = MODE_SERV;
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == atask);
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+  if (0 == rank)
+  {
+    pid = spawn_worker (0);
+    if (-1 == pid)
+    {
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_shutdown ();
+      return;
+    }
+    if (0 != pid)
+    {
+      /* parent: watch the worker through its pid, no process handle exists */
+      MSH_monitor_process_pid (pid, &proc_exit_cb, NULL);
+      goto end;
+    }
+    /* worker process: prepare the environment and run the command */
+    if (reverse_connect)
+      do_reverse_connect ();
+    if (need_pty)
+      create_pty ();
+    fork_and_exec (run_args[0]);
+    return;
+  }
+ end:
+  poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+                                                     &poll_shutdown, NULL);
+}
+
+
+/**
+ * Cleans up the address verification context: cancels its pending close task
+ * and transmit request, destroys its connection, unlinks it from the context
+ * DLL and frees it.  The context must not be used afterwards.
+ *
+ * @param ctx the context
+ */
+static void
+cleanup_verifyaddressctx (struct VerifyAddressesCtx *ctx)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
+  {
+    GNUNET_SCHEDULER_cancel (ctx->close_task);
+    ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  /* a connection must not be destroyed while a transmit request is pending
+     on it, otherwise conn_write_cb() could run with a freed context */
+  if (NULL != ctx->transmit_handle)
+  {
+    GNUNET_CONNECTION_notify_transmit_ready_cancel (ctx->transmit_handle);
+    ctx->transmit_handle = NULL;
+  }
+  if (NULL != ctx->conn)
+  {
+    GNUNET_CONNECTION_destroy (ctx->conn);
+    ctx->conn = NULL;
+  }
+  GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
+  GNUNET_free (ctx);
+}
+
+
+/**
+ * Finalise a round by freeing the resources used by it, cancel the accept task
+ * and schedule next round.  If not every expected instance has verified our
+ * addresses in this round the daemon is shut down instead.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct VerifyAddressesCtx *ctx;
+  unsigned int cnt;
+
+  finalise_task = GNUNET_SCHEDULER_NO_TASK;
+  /* accept_task() leaves atask unset when it bailed out on an error */
+  if (GNUNET_SCHEDULER_NO_TASK != atask)
+  {
+    GNUNET_SCHEDULER_cancel (atask);
+    atask = GNUNET_SCHEDULER_NO_TASK;
+  }
+  while (NULL != (ctx = vactx_head))
+  {
+    cleanup_verifyaddressctx (ctx);
+  }
+  if (NULL != riainfos)
+  {
+    for (cnt = 0; cnt < rwidth; cnt++)
+    {
+      if (NULL != riainfos[cnt])
+        instance_address_info_destroy (riainfos[cnt]);
+    }
+    /* the array itself is allocated anew by every round */
+    GNUNET_free (riainfos);
+    riainfos = NULL;
+  }
+  if (1 != bitmap_allset (bitmap))
+  {
+    LOG_ERROR ("Could not verify addresses of all hosts\n");
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  current_round++;
+  schedule_next_round ();
+}
+
+
+/**
+ * Task for closing a connection.  Scheduled by conn_write_cb() once our rank
+ * has been written to the peer: marks the peer in the round's bitmap, records
+ * the address we connected to in the address map and releases the context.
+ *
+ * @param cls the verify address context
+ * @param tc the scheduler task context
+ */
+static void
+conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct VerifyAddressesCtx *ctx = cls;
+ int lb;
+ int source;
+ int off;
+
+ ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+ lb = rank - (current_round * rwidth) - rwidth + nproc; /* same lower bound as computed in receive_addresses() */
+ GNUNET_assert (0 <= lb);
+ lb %= nproc;
+ source = instance_address_info_get_rank (ctx->iainfo);
+ if (lb <= source)
+ off = source - lb; /* offset of the peer from the lower bound */
+ else
+ off = nproc - lb + source; /* the peer's rank wrapped around past rank 0 */
+ bitmap_set (bitmap, off, 1);
+ addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
+ ctx->port, ctx->ip);
+ cleanup_verifyaddressctx (ctx);
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * On the first invocation (state VERIFY_ADDRESS_CTX_WRITE) our rank is
+ * written in network byte order and another notification is requested.  On
+ * the second one (state VERIFY_ADDRESS_CTX_CLOSE) the connection is
+ * destroyed and conn_close_task() is scheduled.  If the connection failed or
+ * the buffer is too small the context is released.
+ *
+ * @param cls closure (the verify address context)
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+conn_write_cb (void *cls, size_t size, void *buf)
+{
+  struct VerifyAddressesCtx *ctx = cls;
+  size_t rsize;
+  uint32_t rank_;
+
+  ctx->transmit_handle = NULL;
+  if ((NULL == buf) || (0 == size))
+    goto clo_ret;
+  if (size < sizeof (uint32_t))
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  switch (ctx->state)
+  {
+  case VERIFY_ADDRESS_CTX_WRITE:
+    rank_ = htonl (rank);
+    rsize = sizeof (uint32_t);
+    (void) memcpy (buf, &rank_, rsize);
+    ctx->transmit_handle =
+        GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 &conn_write_cb, ctx);
+    ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
+    return rsize;
+  case VERIFY_ADDRESS_CTX_CLOSE:
+    ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
+    GNUNET_CONNECTION_destroy (ctx->conn);
+    ctx->conn = NULL;
+    return 0;
+  default:
+    GNUNET_assert (0);
+  }
+
+ clo_ret:
+  cleanup_verifyaddressctx (ctx);
+  /* nothing has been written to buf on this path */
+  return 0;
+}
+
+
+static unsigned int bmx;
+
+/**
+ * Iterator callback for the addresses of an instance: starts a connection to
+ * the given address and queues the transmission of our rank over it.
+ *
+ * @param cls the instance address information the address belongs to
+ * @param port the port to connect to (host byte order)
+ * @param ip the IPv4 address to connect to (host byte order)
+ * @return GNUNET_OK if the connection attempt was started; GNUNET_SYSERR if
+ *           the connection could not be created
+ */
+static int
+address_iterator_cb (void *cls, uint16_t port, in_addr_t ip)
+{
+  struct VerifyAddressesCtx *ctx;
+  struct InstanceAddrInfo *iainfo = cls;
+  struct sockaddr_in in_addr;
+
+  LOG_DEBUG ("%d: \t %d Opening connection to: %s\n", rank, bmx++,
+             ip2str ((uint32_t) ip));
+  /* do not hand uninitialised padding / platform fields to the socket API */
+  (void) memset (&in_addr, 0, sizeof (struct sockaddr_in));
+  in_addr.sin_family = AF_INET;
+  in_addr.sin_port = htons (port);
+  in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
+  ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
+  ctx->conn =
+      GNUNET_CONNECTION_create_from_sockaddr (AF_INET,
+                                              (const struct sockaddr *)
+                                              &in_addr,
+                                              sizeof (struct sockaddr_in));
+  if (NULL == ctx->conn)
+  {
+    GNUNET_break (0);
+    /* allocated with GNUNET_malloc(), so release with GNUNET_free() */
+    GNUNET_free (ctx);
+    return GNUNET_SYSERR;
+  }
+  ctx->port = port;
+  ctx->ip = ip;
+  ctx->iainfo = iainfo;
+  ctx->state = VERIFY_ADDRESS_CTX_WRITE;
+  GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
+  ctx->transmit_handle =
+      GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &conn_write_cb, ctx);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Verify the addresses of an instance by connecting to the instance's listen
+ * socket
+ *
+ * @param iainfo the instance's address information
+ * @return GNUNET_OK upon success initialisation of the connection to
+ *           instance's listen socket (this does not mean that the connection
+ *           is established or an address is verified); GNUNET_SYSERR upon
+ *           error
+ */
+static int
+verify_addresses (struct InstanceAddrInfo *iainfo)
+{
+  bmx = 0;                      /* restart the per-instance debug counter */
+  if (GNUNET_OK !=
+      instance_address_info_iterate_addresses (iainfo, &address_iterator_cb,
+                                               iainfo))
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+
+/**
+ * Parse a verification message from a source for its address information.
+ * The size announced in the message header has to match the number of
+ * addresses the message claims to carry.
+ *
+ * @param msg the message to parse
+ * @param source the MPI id of the instance which has sent this message
+ * @return the instance's address information; NULL if the message is
+ *           malformed
+ */
+static struct InstanceAddrInfo *
+parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
+{
+  struct InstanceAddrInfo *result;
+  struct InstanceAddr *entry;
+  size_t announced;
+  size_t expected;
+  uint16_t count;
+  uint16_t idx;
+  in_addr_t ip;
+
+  announced = ntohs (msg->header.size);
+  count = ntohs (msg->nips);
+  expected = sizeof (struct MSH_MSG_VerifyAddress)
+      + (sizeof (uint32_t) * count);
+  if (announced != expected)
+  {
+    LOG_ERROR ("Parsing failed\n");
+    return NULL;
+  }
+  result = instance_address_info_create (source);
+  idx = 0;
+  while (idx < count)
+  {
+    ip = (in_addr_t) ntohl (msg->ipaddrs[idx]);
+    LOG_DEBUG ("%d: Parsed address: %s\n", rank, ip2str (ip));
+    entry = instance_address_create_sockaddr_in (ntohs (msg->port), ip);
+    GNUNET_break (GNUNET_OK ==
+                  instance_address_info_add_address (result, entry));
+    idx++;
+  }
+  return result;
+}
+
+
+/**
+ * Receives the IP addresses to verify in the current round from instances.
+ * Exactly rwidth messages are awaited; each one has to originate from a rank
+ * inside the window of instances which send to us in the current round.
+ *
+ * @return an array (rwidth entries) containing the instance addresses; NULL
+ *           upon a receive error
+ */
+static struct InstanceAddrInfo **
+receive_addresses ()
+{
+  struct InstanceAddrInfo **iainfos;
+  struct MSH_MSG_VerifyAddress *msg;
+  MPI_Status status;
+  unsigned int cnt;
+  int rsize;
+  int lb;
+  int up;
+  int source;
+  int in_window;
+
+  msg = NULL;
+  iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    if (MPI_SUCCESS !=
+        MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
+                   MPI_COMM_WORLD, &status))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    rsize = 0;
+    MPI_Get_elements (&status, MPI_BYTE, &rsize);
+    /* We expect a message from peers with id p in the range:
+       (rank - current_round * rwidth - rwidth) <= p
+       <= (rank - (current_round * rwidth) - 1) */
+    lb = rank - current_round * rwidth - rwidth + nproc;
+    up = rank - (current_round * rwidth) - 1 + nproc;
+    GNUNET_assert (lb >= 0);
+    GNUNET_assert (up >= 0);
+    lb %= nproc;
+    up %= nproc;
+    source = status.MPI_SOURCE;
+    if (lb <= up)
+      in_window = (lb <= source) && (source <= up);
+    else                        /* the window wraps around rank 0 */
+      in_window = (lb <= source) || (source <= up);
+    if (! in_window)
+    {
+      GNUNET_break (0);
+      LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb,
+                 up);
+      goto err_ret;
+    }
+    /* the parser reads the fixed part of the message unconditionally */
+    if ( (rsize < 0) ||
+         ((size_t) rsize < sizeof (struct MSH_MSG_VerifyAddress)) )
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    msg = GNUNET_malloc (rsize);
+    if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
+                                 MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+                                 MPI_STATUS_IGNORE))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
+               source);
+    /* the announced size must not exceed what has actually been received */
+    if ((size_t) rsize != (size_t) ntohs (msg->header.size))
+    {
+      GNUNET_break (0);
+      goto err_ret;
+    }
+    if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
+      goto err_ret;
+    GNUNET_free (msg);
+    msg = NULL;
+  }
+  return iainfos;
+
+ err_ret:
+  GNUNET_free_non_null (msg);
+  /* GNUNET_malloc() zeroes the array, unset entries are NULL */
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    if (NULL != iainfos[cnt])
+      instance_address_info_destroy (iainfos[cnt]);
+  }
+  GNUNET_free (iainfos);
+  return NULL;
+}
+
+
+/**
+ * Send our addresses to the MPI processes which have to verify them in the
+ * current round.  The targets are the `width' ranks following
+ * (rank + current_round * rwidth); in the last round `width' may be smaller
+ * than rwidth.
+ *
+ * @return GNUNET_OK on success; GNUNET_SYSERR upon error
+ */
+static int
+send_addresses ()
+{
+  struct MSH_MSG_VerifyAddress *msg;
+  MPI_Request *sreqs;
+  char *cpys;
+  char *slot;
+  size_t msize;
+  unsigned int width;
+  unsigned int idx;
+  int cnt;
+  int ret;
+  int target;
+
+  msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
+  msg = GNUNET_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->port = htons (listen_port);
+  msg->nips = htons (nips);
+  for (idx = 0; idx < nips; idx++)
+    msg->ipaddrs[idx] = htonl ((uint32_t) s_addrs[idx]);
+  width = rwidth;
+  if ( (0 != ( (nproc - 1) % rwidth)) &&
+       (current_round == ( (nproc - 1) / rwidth)) )
+    width = (nproc - 1) % rwidth;
+  /* One private copy per target.  The copies are msize bytes apart: the
+     message carries a variable number of addresses, so indexing an array of
+     `struct MSH_MSG_VerifyAddress' would make the buffers overlap. */
+  cpys = GNUNET_malloc (msize * width);
+  sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
+  ret = MPI_SUCCESS;
+  for (cnt = 0; cnt < (int) width; cnt++)
+  {
+    slot = cpys + (msize * cnt);
+    (void) memcpy (slot, msg, msize);
+    target = (current_round * rwidth) + cnt + 1;
+    GNUNET_assert (target < nproc);
+    target = (rank + target) % nproc;
+    LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
+    ret = MPI_Isend (slot, msize, MPI_BYTE, target,
+                     MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
+    if (MPI_SUCCESS != ret)
+      break;
+  }
+  GNUNET_free (msg);
+  msg = NULL;
+  if (cnt != (int) width)
+  {
+    /* a send could not be started: withdraw the ones already in flight */
+    for (cnt--; cnt >= 0; cnt--)
+    {
+      GNUNET_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
+      GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    }
+  }
+  else
+  {
+    for (cnt = 0; cnt < (int) width; cnt++)
+      GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
+               current_round);
+  }
+  /* all requests have completed, the buffers may be released */
+  GNUNET_free (cpys);
+  GNUNET_free (sreqs);
+  return (MPI_SUCCESS == ret) ? GNUNET_OK : GNUNET_SYSERR;
+}
+
+
+/**
+ * Runs one verification round: sends this instance's IP addresses to other
+ * instances and receives their IP addresses, starts accepting connections on
+ * the listen socket, waits at an MPI barrier and then starts verifying the
+ * IP addresses of the other instances by connecting to their listen sockets.
+ * finalise_round() is scheduled to run after one second.
+ *
+ * @return GNUNET_OK if the round has been started; GNUNET_SYSERR upon error
+ */
+static int
+run_round_ ()
+{
+ unsigned int cnt;
+
+ if (GNUNET_SYSERR == send_addresses ())
+ return GNUNET_SYSERR;
+ if (NULL == (riainfos = receive_addresses ()))
+ return GNUNET_SYSERR;
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+
+ if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ for (cnt = 0; cnt < rwidth; cnt++)
+ verify_addresses (riainfos[cnt]); /* return value is not checked here */
+ finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &finalise_round, NULL);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Scheduler task wrapping run_round_(): executes one verification round and
+ * requests a scheduler shutdown when the round could not be started.
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  int status;
+
+  rtask = GNUNET_SCHEDULER_NO_TASK;
+  status = run_round_ ();
+  if (GNUNET_OK == status)
+    return;
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
+/**
+ * Function to copy NULL terminated list of arguments.  The copy is allocated
+ * with the GNUnet allocator and has to be released with free_argv().
+ *
+ * @param argv the NULL terminated list of arguments. Cannot be NULL.
+ * @return the copied NULL terminated arguments
+ */
+static char **
+copy_argv (char *const *argv)
+{
+  char **argv_dup;
+  unsigned int argc;
+  unsigned int argp;
+
+  GNUNET_assert (NULL != argv);
+  for (argc = 0; NULL != argv[argc]; argc++) ;
+  argv_dup = GNUNET_malloc (sizeof (char *) * (argc + 1));
+  /* GNUNET_strdup() pairs with the GNUNET_free() used by free_argv() */
+  for (argp = 0; argp < argc; argp++)
+    argv_dup[argp] = GNUNET_strdup (argv[argp]);
+  argv_dup[argc] = NULL;
+  return argv_dup;
+}
+
+
+/**
+ * Releases a NULL terminated argument list: first every argument string, then
+ * the list itself.
+ *
+ * @param argv the NULL terminated list of arguments
+ */
+static void
+free_argv (char **argv)
+{
+  char **cursor;
+
+  cursor = argv;
+  while (NULL != *cursor)
+  {
+    GNUNET_free (*cursor);
+    cursor++;
+  }
+  GNUNET_free (argv);
+}
+
+
+/**
+ * Main function that will be run.  Validates the round width, copies the
+ * command to execute, opens the listen socket, collects our IPv4 addresses
+ * and schedules the first verification round together with the shutdown
+ * task.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be
+ *          NULL!)
+ * @param cfg configuration
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct sockaddr_in addr;
+  socklen_t addrlen;
+  unsigned int cnt;
+
+  LOG_DEBUG ("Running main task\n");
+  if (0 == rwidth)
+  {
+    LOG_ERROR ("Round width cannot be 0. Exiting\n");
+    return;
+  }
+  if (nproc <= rwidth)
+  {
+    LOG_ERROR ("Round width should be less than the number of processes\n");
+    return;
+  }
+  for (cnt = 0; NULL != args[cnt]; cnt++) ;
+  if (0 == cnt)
+  {
+    LOG_ERROR ("Need a command to execute\n");
+    return;
+  }
+  run_args = copy_argv (args);
+  bitmap = bitmap_create (rwidth);
+  addrmap = addressmap_create (nproc);
+  addrlen = sizeof (struct sockaddr_in);
+  (void) memset (&addr, 0, addrlen);
+  addr.sin_addr.s_addr = INADDR_ANY;    /* bind to all available addresses */
+  listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
+                                      rwidth);
+  if (NULL == listen_socket)
+    return;
+  /* the port is read back from the address after the socket was opened */
+  listen_port = ntohs (addr.sin_port);
+  if (0 == listen_port)
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }
+  LOG_DEBUG ("Listening on port %u\n", listen_port);
+  GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
+  if (0 == nips)
+  {
+    LOG_ERROR ("No IP addresses found\n");
+    goto clo_ret;               /* do not leave the listen socket open */
+  }
+  schedule_next_round ();
+  shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                                &do_shutdown, NULL);
+  return;
+
+ clo_ret:
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  listen_socket = NULL;
+}
+
+
+/**
+ * The execution start point.  Initialises logging and MPI, determines the
+ * number of processes and our rank and then hands control to the GNUnet
+ * program runner.  Processes which ended up in one of the worker modes
+ * return without calling MPI_Finalize().
+ *
+ * @param argc the number of arguments
+ * @param argv the argument strings
+ * @return 0 for successful termination; 1 for termination upon error
+ */
+int
+main (int argc, char **argv)
+{
+  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+    {'w', "round-width", "COUNT",
+     "set the size of each round to COUNT",
+     GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+    GNUNET_GETOPT_OPTION_END
+  };
+  int ret;
+
+  ret = 1;
+  rwidth = 1;
+  GNUNET_log_setup ("mshd", NULL, NULL);
+  if (MPI_SUCCESS != MPI_Init (&argc, &argv))
+  {
+    LOG_ERROR ("Failed to initialise MPI\n");
+    return 1;
+  }
+  if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
+  {
+    LOG_ERROR ("Cannot determine the number of mshd processes\n");
+    goto fail;
+  }
+  if (nproc <= rwidth)
+  {
+    LOG_ERROR ("Given round width is greater than or equal to number of mshd "
+               "processes\n");
+    goto fail;
+  }
+  if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
+  {
+    LOG_ERROR ("Cannot determine our MPI rank\n");
+    goto fail;
+  }
+  if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
+                                       options, &run, NULL))
+  {
+    GNUNET_break (0);
+    goto fail;
+  }
+  ret = 0;
+
+ fail:
+  /* worker modes skip MPI_Finalize(); an int function must still return a
+     value */
+  if (MODE_WORKER <= mode)
+    return ret;
+  LOG_DEBUG ("Returning\n");
+  GNUNET_break (MPI_SUCCESS == MPI_Finalize ());
+  return ret;
+}
Modified: msh/src/mshd_pmonitor.c
===================================================================
--- msh/src/mshd_pmonitor.c 2013-10-08 17:18:31 UTC (rev 30002)
+++ msh/src/mshd_pmonitor.c 2013-10-08 17:33:28 UTC (rev 30003)
@@ -24,11 +24,6 @@
struct MonitorCtx
{
/**
- * The process to monitor
- */
- struct GNUNET_OS_Process *proc;
-
- /**
* Termination notification callback
*/
MSH_ProcExitCallback cb;
@@ -243,16 +238,13 @@
* @param cls the closure for the above callback
*/
void
-MSH_monitor_process (struct GNUNET_OS_Process *proc,
- MSH_ProcExitCallback cb, void *cls)
+MSH_monitor_process_pid (pid_t pid,
+ MSH_ProcExitCallback cb, void *cls)
{
struct MonitorCtx *ctx;
- pid_t pid;
GNUNET_assert (NULL != map);
- pid = GNUNET_OS_process_get_pid (proc);
ctx = GNUNET_malloc (sizeof (struct MonitorCtx));
- ctx->proc = proc;
ctx->cb = cb;
ctx->cls = cls;
GNUNET_assert
@@ -263,6 +255,25 @@
/**
+ * Monitors a process for its termination.
+ *
+ * @param proc the process to monitor for termination
+ * @param cb the callback to be called for notifying the termination of the
+ * process
+ * @param cls the closure for the above callback
+ */
+void
+MSH_monitor_process (struct GNUNET_OS_Process *proc,
+                     MSH_ProcExitCallback cb, void *cls)
+{
+  /* thin wrapper: resolve the handle to its pid and monitor by pid */
+  MSH_monitor_process_pid (GNUNET_OS_process_get_pid (proc), cb, cls);
+}
+
+
+/**
* Stop monitoring a process
*
* @param proc
@@ -270,13 +281,11 @@
* monitored earlier
*/
int
-MSH_monitor_process_cancel (struct GNUNET_OS_Process *proc)
+MSH_monitor_process_pid_cancel (pid_t pid)
{
struct MonitorCtx *ctx;
- pid_t pid;
-
- GNUNET_assert (NULL != map);
- pid = GNUNET_OS_process_get_pid (proc);
+
+ GNUNET_assert (NULL != map);
ctx = GNUNET_CONTAINER_multihashmap32_get (map, (uint32_t) pid);
if (NULL == ctx)
return GNUNET_SYSERR;
@@ -286,3 +295,20 @@
GNUNET_free (ctx);
return GNUNET_OK;
}
+
+
+/**
+ * Stop monitoring a process
+ *
+ * @param proc
+ * @return GNUNET_OK upon success; GNUNET_SYSERR if the process is not being
+ * monitored earlier
+ */
+int
+MSH_monitor_process_cancel (struct GNUNET_OS_Process *proc)
+{
+  /* thin wrapper: resolve the handle to its pid and cancel by pid */
+  return MSH_monitor_process_pid_cancel (GNUNET_OS_process_get_pid (proc));
+}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r30003 - msh/src,
gnunet <=