[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28338 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28338 - msh/src |
Date: |
Tue, 30 Jul 2013 15:45:46 +0200 |
Author: harsha
Date: 2013-07-30 15:45:46 +0200 (Tue, 30 Jul 2013)
New Revision: 28338
Modified:
msh/src/reduce.c
Log:
-addressmap consensus by broadcasting
Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c 2013-07-30 12:22:57 UTC (rev 28337)
+++ msh/src/reduce.c 2013-07-30 13:45:46 UTC (rev 28338)
@@ -19,205 +19,93 @@
#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
-
-
/**
- * Send our addressmap to the instance with the given rank
+ * Perform an ntree reduction on address maps
*
- * @param instance the rank of the instance to which our addressmap has to be
sent
* @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
*/
-static int
-send_addressmap (int instance)
-{
+int
+reduce_ntree ()
+{
struct MSH_MSG_InstanceAdresses **iaddr_msgs;
- MPI_Request *send_reqs;
- MPI_Status *stats;
+ char *buf;
int nmsg;
- int cnt;
- int ret;
-
- ret = GNUNET_SYSERR;
- send_reqs = NULL;
- if (GNUNET_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
- {
- GNUNET_break (0);
- return ret;
- }
- send_reqs = GNUNET_malloc (sizeof (MPI_Request) * nmsg);
- LOG_DEBUG ("%d: Sending addressmap to instance %d\n", rank, instance);
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- if (MPI_SUCCESS !=
- MPI_Isend (iaddr_msgs[cnt], ntohs (iaddr_msgs[cnt]->header.size),
MPI_BYTE,
- instance, MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD,
- &send_reqs[cnt]))
- {
- GNUNET_break (0);
- cnt--;
- goto cleanup;
- }
- }
- stats = GNUNET_malloc (sizeof (MPI_Status) * nmsg);
- if (MPI_SUCCESS != MPI_Waitall (nmsg, send_reqs, stats))
- {
- GNUNET_break (0);
- goto cleanup;
- }
- GNUNET_free (send_reqs);
- send_reqs = NULL;
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
- {
- GNUNET_break (0);
- goto cleanup;
- }
- }
- ret = GNUNET_OK;
+ unsigned int cnt;
+ unsigned int grow;
+ unsigned int size;
+ unsigned int preamble[2]; /* 0: total size; 1: n messages */
+ unsigned int nr;
- cleanup:
- for (;(cnt > 0) && (NULL != send_reqs); cnt--)
+ for (cnt = 0; cnt < nproc; cnt++)
{
- GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt - 1]));
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt - 1],
MPI_STATUS_IGNORE));
- }
- for (cnt = 0; cnt < nmsg; cnt++)
- free (iaddr_msgs[cnt]);
- GNUNET_free_non_null (iaddr_msgs);
- GNUNET_free_non_null (send_reqs);
- GNUNET_free_non_null (stats);
- return ret;
-}
-
-
-/**
- * Receive address maps during a given reduction step
- *
- * @param step the current reduction step
- * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
- */
-static int
-receive_addressmap (unsigned int step)
-{
-
- struct MSH_MSG_InstanceAdresses *msg;
- MPI_Status stat;
- unsigned int width;
- unsigned int lb;
- unsigned int ub;
- unsigned int cnt;
- int nrecv;
- int ret;
- int msize;
-
- width = (rwidth <= 1 ? 2 : rwidth);
- width--; /* we don't receive from us */
- lb = rank + 1;
- if (lb == nproc)
- lb = rank;
- ub = lb;
- for (cnt = 0; cnt < width; cnt++)
- {
- if (ub + pow (width, step) >= nproc)
- break;
- ub = ub + pow (width, step);
- }
- nrecv = cnt;
- if (0 == nrecv)
- {
- LOG_DEBUG ("Not waiting to receive address map\n");
- return GNUNET_OK;
- }
- GNUNET_assert (nrecv >= 0);
- nrecv *= nproc; /* we get a message for each instance from each instance */
- LOG_DEBUG ("%d: Waiting to receive %d instance address messages\n", rank,
nrecv);
- ret = GNUNET_SYSERR;
- for (cnt = 0; cnt < nrecv; cnt++)
- {
- msg = NULL;
- if (MPI_SUCCESS != MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_INSTANCE_ADDRESS,
- MPI_COMM_WORLD, &stat))
+ buf = NULL;
+ size = 0;
+ grow = 0;
+ preamble[0] = 0;
+ preamble[1] = 0;
+ iaddr_msgs = NULL;
+ if (rank == cnt)
{
- GNUNET_break (0);
- goto cleanup;
+ if (GNUNET_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ for (nr = 0; nr < nmsg; nr++)
+ {
+ grow = ntohs (iaddr_msgs[nr]->header.size);
+ buf = GNUNET_realloc (buf, size + grow);
+ (void) memcpy (buf + size, iaddr_msgs[nr], grow);
+ size += grow;
+ }
+ preamble[0] = size;
+ preamble[1] = nmsg;
+ LOG_DEBUG ("Broadcasting address map from instance %u\n", cnt);
}
- if (!((lb <= stat.MPI_SOURCE ) && (stat.MPI_SOURCE <= ub)))
+ if (MPI_SUCCESS != MPI_Bcast (preamble, 2, MPI_UNSIGNED, cnt,
MPI_COMM_WORLD))
{
GNUNET_break (0);
- LOG_ERROR ("Received a message from unexpected source %d\n",
stat.MPI_SOURCE);
- goto cleanup;
+ return GNUNET_SYSERR;
}
- LOG_DEBUG ("%d: Receiving %d (nd/th) addressmap message from instance
%d\n",
- rank, cnt, stat.MPI_SOURCE);
- msize = 0;
- if ((MPI_SUCCESS != MPI_Get_elements (&stat, MPI_BYTE, &msize))
- || (msize <= 0))
+ if (rank != cnt)
{
- GNUNET_break (0);
- goto cleanup;
+ size = preamble[0];
+ nmsg = preamble[1];
+ buf = GNUNET_malloc (size);
}
- msg = GNUNET_malloc (msize);
- if (MPI_SUCCESS != MPI_Recv (msg, msize, MPI_BYTE, stat.MPI_SOURCE,
- MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD,
- MPI_STATUS_IGNORE))
+ GNUNET_assert (NULL != buf);
+ GNUNET_assert (0 < size);
+ if (MPI_SUCCESS != MPI_Bcast (buf, size, MPI_BYTE, cnt, MPI_COMM_WORLD))
{
GNUNET_break (0);
- goto cleanup;
+ return GNUNET_SYSERR;
}
- LOG_DEBUG ("%d: [%u/%d] Received an instance address from %d\n", rank,
- cnt + 1, nrecv, stat.MPI_SOURCE);
- if (0 >= addressmap_intersect_msg (addrmap, msg))
+ if (rank == cnt)
{
- LOG_ERROR ("%d: No common addresses found for instance %d\n", rank,
- ntohs (msg->rank));
- goto cleanup;
+ GNUNET_free (buf);
+ for (nr = 0; nr < nmsg; nr++)
+ GNUNET_free (iaddr_msgs[nr]);
+ GNUNET_free (iaddr_msgs);
+ continue;
}
- free (msg);
- msg = NULL;
- }
-
- ret = GNUNET_OK;
- cleanup:
- GNUNET_free_non_null (msg);
- return ret;
-}
-
-
-/**
- * Perform an ntree reduction on address maps
- *
- * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
- */
-int
-reduce_ntree ()
-{
- unsigned int step;
- unsigned int max_steps;
- unsigned int aggregator;
- unsigned int step_width;
- unsigned int width;
-
- width = (rwidth <= 1 ? 2 : rwidth);
- max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double)
width));
- LOG_DEBUG ("Reduction with max steps: %u\n", max_steps);
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- return GNUNET_SYSERR;
- for (step = 0; step < max_steps; step++)
- {
- LOG_DEBUG ("Reduction step %u\n", step);
- step_width = (unsigned int) pow (width, step + 1);
- if (0 != (aggregator = (rank % step_width)))
+ iaddr_msgs = GNUNET_malloc (sizeof (struct MSH_MSG_InstanceAdresses *) *
+ nmsg);
+ grow = 0;
+ for (nr = 0; nr < nmsg; nr++)
{
- aggregator = rank - aggregator;
- if (GNUNET_SYSERR == send_addressmap (aggregator))
- return GNUNET_SYSERR;
- return GNUNET_OK;
+ iaddr_msgs[nr] = (struct MSH_MSG_InstanceAdresses *) (buf + grow);
+ grow += ntohs (iaddr_msgs[nr]->header.size);
+ if (0 > addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
+ {
+ LOG_ERROR ("No common address found for instance %u\n",
+ ntohs (iaddr_msgs[nr]->rank));
+ break;
+ }
}
- /* receive address maps */
- if (GNUNET_SYSERR == receive_addressmap (step))
- return GNUNET_SYSERR;
+ LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
+ GNUNET_free (buf);
+ GNUNET_free (iaddr_msgs);
}
return GNUNET_OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28338 - msh/src,
gnunet <=