[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r36443 - in gnunet/src: consensus include set
From: |
gnunet |
Subject: |
[GNUnet-SVN] r36443 - in gnunet/src: consensus include set |
Date: |
Mon, 5 Oct 2015 23:26:56 +0200 |
Author: dold
Date: 2015-10-05 23:26:56 +0200 (Mon, 05 Oct 2015)
New Revision: 36443
Modified:
gnunet/src/consensus/gnunet-consensus-profiler.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/consensus/test_consensus.conf
gnunet/src/include/gnunet_set_service.h
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/set_api.c
Log:
work on consensus and set
- evil peers for consensus
- various fixes for consensus and set
Modified: gnunet/src/consensus/gnunet-consensus-profiler.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-profiler.c 2015-10-05 16:35:44 UTC
(rev 36442)
+++ gnunet/src/consensus/gnunet-consensus-profiler.c 2015-10-05 21:26:56 UTC
(rev 36443)
@@ -437,7 +437,7 @@
gettext_noop ("number of peers in consensus"),
GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers },
{ 'k', "value-replication", NULL,
- gettext_noop ("how many peers receive one value?"),
+ gettext_noop ("how many peers (random selection without replacement)
receive one value?"),
GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication },
{ 'x', "num-values", NULL,
gettext_noop ("number of values"),
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2015-10-05 16:35:44 UTC
(rev 36442)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2015-10-05 21:26:56 UTC
(rev 36443)
@@ -128,26 +128,35 @@
PHASE_KIND_GRADECAST_CONFIRM,
PHASE_KIND_GRADECAST_CONFIRM_GRADE,
PHASE_KIND_GRADECAST_APPLY_RESULT,
+ /**
+ * Apply a repetition of the all-to-all
+ * gradecast to the current set.
+ */
+ PHASE_KIND_APPLY_REP,
PHASE_KIND_FINISH,
};
-enum ActionType
+enum TaskKind
{
/**
* Do a set reconciliation with another peer (or via looback).
*/
- ACTION_RECONCILE,
+ TASK_RECONCILE,
/**
+ * Same as reconciliation, but only care about added elements.
+ */
+ TASK_UNION,
+ /**
* Apply a referendum with a threshold
* to a set and/or a diff.
*/
- ACTION_EVAL_RFN,
+ TASK_EVAL_RFN,
/**
* Apply a diff to a set.
*/
- ACTION_APPLY_DIFF,
- ACTION_FINISH,
+ TASK_APPLY_DIFF,
+ FASK_FINISH,
};
enum SetKind
@@ -154,7 +163,7 @@
{
SET_KIND_NONE = 0,
SET_KIND_CURRENT,
- SET_KIND_LEADER,
+ SET_KIND_LEADER_PROPOSAL,
SET_KIND_ECHO_RESULT,
};
@@ -161,7 +170,8 @@
enum DiffKind
{
DIFF_KIND_NONE = 0,
- DIFF_KIND_LEADER,
+ DIFF_KIND_LEADER_PROPOSAL,
+ DIFF_KIND_LEADER_CONSENSUS,
DIFF_KIND_GRADECAST_RESULT,
};
@@ -170,9 +180,74 @@
RFN_KIND_NONE = 0,
RFN_KIND_ECHO,
RFN_KIND_CONFIRM,
+ RFN_KIND_GRADECAST_RESULT
};
+struct SetOpCls
+{
+ struct SetKey input_set;
+
+ struct SetKey output_set;
+ struct RfnKey output_rfn;
+ struct DiffKey output_diff;
+
+ int do_not_remove;
+
+ struct GNUNET_SET_OperationHandle *op;
+};
+
+struct EvalRfnCls
+{
+ struct SetKey input_set;
+ struct RfnKey input_rfn;
+
+ uint16_t threshold;
+
+ struct SetKey output_set;
+ struct DiffKey output_diff;
+};
+
+
+struct ApplyDiffCls
+{
+ struct SetKey input_set;
+ struct DiffKey input_diff;
+ struct SetKey output_set;
+};
+
+
+struct LeaderApplyCls
+{
+ struct DiffKey input_diff_1;
+ struct DiffKey input_diff_2;
+
+ struct RfnKey output_rfn;
+};
+
+
+struct FinishCls
+{
+ struct SetKey input_set;
+};
+
+/**
+ * Closure for both @a start_task
+ * and @a cancel_task.
+ */
+union TaskFuncCls
+{
+ struct SetOpCls setop;
+ struct EvalRfnCls eval_rfn;
+ struct ApplyDiffCls apply_diff;
+ struct LeaderApplyCls leader_apply;
+ struct FinishCls finish;
+};
+
+struct TaskEntry;
+
+typedef void (*TaskFunc) (struct TaskEntry *task);
+
/*
* Node in the consensus task graph.
*/
@@ -182,30 +257,16 @@
struct Step *step;
- int is_running;
+ int is_started;
int is_finished;
- enum ActionType action;
+ enum TaskKind kind;
- struct SetKey input_set;
- struct DiffKey input_diff;
- struct RfnKey input_rfn;
- struct SetKey output_set;
- struct DiffKey output_diff;
- struct RfnKey output_rfn;
+ TaskFunc start;
+ TaskFunc cancel;
- /**
- * Threshold when evaluating referendums.
- */
- uint16_t threshold;
-
- /**
- * Operation that is running for this task.
- */
- struct GNUNET_SET_OperationHandle *op;
-
- struct GNUNET_SET_Handle *commited_set;
+ union TaskFuncCls cls;
};
@@ -280,17 +341,10 @@
char *debug_name;
};
-struct RfnPeerInfo
-{
- /* Peers can propose changes,
- * but they are only accepted once
- * the whole set operation is done. */
- int is_commited;
-};
struct RfnElementInfo
{
- struct GNUNET_SET_Element *element;
+ const struct GNUNET_SET_Element *element;
/*
* Vote (or VOTE_NONE) from every peer
@@ -323,12 +377,20 @@
* not counted for majority votes or thresholds.
*/
int *peer_commited;
+
+
+ /**
+ * Contestation state of the peer. If a peer is contested, the values it
+ * contributed are still counted for applying changes, but the grading is
+ * affected.
+ */
+ int *peer_contested;
};
struct DiffElementInfo
{
- struct GNUNET_SET_Element *element;
+ const struct GNUNET_SET_Element *element;
/**
* Positive weight for 'add', negative
@@ -468,13 +530,13 @@
finish_task (struct TaskEntry *task);
static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry
*task);
+task_start_reconcile (struct TaskEntry *task);
static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task);
+task_start_eval_rfn (struct TaskEntry *task);
static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task);
+task_start_apply_diff (struct TaskEntry *task);
static void
run_ready_steps (struct ConsensusSession *session);
@@ -492,6 +554,7 @@
case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
+ case PHASE_KIND_APPLY_REP: return "APPLY_REP";
default: return "(unknown)";
}
}
@@ -503,7 +566,7 @@
switch (kind)
{
case SET_KIND_CURRENT: return "CURRENT";
- case SET_KIND_LEADER: return "LEADER";
+ case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
case SET_KIND_NONE: return "NONE";
default: return "(unknown)";
}
@@ -527,13 +590,27 @@
switch (kind)
{
case DIFF_KIND_NONE: return "NONE";
- case DIFF_KIND_LEADER: return "LEADER";
+ case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
+ case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
default: return "(unknown)";
}
}
+#ifdef GNUNET_EXTRA_LOGGING
+
+
static const char *
+debug_str_element (const struct GNUNET_SET_Element *el)
+{
+ struct GNUNET_HashCode hash;
+
+ GNUNET_SET_element_hash (el, &hash);
+
+ return GNUNET_h2s (&hash);
+}
+
+static const char *
debug_str_task_key (struct TaskKey *tk)
{
static char buf[256];
@@ -583,7 +660,9 @@
return buf;
}
+#endif /* GNUNET_EXTRA_LOGGING */
+
/**
* Destroy a session, free all resources associated with it.
*
@@ -602,6 +681,8 @@
{
GNUNET_MQ_destroy (session->client_mq);
session->client_mq = NULL;
+ /* The MQ cleanup will also disconnect the underlying client. */
+ session->client = NULL;
}
if (NULL != session->client)
{
@@ -634,8 +715,9 @@
struct GNUNET_CONSENSUS_ElementMessage *m;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "P%d: got element for client\n",
- session->local_peer_idx);
+ "P%d: sending element %s to client\n",
+ session->local_peer_idx,
+ debug_str_element (element));
ev = GNUNET_MQ_msg_extra (m, element->size,
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
@@ -736,7 +818,36 @@
int weight,
const struct GNUNET_SET_Element *element)
{
- GNUNET_assert (0);
+ struct DiffElementInfo *di;
+ struct GNUNET_HashCode hash;
+
+ GNUNET_assert ( (1 == weight) || (-1 == weight));
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "diff_insert with element size %u\n",
+ element->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "hashing element\n");
+
+ GNUNET_SET_element_hash (element, &hash);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "hashed element\n");
+
+ di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
+
+ if (NULL == di)
+ {
+ di = GNUNET_new (struct DiffElementInfo);
+ di->element = GNUNET_SET_element_dup (element);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (diff->changes,
+ &hash, di,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+ }
+
+ di->weight = weight;
}
@@ -747,10 +858,39 @@
int vote,
const struct GNUNET_SET_Element *element)
{
+ struct RfnElementInfo *ri;
+ struct GNUNET_HashCode hash;
+
GNUNET_assert (voting_peer < num_peers);
- GNUNET_assert (0);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "voting for element of size %u\n",
+ element->size);
+
+ rfn->peer_commited[voting_peer] = GNUNET_YES;
+
+ GNUNET_SET_element_hash (element, &hash);
+ ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
+
+
+ if (NULL == ri)
+ {
+ ri = GNUNET_new (struct RfnElementInfo);
+ ri->element = GNUNET_SET_element_dup (element);
+ ri->votes = GNUNET_new_array (num_peers, int);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
+ &hash, ri,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "rfn vote element %p\n",
+ ri->element);
+ ri->votes[voting_peer] = vote;
}
+
uint16_t
task_other_peer (struct TaskEntry *task)
{
@@ -760,6 +900,7 @@
return task->key.peer1;
}
+
/**
* Callback for set operation results. Called for each element
* in the result set.
@@ -779,6 +920,10 @@
struct DiffEntry *output_diff = NULL;
struct ReferendumEntry *output_rfn = NULL;
unsigned int other_idx;
+ struct SetOpCls *setop;
+
+ setop = &task->cls.setop;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%u: got set result for {%s}, status %u\n",
@@ -786,7 +931,7 @@
debug_str_task_key (&task->key),
status);
- if (GNUNET_NO == task->is_running)
+ if (GNUNET_NO == task->is_started)
{
GNUNET_break_op (0);
return;
@@ -808,14 +953,23 @@
GNUNET_assert (0);
}
- if (SET_KIND_NONE != task->output_set.set_kind)
- output_set = lookup_set (session, &task->output_set);
+ if (SET_KIND_NONE != setop->output_set.set_kind)
+ {
+ output_set = lookup_set (session, &setop->output_set);
+ GNUNET_assert (NULL != output_set);
+ }
- if (DIFF_KIND_NONE != task->output_diff.diff_kind)
- output_diff = lookup_diff (session, &task->output_diff);
+ if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+ {
+ output_diff = lookup_diff (session, &setop->output_diff);
+ GNUNET_assert (NULL != output_diff);
+ }
- if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
- output_rfn = lookup_rfn (session, &task->output_rfn);
+ if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
+ {
+ output_rfn = lookup_rfn (session, &setop->output_rfn);
+ GNUNET_assert (NULL != output_rfn);
+ }
if (GNUNET_YES == session->peers_ignored[other_idx])
{
@@ -827,8 +981,10 @@
switch (status)
{
- // case GNUNET_SET_STATUS_MISSING_LOCAL:
- case GNUNET_SET_STATUS_OK:
+ case GNUNET_SET_STATUS_ADD_LOCAL:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding element in Task {%s}\n",
+ debug_str_task_key (&task->key));
if (NULL != output_set)
{
// FIXME: record pending adds, use callback
@@ -836,25 +992,95 @@
element,
NULL,
NULL);
-
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: adding element %s into set {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_set_key (&setop->output_set),
+ debug_str_task_key (&task->key));
+#endif
}
if (NULL != output_diff)
{
diff_insert (output_diff, 1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: adding element %s into diff {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_diff_key (&setop->output_diff),
+ debug_str_task_key (&task->key));
+#endif
}
if (NULL != output_rfn)
{
rfn_vote (output_rfn, task_other_peer (task), session->num_peers,
VOTE_ADD, element);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: adding element %s into rfn {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_rfn_key (&setop->output_rfn),
+ debug_str_task_key (&task->key));
+#endif
}
// XXX: add result to structures in task
break;
- //case GNUNET_SET_STATUS_MISSING_REMOTE:
- // // XXX: add result to structures in task
- // break;
+ case GNUNET_SET_STATUS_ADD_REMOTE:
+ if (GNUNET_YES == setop->do_not_remove)
+ break;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing element in Task {%s}\n",
+ debug_str_task_key (&task->key));
+ if (NULL != output_set)
+ {
+ // FIXME: record pending adds, use callback
+ GNUNET_SET_remove_element (output_set->h,
+ element,
+ NULL,
+ NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: removing element %s from set {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_set_key (&setop->output_set),
+ debug_str_task_key (&task->key));
+#endif
+ }
+ if (NULL != output_diff)
+ {
+ diff_insert (output_diff, -1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: removing element %s from diff {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_diff_key (&setop->output_diff),
+ debug_str_task_key (&task->key));
+#endif
+ }
+ if (NULL != output_rfn)
+ {
+ rfn_vote (output_rfn, task_other_peer (task), session->num_peers,
VOTE_REMOVE, element);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: removing element %s from rfn {%s} of task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (element),
+ debug_str_rfn_key (&setop->output_rfn),
+ debug_str_task_key (&task->key));
+#endif
+ }
+ break;
case GNUNET_SET_STATUS_DONE:
// XXX: check first if any changes to the underlying
// set are still pending
// XXX: commit other peer in referendum
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Finishing setop in Task {%s}\n",
+ debug_str_task_key (&task->key));
finish_task (task);
break;
case GNUNET_SET_STATUS_FAILURE:
@@ -867,8 +1093,86 @@
}
}
+#ifdef EVIL
+enum Evilness
+{
+ EVILNESS_NONE,
+ EVILNESS_CRAM,
+ EVILNESS_SLACK,
+};
+static void
+get_evilness (struct ConsensusSession *session, enum Evilness *ret_type,
unsigned int *ret_num)
+{
+ char *evil_spec;
+ char *field;
+ char *evil_type_str = NULL;
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus",
"EVIL_SPEC", &evil_spec))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: no evilness\n",
+ session->local_peer_idx);
+ *ret_type = EVILNESS_NONE;
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: got evilness spec\n",
+ session->local_peer_idx);
+
+ for (field = strtok (evil_spec, "/");
+ NULL != field;
+ field = strtok (NULL, "/"))
+ {
+ unsigned int peer_num;
+ unsigned int evil_num;
+ int ret;
+
+ evil_type_str = NULL;
+
+ ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str,
&evil_num);
+
+ if (ret != 3)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC,
behaving like a good peer.\n",
+ field);
+ goto not_evil;
+ }
+
+ GNUNET_assert (NULL != evil_type_str);
+
+ if (peer_num == session->local_peer_idx)
+ {
+ if (0 == strcmp ("slack", evil_type_str))
+ *ret_type = EVILNESS_SLACK;
+ else if (0 == strcmp ("cram", evil_type_str))
+ {
+ *ret_type = EVILNESS_CRAM;
+ *ret_num = evil_num;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in
EVIL_SPEC (unknown type), behaving like a good peer.\n");
+ goto not_evil;
+ }
+ goto cleanup;
+ }
+ /* No GNUNET_free since memory was allocated by libc */
+ free (evil_type_str);
+ evil_type_str = NULL;
+ }
+not_evil:
+ *ret_type = EVILNESS_NONE;
+cleanup:
+ GNUNET_free (evil_spec);
+ if (NULL != evil_type_str)
+ free (evil_type_str);
+}
+
+#endif
+
+
/**
* Commit the appropriate set for a
* task.
@@ -878,11 +1182,67 @@
struct TaskEntry *task)
{
struct SetEntry *set;
+ struct SetOpCls *setop = &task->cls.setop;
- GNUNET_assert (NULL != task->op);
- set = lookup_set (session, &task->input_set);
+ GNUNET_assert (NULL != setop->op);
+ set = lookup_set (session, &setop->input_set);
GNUNET_assert (NULL != set);
- GNUNET_SET_commit (task->op, set->h);
+
+#ifdef EVIL
+ {
+ unsigned int i;
+ unsigned int evil_num;
+ enum Evilness evilness;
+
+ get_evilness (session, &evilness, &evil_num);
+ switch (evilness)
+ {
+ case EVILNESS_CRAM:
+ /* We're not cramming elements in the
+ all-to-all round, since that would just
+ add more elements to the result set, but
+ wouldn't test robustness. */
+ if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
+ {
+ GNUNET_SET_commit (setop->op, set->h);
+ break;
+ }
+ for (i = 0; i < evil_num; i++)
+ {
+ struct GNUNET_HashCode hash;
+ struct GNUNET_SET_Element element;
+ element.data = &hash;
+ element.size = sizeof (struct GNUNET_HashCode);
+ element.element_type = 0;
+
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG,
&hash);
+ GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: evil peer: cramming element %s into set {%s} of
task {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (&element),
+ debug_str_set_key (&setop->input_set),
+ debug_str_task_key (&task->key));
+#endif
+ }
+ GNUNET_SET_commit (setop->op, set->h);
+ break;
+ case EVILNESS_SLACK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: evil peer: slacking\n",
+ session->local_peer_idx,
+ evil_num);
+ /* Do nothing. */
+ break;
+ case EVILNESS_NONE:
+ GNUNET_SET_commit (setop->op, set->h);
+ break;
+ }
+ }
+#else
+ GNUNET_SET_commit (setop->op, set->h);
+#endif
}
@@ -892,6 +1252,8 @@
{
struct GNUNET_HashCode hash;
+ GNUNET_assert (NULL != diff);
+
GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash,
diff,
@@ -906,6 +1268,10 @@
GNUNET_assert (NULL != set->h);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Putting set %s\n",
+ debug_str_set_key (&set->key));
+
GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash,
set,
@@ -931,26 +1297,183 @@
output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
{
struct TaskEntry *task = (struct TaskEntry *) cls;
+ struct SetOpCls *setop = &task->cls.setop;
struct ConsensusSession *session = task->step->session;
struct SetEntry *set = GNUNET_new (struct SetEntry);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%u: Received lazy copy, storing output set %s\n",
- session->local_peer_idx, debug_str_set_key (&task->output_set));
+ session->local_peer_idx, debug_str_set_key (&setop->output_set));
- set->key = task->output_set;
+ set->key = setop->output_set;
set->h = copy;
put_set (task->step->session, set);
- run_task_remote_union (task->step->session, task);
+ task_start_reconcile (task);
}
static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry
*task)
+task_cancel_reconcile (struct TaskEntry *task)
{
+ /* not implemented yet */
+ GNUNET_assert (0);
+}
+
+
+static void
+apply_diff_to_rfn (struct DiffEntry *diff,
+ struct ReferendumEntry *rfn,
+ uint16_t voting_peer,
+ uint16_t num_peers)
+{
+ struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+ struct DiffElementInfo *di;
+
+ iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
+
+ while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter,
NULL, (const void **) &di))
+ {
+ if (di->weight > 0)
+ {
+ rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
+ }
+ if (di->weight < 0)
+ {
+ rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
+ }
+ }
+
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+}
+
+
+struct DiffEntry *
+diff_create ()
+{
+ struct DiffEntry *d = GNUNET_new (struct DiffEntry);
+
+ d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+
+ return d;
+}
+
+
+struct DiffEntry *
+diff_compose (struct DiffEntry *diff_1,
+ struct DiffEntry *diff_2)
+{
+ struct DiffEntry *diff_new;
+ struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+ struct DiffElementInfo *di;
+
+ diff_new = diff_create ();
+
+ iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
+ while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter,
NULL, (const void **) &di))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "iterating first diff\n");
+ diff_insert (diff_new, di->weight, di->element);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "insert done\n");
+ }
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "iterating first diff done\n");
+
+ iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
+ while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter,
NULL, (const void **) &di))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "iterating second diff\n");
+ diff_insert (diff_new, di->weight, di->element);
+ }
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "iterating second diff done\n");
+
+ return diff_new;
+}
+
+
+struct ReferendumEntry *
+rfn_create (uint16_t size)
+{
+ struct ReferendumEntry *rfn;
+
+ rfn = GNUNET_new (struct ReferendumEntry);
+ rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+ rfn->peer_commited = GNUNET_new_array (size, int);
+
+ return rfn;
+}
+
+
+static void
+diff_destroy (struct DiffEntry *diff)
+{
+ GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
+ GNUNET_free (diff);
+}
+
+
+static void
+task_start_leader_apply (struct TaskEntry *task)
+{
+ struct LeaderApplyCls *lacls = &task->cls.leader_apply;
+ struct ConsensusSession *session = task->step->session;
+ struct DiffEntry *diff_1;
+ struct DiffEntry *diff_2;
+ struct DiffEntry *diff_combined;
+ struct ReferendumEntry *rfn;
+
+ diff_1 = lookup_diff (session, &lacls->input_diff_1);
+ GNUNET_assert (NULL != diff_1);
+
+ diff_2 = lookup_diff (session, &lacls->input_diff_2);
+ GNUNET_assert (NULL != diff_2);
+
+ rfn = lookup_rfn (session, &lacls->output_rfn);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "looked up everything\n");
+
+ if (NULL == rfn)
+ {
+ rfn = rfn_create (session->num_peers);
+ rfn->key = lacls->output_rfn;
+ put_rfn (session, rfn);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "ensured rfn\n");
+
+ diff_combined = diff_compose (diff_1, diff_2);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "composed diffs\n");
+
+ apply_diff_to_rfn (diff_combined, rfn, task->key.leader, session->num_peers);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "applied diffs to rfns\n");
+
+ diff_destroy (diff_combined);
+
+ finish_task (task);
+}
+
+
+static void
+task_start_reconcile (struct TaskEntry *task)
+{
struct SetEntry *input;
+ struct SetOpCls *setop = &task->cls.setop;
+ struct ConsensusSession *session = task->step->session;
- input = lookup_set (session, &task->input_set);
+ input = lookup_set (session, &setop->input_set);
GNUNET_assert (NULL != input);
GNUNET_assert (NULL != input->h);
@@ -959,11 +1482,11 @@
because we want something valid in there, even
if the other peer doesn't talk to us */
- if (SET_KIND_NONE != task->output_set.set_kind)
+ if (SET_KIND_NONE != setop->output_set.set_kind)
{
/* If we don't have an existing output set,
we clone the input set. */
- if (NULL == lookup_set (session, &task->output_set))
+ if (NULL == lookup_set (session, &setop->output_set))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Output set missing, copying from input set\n");
@@ -975,9 +1498,9 @@
}
}
- if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
+ if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
{
- if (NULL == lookup_rfn (session, &task->output_rfn))
+ if (NULL == lookup_rfn (session, &setop->output_rfn))
{
struct ReferendumEntry *rfn;
@@ -984,16 +1507,26 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%u: output rfn <%s> missing, creating.\n",
session->local_peer_idx,
- debug_str_rfn_key (&task->output_rfn));
+ debug_str_rfn_key (&setop->output_rfn));
- rfn = GNUNET_new (struct ReferendumEntry);
- rfn->key = task->output_rfn;
- rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
- rfn->peer_commited = GNUNET_new_array (session->num_peers, int);
+ rfn = rfn_create (session->num_peers);
+ rfn->key = setop->output_rfn;
put_rfn (session, rfn);
}
}
+ if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+ {
+ if (NULL == lookup_diff (session, &setop->output_diff))
+ {
+ struct DiffEntry *diff;
+
+ diff = diff_create ();
+ diff->key = setop->output_diff;
+ put_diff (session, diff);
+ }
+ }
+
if (task->key.peer1 == session->local_peer_idx)
{
struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
@@ -1001,7 +1534,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%u: Looking up set {%s} to run remote union\n",
session->local_peer_idx,
- debug_str_set_key (&task->input_set));
+ debug_str_set_key (&setop->input_set));
rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
rcm.header.size = htons (sizeof (struct
GNUNET_CONSENSUS_RoundContextMessage));
@@ -1012,23 +1545,20 @@
rcm.leader = htons (task->key.leader);
rcm.repetition = htons (task->key.repetition);
- GNUNET_assert (NULL == task->op);
+ GNUNET_assert (NULL == setop->op);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our
set is %s\n",
- session->local_peer_idx, task->key.peer2, debug_str_set_key
(&task->input_set));
+ session->local_peer_idx, task->key.peer2, debug_str_set_key
(&setop->input_set));
// XXX: maybe this should be done while
// setting up tasks alreays?
- task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
- &session->global_id,
- &rcm.header,
- GNUNET_SET_RESULT_ADDED, /* XXX: will be
obsolete soon */
- set_result_cb,
- task);
+ setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
+ &session->global_id,
+ &rcm.header,
+ GNUNET_SET_RESULT_SYMMETRIC,
+ set_result_cb,
+ task);
- /* Referendums must be materialized as a set before */
- GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind);
-
- if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h))
+ if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
{
GNUNET_break (0);
/* XXX: cleanup? */
@@ -1041,9 +1571,8 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
session->local_peer_idx, task->key.peer1);
- if (NULL != task->op)
+ if (NULL != setop->op)
{
- GNUNET_assert (NULL == task->commited_set);
commit_set (session, task);
}
}
@@ -1159,11 +1688,11 @@
set = GNUNET_new (struct SetEntry);
set->h = copy;
- set->key = task->output_set;
+ set->key = task->cls.eval_rfn.output_set;
put_set (session, set);
- run_task_eval_rfn (session, task);
+ task_start_eval_rfn (task);
}
@@ -1173,7 +1702,7 @@
* set and store the result in the output set and/or output diff.
*/
static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_eval_rfn (struct TaskEntry *task)
{
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
struct ReferendumEntry *input_rfn;
@@ -1181,24 +1710,23 @@
struct SetEntry *output_set = NULL;
struct DiffEntry *output_diff = NULL;
struct SetChangeProgressCls *progress_cls;
+ struct EvalRfnCls *rcls = &task->cls.eval_rfn;
+ struct ConsensusSession *session = task->step->session;
/* Have at least one output */
- GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) ||
- (task->output_diff.diff_kind != DIFF_KIND_NONE));
+ GNUNET_assert ( (rcls->output_set.set_kind != SET_KIND_NONE) ||
+ (rcls->output_diff.diff_kind != DIFF_KIND_NONE));
- /* Not allowed as output */
- GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE));
-
- if (SET_KIND_NONE != task->output_set.set_kind)
+ if (SET_KIND_NONE != rcls->output_set.set_kind)
{
/* We have a set output, thus the output set must
exist or copy it from the input set */
- output_set = lookup_set (session, &task->output_set);
+ output_set = lookup_set (session, &rcls->output_set);
if (NULL == output_set)
{
struct SetEntry *input_set;
- input_set = lookup_set (session, &task->input_set);
+ input_set = lookup_set (session, &rcls->input_set);
GNUNET_assert (NULL != input_set);
GNUNET_SET_copy_lazy (input_set->h,
eval_rfn_copy_cb,
@@ -1209,21 +1737,26 @@
}
}
- if (DIFF_KIND_NONE != task->output_diff.diff_kind)
+ if (DIFF_KIND_NONE != rcls->output_diff.diff_kind)
{
- output_diff = lookup_diff (session, &task->output_diff);
+ output_diff = lookup_diff (session, &rcls->output_diff);
if (NULL == output_diff)
{
- output_diff = GNUNET_new (struct DiffEntry);
- output_diff->key = task->output_diff;
- output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8,
GNUNET_NO);
+ output_diff = diff_create ();
+ output_diff->key = rcls->output_diff;
put_diff (session, output_diff);
}
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Evaluating referendum in Task {%s}\n",
+ debug_str_task_key (&task->key));
+
+
progress_cls = GNUNET_new (struct SetChangeProgressCls);
+ progress_cls->task = task;
- input_rfn = lookup_rfn (session, &task->input_rfn);
+ input_rfn = lookup_rfn (session, &rcls->input_rfn);
GNUNET_assert (NULL != input_rfn);
@@ -1232,18 +1765,28 @@
while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter,
NULL, (const void **) &ri))
{
- int majority_vote = rfn_majority (session->num_peers, input_rfn, ri,
task->threshold);
+ int majority_vote = rfn_majority (session->num_peers, input_rfn, ri,
rcls->threshold);
switch (majority_vote)
{
case VOTE_ADD:
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: referendum vote result: VOTE_ADD for element %s in
task {%s} with"
+ "output set {%s} and output diff {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element),
+ debug_str_task_key (&task->key),
+ debug_str_set_key (&rcls->output_set),
+ debug_str_diff_key (&rcls->output_diff));
+#endif
if (NULL != output_set)
{
progress_cls->num_pending++;
GNUNET_assert (GNUNET_OK ==
GNUNET_SET_add_element (output_set->h,
- ri->element,
- eval_rfn_progress,
- progress_cls));
+ ri->element,
+ eval_rfn_progress,
+ progress_cls));
}
if (NULL != output_diff)
{
@@ -1253,16 +1796,37 @@
case VOTE_CONTESTED:
if (NULL != output_set)
output_set->is_contested = GNUNET_YES;
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: referendum vote result: VOTE_CONTESTED for element
%s in task {%s} with"
+ "output set {%s} and output diff {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element),
+ debug_str_task_key (&task->key),
+ debug_str_set_key (&rcls->output_set),
+ debug_str_diff_key (&rcls->output_diff));
+#endif
/* fallthrough */
case VOTE_REMOVE:
+#ifdef GNUNET_EXTRA_LOGGING
+ if (VOTE_REMOVE == majority_vote)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: referendum vote result: VOTE_REMOVE for element %s
in task {%s} with"
+ "output set {%s} and output diff {%s}\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element),
+ debug_str_task_key (&task->key),
+ debug_str_set_key (&rcls->output_set),
+ debug_str_diff_key (&rcls->output_diff));
+#endif
if (NULL != output_set)
{
progress_cls->num_pending++;
GNUNET_assert (GNUNET_OK ==
GNUNET_SET_remove_element (output_set->h,
- ri->element,
- eval_rfn_progress,
- progress_cls));
+ ri->element,
+ eval_rfn_progress,
+ progress_cls));
}
if (NULL != output_diff)
{
@@ -1270,6 +1834,8 @@
}
break;
case VOTE_NONE:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "referendum vote result: VOTE_NONE\n");
/* Nothing to do. */
break;
default:
@@ -1294,14 +1860,15 @@
struct TaskEntry *task = (struct TaskEntry *) cls;
struct ConsensusSession *session = task->step->session;
struct SetEntry *set;
+ struct ApplyDiffCls *diffop = &task->cls.apply_diff;
set = GNUNET_new (struct SetEntry);
set->h = copy;
- set->key = task->output_set;
+ set->key = diffop->output_set;
put_set (session, set);
- run_task_apply_diff (session, task);
+ task_start_apply_diff (task);
}
@@ -1334,7 +1901,7 @@
static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_apply_diff (struct TaskEntry *task)
{
struct SetEntry *output_set;
struct DiffEntry *input_diff;
@@ -1341,21 +1908,23 @@
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
struct DiffElementInfo *di;
struct SetChangeProgressCls *progress_cls;
+ struct ApplyDiffCls *diffop = &task->cls.apply_diff;
+ struct ConsensusSession *session = task->step->session;
- GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE);
- GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE);
+ GNUNET_assert (diffop->output_set.set_kind != SET_KIND_NONE);
+ GNUNET_assert (diffop->input_diff.diff_kind != DIFF_KIND_NONE);
- input_diff = lookup_diff (session, &task->input_diff);
+ input_diff = lookup_diff (session, &diffop->input_diff);
GNUNET_assert (NULL != input_diff);
- output_set = lookup_set (session, &task->output_set);
+ output_set = lookup_set (session, &diffop->output_set);
if (NULL == output_set)
{
struct SetEntry *input_set;
- input_set = lookup_set (session, &task->input_set);
+ input_set = lookup_set (session, &diffop->input_set);
GNUNET_assert (NULL != input_set);
GNUNET_SET_copy_lazy (input_set->h,
apply_diff_copy_cb,
@@ -1403,11 +1972,12 @@
static void
-run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_finish (struct TaskEntry *task)
{
struct SetEntry *final_set;
+ struct ConsensusSession *session = task->step->session;
- final_set = lookup_set (session, &task->input_set);
+ final_set = lookup_set (session, &task->cls.finish.input_set);
GNUNET_assert (NULL != final_set);
@@ -1418,37 +1988,17 @@
}
static void
-run_task (struct ConsensusSession *session, struct TaskEntry *task)
+start_task (struct ConsensusSession *session, struct TaskEntry *task)
{
- GNUNET_assert (GNUNET_NO == task->is_running);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n",
session->local_peer_idx, debug_str_task_key (&task->key));
+
+ GNUNET_assert (GNUNET_NO == task->is_started);
GNUNET_assert (GNUNET_NO == task->is_finished);
+ GNUNET_assert (NULL != task->start);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n",
session->local_peer_idx, debug_str_task_key (&task->key));
+ task->start (task);
- switch (task->action)
- {
- case ACTION_RECONCILE:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE
task\n", session->local_peer_idx);
- run_task_remote_union (session, task);
- break;
- case ACTION_EVAL_RFN:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN
task\n", session->local_peer_idx);
- run_task_eval_rfn (session, task);
- break;
- case ACTION_APPLY_DIFF:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF
task\n", session->local_peer_idx);
- run_task_apply_diff (session, task);
- break;
- case ACTION_FINISH:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH
task\n", session->local_peer_idx);
- run_task_finish (session, task);
- break;
- default:
- /* not reached */
- GNUNET_assert (0);
- }
- task->is_running = GNUNET_YES;
+ task->is_started = GNUNET_YES;
}
@@ -1515,7 +2065,7 @@
step->is_running = GNUNET_YES;
for (i = 0; i < step->tasks_len; i++)
- run_task (session, step->tasks[i]);
+ start_task (session, step->tasks[i]);
/* Sometimes there is no task to trigger finishing the step, so we have
to do it here. */
if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO ==
step->is_finished))
@@ -1730,12 +2280,6 @@
return;
}
- if (ACTION_RECONCILE != task->action)
- {
- GNUNET_break_op (0);
- return;
- }
-
if (GNUNET_YES == task->is_finished)
{
GNUNET_break_op (0);
@@ -1754,8 +2298,8 @@
else
my_result_cb = set_result_cb;
- task->op = GNUNET_SET_accept (request,
- GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon
*/
+ task->cls.setop.op = GNUNET_SET_accept (request,
+ GNUNET_SET_RESULT_SYMMETRIC,
my_result_cb,
task);
@@ -1762,7 +2306,7 @@
/* If the task hasn't been started yet,
we wait for that until we commit. */
- if (GNUNET_YES == task->is_running)
+ if (GNUNET_YES == task->is_started)
{
commit_set (session, task);
}
@@ -1969,11 +2513,11 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n",
session->local_peer_idx, p1, p2, rep, lead);
task = ((struct TaskEntry) {
.step = step,
- .action = ACTION_RECONCILE,
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me
},
- .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
- .output_set = (struct SetKey) { SET_KIND_NONE },
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
put_task (session->taskmap, &task);
}
/* We run this task to make sure that the leader
@@ -1982,12 +2526,13 @@
without the code having to handle any special cases. */
task = ((struct TaskEntry) {
.step = step,
- .action = ACTION_RECONCILE,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
- .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
- .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me },
- .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me },
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+ task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL,
rep, me };
+ task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
rep, me };
put_task (session->taskmap, &task);
}
else
@@ -1998,12 +2543,13 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n",
session->local_peer_idx, p1, p2, rep, lead);
task = ((struct TaskEntry) {
.step = step,
- .action = ACTION_RECONCILE,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
lead},
- .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
- .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
- .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead },
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+ task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL,
rep, lead };
+ task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
rep, lead };
put_task (session->taskmap, &task);
}
@@ -2022,11 +2568,12 @@
arrange_peers (&p1, &p2, n);
task = ((struct TaskEntry) {
.step = step,
- .action = ACTION_RECONCILE,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
- .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
- .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL,
rep, lead };
+ task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
put_task (session->taskmap, &task);
}
@@ -2041,12 +2588,12 @@
task = ((struct TaskEntry) {
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep,
lead },
.step = step,
- .action = ACTION_EVAL_RFN,
- .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
- .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
- .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
- .threshold = n - t,
+ .start = task_start_eval_rfn
});
+ task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL,
rep, lead },
+ task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
+ task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
lead },
+ task.cls.eval_rfn.threshold = n - t,
put_task (session->taskmap, &task);
prev_step = step;
@@ -2064,11 +2611,12 @@
arrange_peers (&p1, &p2, n);
task = ((struct TaskEntry) {
.step = step,
- .action = ACTION_RECONCILE,
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep,
lead},
- .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
- .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead },
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
lead };
+ task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead
};
put_task (session->taskmap, &task);
}
@@ -2081,15 +2629,34 @@
// evaluate ConfirmationReferendum and
// apply it to the LeaderReferendum
+ // XXX: the diff should contain grading information
task = ((struct TaskEntry) {
.step = step,
.key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep,
lead },
- .action = ACTION_EVAL_RFN,
- .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
- .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep },
+ .start = task_start_eval_rfn,
});
+ task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
+ task.cls.eval_rfn.output_diff = (struct DiffKey) {
DIFF_KIND_LEADER_CONSENSUS, rep, lead };
put_task (session->taskmap, &task);
+
+ prev_step = step;
+ step = create_step (session, round, 1);
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_asprintf (&step->debug_name, "gc apply, lead %u rep %u", lead, rep);
+#endif
+ step_depend_on (step, prev_step);
+
+ task = ((struct TaskEntry) {
+ .step = step,
+ .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, rep,
lead },
+ .start = task_start_leader_apply,
+ });
+ task.cls.leader_apply.input_diff_1 = (struct DiffKey) {
DIFF_KIND_LEADER_PROPOSAL, rep, lead };
+ task.cls.leader_apply.input_diff_2 = (struct DiffKey) {
DIFF_KIND_LEADER_CONSENSUS, rep, lead };
+ task.cls.leader_apply.output_rfn = (struct RfnKey) {
RFN_KIND_GRADECAST_RESULT, rep };
+ put_task (session->taskmap, &task);
+
step_depend_on (step_after, step);
}
@@ -2142,10 +2709,12 @@
task = ((struct TaskEntry) {
.key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
.step = step,
- .action = ACTION_RECONCILE,
- .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
- .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
});
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
+ task.cls.setop.output_set = task.cls.setop.input_set;
+ task.cls.setop.do_not_remove = GNUNET_YES;
put_task (session->taskmap, &task);
}
@@ -2164,7 +2733,7 @@
step_rep_start = create_step (session, round, 1);
#ifdef GNUNET_EXTRA_LOGGING
- GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u",
i);
+ GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
#endif
step_depend_on (step_rep_start, prev_step);
@@ -2171,7 +2740,7 @@
step_rep_end = create_step (session, round, 1);
#ifdef GNUNET_EXTRA_LOGGING
- GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
+ GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
#endif
/* parallel gradecasts */
@@ -2178,18 +2747,16 @@
for (lead = 0; lead < n; lead++)
construct_task_graph_gradecast (session, i, lead, step_rep_start,
step_rep_end);
- // TODO: add peers to ignore list,
- //
- // evaluate ConfirmationReferendum and
- // apply it to the LeaderReferendum
+ // TODO: add peers to ignore list, either here or
+ // already in the gradecast.
task = ((struct TaskEntry) {
.step = step_rep_end,
- .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i,
-1},
- .action = ACTION_APPLY_DIFF,
- .input_set = (struct SetKey) { SET_KIND_CURRENT, i },
- .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i },
- .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 },
+ .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
+ .start = task_start_eval_rfn,
});
+ task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_CURRENT, i };
+ task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT,
i };
+ task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 };
put_task (session->taskmap, &task);
prev_step = step_rep_end;
@@ -2206,9 +2773,9 @@
task = ((struct TaskEntry) {
.step = step,
.key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
- .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 },
- .action = ACTION_FINISH,
+ .start = task_start_finish,
});
+ task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
put_task (session->taskmap, &task);
}
@@ -2399,10 +2966,21 @@
}
session->num_client_insert_pending++;
GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
+
+#ifdef GNUNET_EXTRA_LOGGING
+ {
+ struct GNUNET_HashCode hash;
+
+ GNUNET_SET_element_hash (element, &hash);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
+ session->local_peer_idx,
+ GNUNET_h2s (&hash));
+ }
+#endif
+
GNUNET_free (element);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n",
session->local_peer_idx);
}
Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf 2015-10-05 16:35:44 UTC (rev
36442)
+++ gnunet/src/consensus/test_consensus.conf 2015-10-05 21:26:56 UTC (rev
36443)
@@ -1,15 +1,14 @@
address@hidden@ ../../contrib/no_forcestart.conf
address@hidden@ ../../contrib/no_forcestart.conf
[PATHS]
GNUNET_TEST_HOME = /tmp/test-consensus/
[consensus]
-# PREFIX = valgrind
+PREFIX = valgrind
OPTIONS = -L INFO
BINARY = gnunet-service-evil-consensus
-# Evil behavior: Peer 0 does not execute leader step
-#EVIL_SPEC = 0;pass;leader
+EVIL_SPEC = 0;cram;5
# Evil behavior: Peer 0 adds 5 random elements when he is the gradecast leader
# (every peer gets the same element.
@@ -19,6 +18,11 @@
# (every peer gets different elements).
#EVIL_SPEC = 0;stuff-different;leader;5
+
+
+[core]
+FORECESTART = YES
+
[cadet]
#PREFIX = valgrind
Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h 2015-10-05 16:35:44 UTC (rev
36442)
+++ gnunet/src/include/gnunet_set_service.h 2015-10-05 21:26:56 UTC (rev
36443)
@@ -470,7 +470,27 @@
void
GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set);
+/**
+ * Create a copy of an element. The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element);
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param ret_hash a pointer to where the hash of @a element
+ * should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct
GNUNET_HashCode *ret_hash);
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/set/gnunet-service-set.c 2015-10-05 21:26:56 UTC (rev 36443)
@@ -844,6 +844,11 @@
ee->mutations = NULL;
ee->mutations_size = 0;
ee->element_hash = hash;
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_put (set->content->elements,
+ &ee->element_hash,
+ ee,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
}
else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
{
@@ -859,11 +864,6 @@
GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
}
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_put (set->content->elements,
- &ee->element_hash,
- ee,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
set->vt->add (set->state, ee);
}
Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c 2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/set/set_api.c 2015-10-05 21:26:56 UTC (rev 36443)
@@ -1112,4 +1112,40 @@
}
+/**
+ * Create a copy of an element. The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
+{
+ struct GNUNET_SET_Element *copy;
+
+ copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
+ copy->size = element->size;
+ copy->element_type = element->element_type;
+ copy->data = ©[1];
+ memcpy ((void *) copy->data, element->data, copy->size);
+
+ return copy;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param ret_hash a pointer to where the hash of @a element
+ * should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct
GNUNET_HashCode *ret_hash)
+{
+ /* FIXME: The element type should also be hashed. */
+ GNUNET_CRYPTO_hash (element->data, element->size, ret_hash);
+}
+
/* end of set_api.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r36443 - in gnunet/src: consensus include set,
gnunet <=