[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28705 - gnunet/src/testbed
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28705 - gnunet/src/testbed |
Date: |
Mon, 19 Aug 2013 16:13:19 +0200 |
Author: harsha
Date: 2013-08-19 16:13:19 +0200 (Mon, 19 Aug 2013)
New Revision: 28705
Modified:
gnunet/src/testbed/gnunet-service-testbed.c
gnunet/src/testbed/test_testbed_api_operations.c
gnunet/src/testbed/testbed_api.c
gnunet/src/testbed/testbed_api_hosts.c
gnunet/src/testbed/testbed_api_hosts.h
gnunet/src/testbed/testbed_api_operations.c
gnunet/src/testbed/testbed_api_operations.h
gnunet/src/testbed/testbed_api_peers.c
gnunet/src/testbed/testbed_api_peers.h
gnunet/src/testbed/testbed_api_statistics.c
Log:
fix 2893: Move adaptive parallelisation mechanism to operation queues
Modified: gnunet/src/testbed/gnunet-service-testbed.c
===================================================================
--- gnunet/src/testbed/gnunet-service-testbed.c 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/gnunet-service-testbed.c 2013-08-19 14:13:19 UTC (rev
28705)
@@ -918,7 +918,8 @@
GNUNET_assert (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_number (cfg, "TESTBED",
"MAX_OPEN_FDS", &num));
- GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
num);
+ GST_opq_openfds = GNUNET_TESTBED_operation_queue_create_
+ (OPERATION_QUEUE_TYPE_FIXED, (unsigned int) num);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_time (cfg, "TESTBED",
"OPERATION_TIMEOUT",
Modified: gnunet/src/testbed/test_testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/test_testbed_api_operations.c 2013-08-19 14:03:01 UTC
(rev 28704)
+++ gnunet/src/testbed/test_testbed_api_operations.c 2013-08-19 14:13:19 UTC
(rev 28705)
@@ -490,9 +490,9 @@
run (void *cls, char *const *args, const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *config)
{
- q1 = GNUNET_TESTBED_operation_queue_create_ (1);
+ q1 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 1);
GNUNET_assert (NULL != q1);
- q2 = GNUNET_TESTBED_operation_queue_create_ (2);
+ q2 = GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, 2);
GNUNET_assert (NULL != q2);
op1 = GNUNET_TESTBED_operation_create_ (&op1, start_cb, release_cb);
GNUNET_assert (NULL != op1);
Modified: gnunet/src/testbed/testbed_api.c
===================================================================
--- gnunet/src/testbed/testbed_api.c 2013-08-19 14:03:01 UTC (rev 28704)
+++ gnunet/src/testbed/testbed_api.c 2013-08-19 14:13:19 UTC (rev 28705)
@@ -860,7 +860,7 @@
struct OverlayConnectData *data;
data = opc->data;
- data->failed = GNUNET_YES;
+ GNUNET_TESTBED_operation_mark_failed (opc->op);
if (NULL != data->cb)
data->cb (data->cb_cls, opc->op, emsg);
}
@@ -1486,13 +1486,15 @@
GNUNET_TESTBED_mark_host_registered_at_ (host, controller);
controller->host = host;
controller->opq_parallel_operations =
- GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
- max_parallel_operations);
+ GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+ (unsigned int)
max_parallel_operations);
controller->opq_parallel_service_connections =
- GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
+ GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+ (unsigned int)
max_parallel_service_connections);
controller->opq_parallel_topology_config_operations =
- GNUNET_TESTBED_operation_queue_create_ ((unsigned int)
+ GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED,
+ (unsigned int)
max_parallel_topology_config_operations);
controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
if (NULL == controller_hostname)
@@ -1856,13 +1858,25 @@
/**
- * Signal that the information from an operation has been fully
- * processed. This function MUST be called for each event
- * of type 'operation_finished' to fully remove the operation
- * from the operation queue. After calling this function, the
- * 'op_result' becomes invalid (!).
+ * This function is used to signal that the event information (struct
+ * GNUNET_TESTBED_EventInformation) from an operation has been fully processed
+ * i.e. if the event callback is ever called for this operation. If the event
+ * callback for this operation has not yet been called, calling this function
+ * cancels the operation, frees its resources and ensures the no event is
+ * generated with respect to this operation. Note that however cancelling an
+ * operation does NOT guarantee that the operation will be fully undone (or
that
+ * nothing ever happened).
*
- * @param operation operation to signal completion for
+ * This function MUST be called for every operation to fully remove the
+ * operation from the operation queue. After calling this function, if
+ * operation is completed and its event information is of type
+ * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!).
+
+ * If the operation is generated from GNUNET_TESTBED_service_connect() then
+ * calling this function on such as operation calls the disconnect adapter if
+ * the connect adapter was ever called.
+ *
+ * @param operation operation to signal completion or cancellation
*/
void
GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation)
Modified: gnunet/src/testbed/testbed_api_hosts.c
===================================================================
--- gnunet/src/testbed/testbed_api_hosts.c 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_hosts.c 2013-08-19 14:13:19 UTC (rev
28705)
@@ -35,7 +35,6 @@
#include "testbed_api_hosts.h"
#include "testbed_helper.h"
#include "testbed_api_operations.h"
-#include "testbed_api_sd.h"
#include <zlib.h>
@@ -97,28 +96,6 @@
/**
- * A slot to record time taken by an overlay connect operation
- */
-struct TimeSlot
-{
- /**
- * A key to identify this timeslot
- */
- void *key;
-
- /**
- * Time
- */
- struct GNUNET_TIME_Relative time;
-
- /**
- * Number of timing values accumulated
- */
- unsigned int nvals;
-};
-
-
-/**
* Opaque handle to a host running experiments managed by the testing
framework.
* The master process must be able to SSH to this host without password (via
* ssh-agent).
@@ -161,28 +138,6 @@
struct OperationQueue *opq_parallel_overlay_connect_operations;
/**
- * An array of timing slots; size should be equal to the current number of
parallel
- * overlay connects
- */
- struct TimeSlot *tslots;
-
- /**
- * Handle for SD calculations amount parallel overlay connect operation
finish
- * times
- */
- struct SDHandle *poc_sd;
-
- /**
- * The number of parallel overlay connects we do currently
- */
- unsigned int num_parallel_connects;
-
- /**
- * Counter to indicate when all the available time slots are filled
- */
- unsigned int tslots_filled;
-
- /**
* Is a controller started on this host? FIXME: Is this needed?
*/
int controller_started;
@@ -382,9 +337,8 @@
host->port = (0 == port) ? 22 : port;
host->cfg = GNUNET_CONFIGURATION_dup (cfg);
host->opq_parallel_overlay_connect_operations =
- GNUNET_TESTBED_operation_queue_create_ (0);
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (host, 1);
- host->poc_sd = GNUNET_TESTBED_SD_init_ (10);
+ GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_ADAPTIVE,
+ UINT_MAX);
new_size = host_list_size;
while (id >= new_size)
new_size += HOST_LIST_GROW_STEP;
@@ -740,8 +694,6 @@
GNUNET_free_non_null ((char *) host->hostname);
GNUNET_TESTBED_operation_queue_destroy_
(host->opq_parallel_overlay_connect_operations);
- GNUNET_TESTBED_SD_destroy_ (host->poc_sd);
- GNUNET_free_non_null (host->tslots);
GNUNET_CONFIGURATION_destroy (host->cfg);
GNUNET_free (host);
while (host_list_size >= HOST_LIST_GROW_STEP)
@@ -1624,192 +1576,6 @@
/**
- * Initializes the operation queue for parallel overlay connects
- *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
- */
-void
-GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
- GNUNET_TESTBED_Host *h,
- unsigned int npoc)
-{
- //fprintf (stderr, "%d", npoc);
- GNUNET_free_non_null (h->tslots);
- h->tslots_filled = 0;
- h->num_parallel_connects = npoc;
- h->tslots = GNUNET_malloc (npoc * sizeof (struct TimeSlot));
- GNUNET_TESTBED_operation_queue_reset_max_active_
- (h->opq_parallel_overlay_connect_operations, npoc);
-}
-
-
-/**
- * Returns a timing slot which will be exclusively locked
- *
- * @param h the host handle
- * @param key a pointer which is associated to the returned slot; should not be
- * NULL. It serves as a key to determine the correct owner of the slot
- * @return the time slot index in the array of time slots in the controller
- * handle
- */
-unsigned int
-GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key)
-{
- unsigned int slot;
-
- GNUNET_assert (NULL != h->tslots);
- GNUNET_assert (NULL != key);
- for (slot = 0; slot < h->num_parallel_connects; slot++)
- if (NULL == h->tslots[slot].key)
- {
- h->tslots[slot].key = key;
- return slot;
- }
- GNUNET_assert (0); /* We should always find a free tslot */
-}
-
-
-/**
- * Decides whether any change in the number of parallel overlay connects is
- * necessary to adapt to the load on the system
- *
- * @param h the host handle
- */
-static void
-decide_npoc (struct GNUNET_TESTBED_Host *h)
-{
- struct GNUNET_TIME_Relative avg;
- int sd;
- unsigned int slot;
- unsigned int nvals;
-
- if (h->tslots_filled != h->num_parallel_connects)
- return;
- avg = GNUNET_TIME_UNIT_ZERO;
- nvals = 0;
- for (slot = 0; slot < h->num_parallel_connects; slot++)
- {
- avg = GNUNET_TIME_relative_add (avg, h->tslots[slot].time);
- nvals += h->tslots[slot].nvals;
- }
- GNUNET_assert (nvals >= h->num_parallel_connects);
- avg = GNUNET_TIME_relative_divide (avg, nvals);
- GNUNET_assert (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us !=
avg.rel_value_us);
- sd = GNUNET_TESTBED_SD_deviation_factor_ (h->poc_sd, (unsigned int)
avg.rel_value_us);
- if ( (sd <= 5) ||
- (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- h->num_parallel_connects)) )
- GNUNET_TESTBED_SD_add_data_ (h->poc_sd, (unsigned int) avg.rel_value_us);
- if (GNUNET_SYSERR == sd)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
-
h->num_parallel_connects);
- return;
- }
- GNUNET_assert (0 <= sd);
- if (0 == sd)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
- h->num_parallel_connects
- * 2);
- return;
- }
- if (1 == sd)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
- h->num_parallel_connects
- + 1);
- return;
- }
- if (1 == h->num_parallel_connects)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
- return;
- }
- if (2 == sd)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
- h->num_parallel_connects
- - 1);
- return;
- }
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
- h->num_parallel_connects /
- 2);
-}
-
-
-/**
- * Releases a time slot thus making it available for be used again
- *
- * @param h the host handle
- * @param index the index of the the time slot
- * @param key the key to prove ownership of the timeslot
- * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if
the
- * time slot cannot be removed - this could be because of the index
- * greater than existing number of time slots or `key' being
different
- */
-int
-GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
- unsigned int index, void *key)
-{
- struct TimeSlot *slot;
-
- GNUNET_assert (NULL != key);
- if (index >= h->num_parallel_connects)
- return GNUNET_NO;
- slot = &h->tslots[index];
- if (key != slot->key)
- return GNUNET_NO;
- slot->key = NULL;
- return GNUNET_YES;
-}
-
-
-/**
- * Function to update a time slot
- *
- * @param h the host handle
- * @param index the index of the time slot to update
- * @param key the key to identify ownership of the slot
- * @param time the new time
- * @param failed should this reading be treated as coming from a fail event
- */
-void
-GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
- unsigned int index, void *key,
- struct GNUNET_TIME_Relative time, int failed)
-{
- struct TimeSlot *slot;
-
- if (GNUNET_YES == failed)
- {
- if (1 == h->num_parallel_connects)
- {
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h, 1);
- return;
- }
- GNUNET_TESTBED_set_num_parallel_overlay_connects_ (h,
- h->num_parallel_connects
- - 1);
- }
- if (GNUNET_NO == GNUNET_TESTBED_release_time_slot_ (h, index, key))
- return;
- slot = &h->tslots[index];
- slot->nvals++;
- if (GNUNET_TIME_UNIT_ZERO.rel_value_us == slot->time.rel_value_us)
- {
- slot->time = time;
- h->tslots_filled++;
- decide_npoc (h);
- return;
- }
- slot->time = GNUNET_TIME_relative_add (slot->time, time);
-}
-
-
-/**
* Queues the given operation in the queue for parallel overlay connects of the
* given host
*
Modified: gnunet/src/testbed/testbed_api_hosts.h
===================================================================
--- gnunet/src/testbed/testbed_api_hosts.h 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_hosts.h 2013-08-19 14:13:19 UTC (rev
28705)
@@ -151,61 +151,6 @@
/**
- * (re)sets the operation queue for parallel overlay connects
- *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
- */
-void
-GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct
- GNUNET_TESTBED_Host *h,
- unsigned int npoc);
-
-
-/**
- * Releases a time slot thus making it available for be used again
- *
- * @param h the host handle
- * @param index the index of the the time slot
- * @param key the key to prove ownership of the timeslot
- * @return GNUNET_YES if the time slot is successfully removed; GNUNET_NO if
the
- * time slot cannot be removed - this could be because of the index
- * greater than existing number of time slots or `key' being
different
- */
-int
-GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Host *h,
- unsigned int index, void *key);
-
-
-/**
- * Function to update a time slot
- *
- * @param h the host handle
- * @param index the index of the time slot to update
- * @param key the key to identify ownership of the slot
- * @param time the new time
- * @param failed should this reading be treated as coming from a fail event
- */
-void
-GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Host *h,
- unsigned int index, void *key,
- struct GNUNET_TIME_Relative time, int
failed);
-
-
-/**
- * Returns a timing slot which will be exclusively locked
- *
- * @param h the host handle
- * @param key a pointer which is associated to the returned slot; should not be
- * NULL. It serves as a key to determine the correct owner of the slot
- * @return the time slot index in the array of time slots in the controller
- * handle
- */
-unsigned int
-GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Host *h, void *key);
-
-
-/**
* Queues the given operation in the queue for parallel overlay connects of the
* given host
*
Modified: gnunet/src/testbed/testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/testbed_api_operations.c 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_operations.c 2013-08-19 14:13:19 UTC (rev
28705)
@@ -27,6 +27,7 @@
#include "platform.h"
#include "testbed_api_operations.h"
+#include "testbed_api_sd.h"
/**
@@ -60,6 +61,89 @@
* Queue of operations where we can only support a certain
* number of concurrent operations of a particular type.
*/
+struct OperationQueue;
+
+
+/**
+ * A slot to record time taken by an operation
+ */
+struct TimeSlot
+{
+ /**
+ * DLL next pointer
+ */
+ struct TimeSlot *next;
+
+ /**
+ * DLL prev pointer
+ */
+ struct TimeSlot *prev;
+
+ /**
+ * This operation queue to which this time slot belongs to
+ */
+ struct OperationQueue *queue;
+
+ /**
+ * The operation to which this timeslot is currently allocated to
+ */
+ struct GNUNET_TESTBED_Operation *op;
+
+ /**
+ * Accumulated time
+ */
+ struct GNUNET_TIME_Relative tsum;
+
+ /**
+ * Number of timing values accumulated
+ */
+ unsigned int nvals;
+};
+
+
+/**
+ * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
+ */
+struct FeedbackCtx
+{
+ /**
+ * Handle for calculating standard deviation
+ */
+ struct SDHandle *sd;
+
+ /**
+ * Head for DLL of time slots which are free to be allocated to operations
+ */
+ struct TimeSlot *alloc_head;
+
+ /**
+ * Tail for DLL of time slots which are free to be allocated to operations
+ */
+ struct TimeSlot *alloc_tail;
+
+ /**
+ * Pointer to the chunk of time slots. Free all time slots at a time using
+ * this pointer.
+ */
+ struct TimeSlot *tslots_freeptr;
+
+ /**
+ * Number of time slots filled so far
+ */
+ unsigned int tslots_filled;
+
+ /**
+ * Bound on the maximum number of operations which can be active
+ */
+ unsigned int max_active_bound;
+
+};
+
+
+/**
+ * Queue of operations where we can only support a certain
+ * number of concurrent operations of a particular type.
+ */
struct OperationQueue
{
/**
@@ -108,12 +192,26 @@
struct QueueEntry *nq_tail;
/**
+ * Feedback context; only relevant for adaptive operation queues. NULL for
+ * fixed operation queues
+ */
+ struct FeedbackCtx *fctx;
+
+ /**
+ * The type of this opeartion queue
+ */
+ enum OperationQueueType type;
+
+ /**
* Number of operations that are currently active in this queue.
*/
unsigned int active;
/**
- * Max number of operations which can be active at any time in this queue
+ * Max number of operations which can be active at any time in this queue.
+ * This value can be changed either by calling
+ * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
+ * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
*/
unsigned int max_active;
@@ -222,6 +320,21 @@
struct ReadyQueueEntry *rq_entry;
/**
+ * Head pointer for DLL of tslots allocated to this operation
+ */
+ struct TimeSlot *tslots_head;
+
+ /**
+ * Tail pointer for DLL of tslots allocated to this operation
+ */
+ struct TimeSlot *tslots_tail;
+
+ /**
+ * The time at which the operation is started
+ */
+ struct GNUNET_TIME_Absolute tstart;
+
+ /**
* Number of queues in the operation queues array
*/
unsigned int nqueues;
@@ -231,6 +344,11 @@
*/
enum OperationState state;
+ /**
+ * Is this a failed operation?
+ */
+ int failed;
+
};
/**
@@ -250,6 +368,29 @@
/**
+ * Assigns the given operation a time slot from the given operation queue
+ *
+ * @param op the operation
+ * @param queue the operation queue
+ * @return the timeslot
+ */
+static void
+assign_timeslot (struct GNUNET_TESTBED_Operation *op,
+ struct OperationQueue *queue)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+
+ GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
+ tslot = fctx->alloc_head;
+ GNUNET_assert (NULL != tslot);
+ GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
+ GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = op;
+}
+
+
+/**
* Removes a queue entry of an operation from one of the operation queues'
lists
* depending on the state of the operation
*
@@ -378,6 +519,8 @@
process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_TESTBED_Operation *op;
+ struct OperationQueue *queue;
+ unsigned int cnt;
process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (NULL != rq_head);
@@ -386,8 +529,15 @@
if (NULL != rq_head)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
change_state (op, OP_STATE_ACTIVE);
+ for (cnt = 0; cnt < op->nqueues; cnt++)
+ {
+ queue = op->queues[cnt];
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+ assign_timeslot (op, queue);
+ }
+ op->tstart = GNUNET_TIME_absolute_get ();
if (NULL != op->start)
- op->start (op->cb_cls);
+ op->start (op->cb_cls);
}
@@ -582,7 +732,7 @@
if (NULL != evict_ops)
{
for (i = 0; i < n_evict_ops; i++)
- GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
GNUNET_free (evict_ops);
evict_ops = NULL;
/* Evicting the operations should schedule this operation */
@@ -619,6 +769,162 @@
/**
+ * Cleanups the array of timeslots of an operation queue. For each time slot
in
+ * the array, if it is allocated to an operation, it will be deallocated from
+ * the operation
+ *
+ * @param queue the operation queue
+ */
+static void
+cleanup_tslots (struct OperationQueue *queue)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int cnt;
+
+ GNUNET_assert (NULL != fctx);
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ op = tslot->op;
+ if (NULL == op)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ }
+ GNUNET_free_non_null (fctx->tslots_freeptr);
+ fctx->tslots_freeptr = NULL;
+ fctx->alloc_head = NULL;
+ fctx->alloc_tail = NULL;
+ fctx->tslots_filled = 0;
+}
+
+
+/**
+ * Initializes the operation queue for parallel overlay connects
+ *
+ * @param h the host handle
+ * @param npoc the number of parallel overlay connects - the queue size
+ */
+static void
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ unsigned int cnt;
+
+ cleanup_tslots (queue);
+ n = GNUNET_MIN (n ,fctx->max_active_bound);
+ fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+ for (cnt = 0; cnt < n; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ tslot->queue = queue;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
tslot);
+ }
+ GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
+}
+
+
+/**
+ * Adapts parallelism in an adaptive queue by using the statistical data from
+ * the feedback context.
+ *
+ * @param queue the queue
+ * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
+ */
+static void
+adapt_parallelism (struct OperationQueue *queue, int fail)
+{
+ struct GNUNET_TIME_Relative avg;
+ struct FeedbackCtx *fctx;
+ struct TimeSlot *tslot;
+ int sd;
+ unsigned int nvals;
+ unsigned int cnt;
+
+ avg = GNUNET_TIME_UNIT_ZERO;
+ nvals = 0;
+ fctx = queue->fctx;
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+ nvals += tslot->nvals;
+ }
+ GNUNET_assert (nvals >= queue->max_active);
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ sd = GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int)
+ avg.rel_value_us);
+ if ( (sd <= 5) ||
+ (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ queue->max_active)) )
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (GNUNET_SYSERR == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+ return;
+ }
+ GNUNET_assert (0 <= sd);
+ if ((0 == sd) && (! fail))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ return;
+ }
+ if ((1 == sd) && (! fail))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active + 1);
+ return;
+ }
+ if (1 == queue->max_active)
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
+ if (((sd < 2) && (fail)) || (2 == sd))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ return;
+ }
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+}
+
+
+/**
+ * update tslots with the operation's completion time. Additionally, if
+ * updating a timeslot makes all timeslots filled in an adaptive operation
+ * queue, call adapt_parallelism() for that queue.
+ *
+ * @param op the operation
+ */
+static void
+update_tslots (struct GNUNET_TESTBED_Operation *op)
+{
+ struct OperationQueue *queue;
+ struct GNUNET_TIME_Relative t;
+ struct TimeSlot *tslot;
+ struct FeedbackCtx *fctx;
+
+ t = GNUNET_TIME_absolute_get_duration (op->tstart);
+ while (NULL != (tslot = op->tslots_head)) /* update time slots */
+ {
+ queue = tslot->queue;
+ fctx = queue->fctx;
+ tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = NULL;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+ tslot);
+ if (0 != tslot->nvals++)
+ continue;
+ fctx->tslots_filled++;
+ if (queue->max_active == fctx->tslots_filled)
+ adapt_parallelism (queue, op->failed);
+ }
+}
+
+
+/**
* Create an 'operation' to be performed.
*
* @param cls closure for the callbacks
@@ -644,17 +950,32 @@
/**
* Create an operation queue.
*
+ * @param type the type of operation queue
* @param max_active maximum number of operations in this
* queue that can be active in parallel at the same time
* @return handle to the queue
*/
struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+ unsigned int max_active)
{
struct OperationQueue *queue;
+ struct FeedbackCtx *fctx;
queue = GNUNET_malloc (sizeof (struct OperationQueue));
- queue->max_active = max_active;
+ queue->type = type;
+ if (OPERATION_QUEUE_TYPE_FIXED == type)
+ {
+ queue->max_active = max_active;
+ }
+ else
+ {
+ fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
+ fctx->max_active_bound = max_active;
+ fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+ queue->fctx = fctx;
+ adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+ }
return queue;
}
@@ -668,7 +989,16 @@
void
GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
+ struct FeedbackCtx *fctx;
+
GNUNET_break (GNUNET_YES == is_queue_empty (queue));
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+ {
+ cleanup_tslots (queue);
+ fctx = queue->fctx;
+ GNUNET_TESTBED_SD_destroy_ (fctx->sd);
+ GNUNET_free (fctx);
+ }
GNUNET_free (queue);
}
@@ -867,8 +1197,10 @@
rq_remove (op);
if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
GNUNET_TESTBED_operation_activate_ (op);
+ if (OP_STATE_ACTIVE == op->state)
+ update_tslots (op);
GNUNET_assert (NULL != op->queues);
- GNUNET_assert (NULL != op->qentries);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
{
entry = op->qentries[i];
@@ -882,8 +1214,8 @@
break;
case OP_STATE_WAITING:
break;
+ case OP_STATE_ACTIVE:
case OP_STATE_READY:
- case OP_STATE_ACTIVE:
GNUNET_assert (0 != opq->active);
GNUNET_assert (opq->active >= entry->nres);
opq->active -= entry->nres;
@@ -901,4 +1233,16 @@
}
+/**
+ * Marks an operation as failed
+ *
+ * @param op the operation to be marked as failed
+ */
+void
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
+{
+ op->failed = GNUNET_YES;
+}
+
+
/* end of testbed_api_operations.c */
Modified: gnunet/src/testbed/testbed_api_operations.h
===================================================================
--- gnunet/src/testbed/testbed_api_operations.h 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_operations.h 2013-08-19 14:13:19 UTC (rev
28705)
@@ -38,14 +38,35 @@
/**
+ * The type of operation queue
+ */
+enum OperationQueueType
+{
+ /**
+ * Operation queue which permits a fixed maximum number of operations to be
+ * active at any time
+ */
+ OPERATION_QUEUE_TYPE_FIXED,
+
+ /**
+ * Operation queue which adapts the number of operations to be active based
on
+ * the operation completion times of previously executed operation in it
+ */
+ OPERATION_QUEUE_TYPE_ADAPTIVE
+};
+
+
+/**
* Create an operation queue.
*
- * @param max_active maximum number of operations in this
- * queue that can be active in parallel at the same time
+ * @param type the type of operation queue
+ * @param max_active maximum number of operations in this queue that can be
+ * active in parallel at the same time.
* @return handle to the queue
*/
struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active);
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+ unsigned int max_active);
/**
@@ -199,5 +220,14 @@
GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op);
+/**
+ * Marks an operation as failed
+ *
+ * @param op the operation to be marked as failed
+ */
+void
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op);
+
+
#endif
/* end of testbed_api_operations.h */
Modified: gnunet/src/testbed/testbed_api_peers.c
===================================================================
--- gnunet/src/testbed/testbed_api_peers.c 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_peers.c 2013-08-19 14:13:19 UTC (rev
28705)
@@ -410,8 +410,6 @@
opc->state = OPC_STATE_STARTED;
data = opc->data;
GNUNET_assert (NULL != data);
- data->tslot_index = GNUNET_TESTBED_get_tslot_ (data->p1->host, data);
- data->tstart = GNUNET_TIME_absolute_get ();
msg = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
msg->header.size =
htons (sizeof (struct GNUNET_TESTBED_OverlayConnectMessage));
@@ -434,7 +432,6 @@
oprelease_overlay_connect (void *cls)
{
struct OperationContext *opc = cls;
- struct GNUNET_TIME_Relative duration;
struct OverlayConnectData *data;
data = opc->data;
@@ -443,14 +440,10 @@
case OPC_STATE_INIT:
break;
case OPC_STATE_STARTED:
- (void) GNUNET_TESTBED_release_time_slot_ (data->p1->host,
data->tslot_index,
- data);
GNUNET_TESTBED_remove_opc_ (opc->c, opc);
break;
case OPC_STATE_FINISHED:
- duration = GNUNET_TIME_absolute_get_duration (data->tstart);
- GNUNET_TESTBED_update_time_slot_ (data->p1->host, data->tslot_index, data,
- duration, data->failed);
+ break;
}
GNUNET_free (data);
GNUNET_free (opc);
Modified: gnunet/src/testbed/testbed_api_peers.h
===================================================================
--- gnunet/src/testbed/testbed_api_peers.h 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_peers.h 2013-08-19 14:13:19 UTC (rev
28705)
@@ -250,21 +250,6 @@
*/
struct OperationContext *sub_opc;
- /**
- * The starting time of this operation
- */
- struct GNUNET_TIME_Absolute tstart;
-
- /**
- * Has this operation failed
- */
- int failed;
-
- /**
- * The timing slot index for this operation
- */
- unsigned int tslot_index;
-
};
Modified: gnunet/src/testbed/testbed_api_statistics.c
===================================================================
--- gnunet/src/testbed/testbed_api_statistics.c 2013-08-19 14:03:01 UTC (rev
28704)
+++ gnunet/src/testbed/testbed_api_statistics.c 2013-08-19 14:13:19 UTC (rev
28705)
@@ -415,8 +415,8 @@
GNUNET_assert (NULL != proc);
GNUNET_assert (NULL != cont);
if (NULL == no_wait_queue)
- no_wait_queue =
- GNUNET_TESTBED_operation_queue_create_ (UINT_MAX);
+ no_wait_queue = GNUNET_TESTBED_operation_queue_create_
+ (OPERATION_QUEUE_TYPE_FIXED, UINT_MAX);
sc = GNUNET_malloc (sizeof (struct GetStatsContext));
sc->peers = peers;
sc->subsystem = (NULL == subsystem) ? NULL : GNUNET_strdup (subsystem);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28705 - gnunet/src/testbed,
gnunet <=