[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r36096 - in gnunet/src: include psyc psycstore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r36096 - in gnunet/src: include psyc psycstore |
Date: |
Sat, 18 Jul 2015 02:03:06 +0200 |
Author: tg
Date: 2015-07-18 02:03:06 +0200 (Sat, 18 Jul 2015)
New Revision: 36096
Modified:
gnunet/src/include/gnunet_psycstore_plugin.h
gnunet/src/include/gnunet_psycstore_service.h
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc_util_lib.c
gnunet/src/psycstore/Makefile.am
gnunet/src/psycstore/gnunet-service-psycstore.c
gnunet/src/psycstore/plugin_psycstore_sqlite.c
gnunet/src/psycstore/psycstore.h
gnunet/src/psycstore/psycstore_api.c
gnunet/src/psycstore/test_plugin_psycstore.c
gnunet/src/psycstore/test_psycstore.c
Log:
psyc/store: apply state modifiers
Modified: gnunet/src/include/gnunet_psycstore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_plugin.h 2015-07-18 00:03:00 UTC
(rev 36095)
+++ gnunet/src/include/gnunet_psycstore_plugin.h 2015-07-18 00:03:06 UTC
(rev 36096)
@@ -240,9 +240,10 @@
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
int
- (*state_modify_set) (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- const char *name, const void *value, size_t value_size);
+ (*state_modify_op) (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ enum GNUNET_ENV_Operator op,
+ const char *name, const void *value, size_t value_size);
/**
@@ -270,11 +271,11 @@
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key);
/**
- * Set the value of a state variable while synchronizing state.
+ * Assign value of a state variable while synchronizing state.
*
* The state synchronization process is started with state_sync_begin(),
* which is followed by one or more calls to this function,
- * and finished with state_sync_end().
+ * and finished using state_sync_end().
*
* @see GNUNET_PSYCSTORE_state_sync()
*
@@ -281,9 +282,9 @@
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
int
- (*state_sync_set) (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- const char *name, const void *value, size_t value_size);
+ (*state_sync_assign) (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const char *name, const void *value, size_t
value_size);
/**
@@ -296,7 +297,8 @@
int
(*state_sync_end) (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t message_id);
+ uint64_t max_state_message_id,
+ uint64_t state_hash_message_id);
/**
Modified: gnunet/src/include/gnunet_psycstore_service.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_service.h 2015-07-18 00:03:00 UTC
(rev 36095)
+++ gnunet/src/include/gnunet_psycstore_service.h 2015-07-18 00:03:06 UTC
(rev 36096)
@@ -494,10 +494,6 @@
* ID of the message that contains the @a modifiers.
* @param state_delta
* Value of the @e state_delta PSYC header variable of the message.
- * @param modifier_count
- * Number of elements in the @a modifiers array.
- * @param modifiers
- * List of modifiers to apply.
* @param rcb
* Callback to call with the result of the operation.
* @param rcb_cls
@@ -510,8 +506,6 @@
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
uint64_t message_id,
uint64_t state_delta,
- size_t modifier_count,
- const struct GNUNET_ENV_Modifier *modifiers,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *rcb_cls);
@@ -523,7 +517,9 @@
* Handle for the PSYCstore.
* @param channel_key
* The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ * ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
* ID of the message that contains the state_hash PSYC header variable.
* @param modifier_count
* Number of elements in the @a modifiers array.
@@ -539,7 +535,8 @@
struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
- uint64_t message_id,
+ uint64_t max_state_message_id,
+ uint64_t state_hash_message_id,
size_t modifier_count,
const struct GNUNET_ENV_Modifier *modifiers,
GNUNET_PSYCSTORE_ResultCallback rcb,
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2015-07-18 00:03:00 UTC (rev
36095)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2015-07-18 00:03:06 UTC (rev
36096)
@@ -416,6 +416,8 @@
static void
transmit_message (struct Channel *chn);
+static uint64_t
+message_queue_run (struct Channel *chn);
static uint64_t
message_queue_drop (struct Channel *chn);
@@ -1274,6 +1276,39 @@
}
+struct StateModifyClosure
+{
+ struct Channel *chn;
+ struct FragmentQueue *fragq;
+ uint64_t message_id;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
+{
+ struct StateModifyClosure *mcls = cls;
+ struct Channel *chn = mcls->chn;
+ struct FragmentQueue *fragq = mcls->fragq;
+ uint64_t msg_id = mcls->message_id;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 "
(%.*s)\n",
+ chn, result, err_msg_size, err_msg);
+
+ if (GNUNET_OK == result)
+ {
+ chn->max_state_message_id = msg_id;
+ chn->max_message_id = msg_id;
+
+ fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP ==
fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+ message_queue_run (chn);
+ }
+}
+
+
/**
* Run message queue.
*
@@ -1294,6 +1329,7 @@
"%p Running message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
+
while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
@@ -1325,7 +1361,7 @@
"%p Out of order message. "
"(%" PRIu64 " - 1 != %" PRIu64 ")\n",
chn, msg_id, chn->max_message_id);
- break;
+ continue;
}
}
else
@@ -1336,14 +1372,19 @@
"%p Out of order stateful message. "
"(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
chn, msg_id, fragq->state_delta,
chn->max_state_message_id);
- break;
+ continue;
}
-#if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
- store_recv_state_modify_result, cls);
-#endif
- chn->max_state_message_id = msg_id;
+
+ struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+ mcls->chn = chn;
+ mcls->fragq = fragq;
+ mcls->message_id = msg_id;
+
+ /* Apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+ fragq->state_delta,
+ store_recv_state_modify_result, mcls);
+ break;
}
chn->max_message_id = msg_id;
}
@@ -1351,6 +1392,7 @@
GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
@@ -2039,6 +2081,11 @@
{
pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
}
+
+ if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+ {
+ /// @todo add state_hash to PSYC header
+ }
}
}
Modified: gnunet/src/psyc/psyc_util_lib.c
===================================================================
--- gnunet/src/psyc/psyc_util_lib.c 2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psyc/psyc_util_lib.c 2015-07-18 00:03:06 UTC (rev 36096)
@@ -343,7 +343,7 @@
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queueing message part of type %u and size %u (end: %u)).\n",
- ntohs (msg->type), size, end);
+ NULL != msg ? ntohs (msg->type) : 0, size, end);
if (NULL != tmit->msg)
{
@@ -917,7 +917,8 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message part from PSYC.\n");
+ "Received message part of type %u and size %u from PSYC.\n",
+ ptype, psize);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
switch (ptype)
@@ -1118,7 +1119,7 @@
ptype, psize);
return GNUNET_SYSERR;
}
- /* FIXME: check message part order */
+ /** @todo FIXME: check message part order */
}
return parts;
}
Modified: gnunet/src/psycstore/Makefile.am
===================================================================
--- gnunet/src/psycstore/Makefile.am 2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psycstore/Makefile.am 2015-07-18 00:03:06 UTC (rev 36096)
@@ -49,6 +49,7 @@
gnunet_service_psycstore_LDADD = \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/psyc/libgnunetpsycutil.la \
$(GN_LIBINTL)
plugin_LTLIBRARIES = \
Modified: gnunet/src/psycstore/gnunet-service-psycstore.c
===================================================================
--- gnunet/src/psycstore/gnunet-service-psycstore.c 2015-07-18 00:03:00 UTC
(rev 36095)
+++ gnunet/src/psycstore/gnunet-service-psycstore.c 2015-07-18 00:03:06 UTC
(rev 36096)
@@ -32,6 +32,7 @@
#include "gnunet_constants.h"
#include "gnunet_protocols.h"
#include "gnunet_statistics_service.h"
+#include "gnunet_psyc_util_lib.h"
#include "gnunet_psycstore_service.h"
#include "gnunet_psycstore_plugin.h"
#include "psycstore.h"
@@ -493,8 +494,137 @@
}
-/** @todo FIXME: stop processing further state modify messages after an error
*/
+struct StateModifyClosure
+{
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
+ struct GNUNET_PSYC_ReceiveHandle *recv;
+ enum GNUNET_PSYC_MessageState msg_state;
+ char mod_oper;
+ char *mod_name;
+ char *mod_value;
+ uint64_t mod_value_size;
+ uint64_t mod_value_remaining;
+};
+
+
static void
+recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
+ uint32_t flags, const struct GNUNET_MessageHeader
*msg)
+{
+ struct StateModifyClosure *scls = cls;
+ uint16_t psize;
+ if (NULL == msg)
+ {
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+ return;
+ }
+
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *
+ pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
+ psize = ntohs (pmod->header.size);
+ uint16_t name_size = ntohs (pmod->name_size);
+ uint16_t value_size = ntohs (pmod->value_size);
+
+ const char *name = (const char *) &pmod[1];
+ const void *value = name + name_size;
+
+ if (GNUNET_ENV_OP_SET != pmod->oper)
+ { // Apply non-transient operation.
+ if (psize == sizeof (*pmod) + name_size + value_size)
+ {
+ db->state_modify_op (db->cls, scls->channel_key,
+ pmod->oper, name, value, value_size);
+ }
+ else
+ {
+ scls->mod_oper = pmod->oper;
+ scls->mod_name = GNUNET_malloc (name_size);
+ memcpy (scls->mod_name, name, name_size);
+
+ scls->mod_value_size = value_size;
+ scls->mod_value = GNUNET_malloc (scls->mod_value_size);
+ scls->mod_value_remaining
+ = scls->mod_value_size - (psize - sizeof (*pmod) - name_size);
+ memcpy (scls->mod_value, value, value_size -
scls->mod_value_remaining);
+ }
+ }
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ if (GNUNET_ENV_OP_SET != scls->mod_oper)
+ {
+ if (scls->mod_value_remaining == 0)
+ {
+ GNUNET_break_op (0);
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+ }
+ psize = ntohs (msg->size);
+ memcpy (scls->mod_value + (scls->mod_value_size -
scls->mod_value_remaining),
+ &msg[1], psize - sizeof (*msg));
+ scls->mod_value_remaining -= psize - sizeof (*msg);
+ if (0 == scls->mod_value_remaining)
+ {
+ db->state_modify_op (db->cls, scls->channel_key,
+ scls->mod_oper, scls->mod_name,
+ scls->mod_value, scls->mod_value_size);
+ GNUNET_free (scls->mod_name);
+ GNUNET_free (scls->mod_value);
+ scls->mod_oper = 0;
+ scls->mod_name = NULL;
+ scls->mod_value = NULL;
+ scls->mod_value_size = 0;
+ }
+ }
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
+ break;
+
+ default:
+ scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+ }
+}
+
+
+static int
+recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct StateModifyClosure *scls = cls;
+
+ if (NULL == scls->recv)
+ {
+ scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part,
+ scls);
+ }
+
+ const struct GNUNET_PSYC_MessageHeader *
+ pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1];
+ GNUNET_PSYC_receive_message (scls->recv, pmsg);
+
+ return GNUNET_YES;
+}
+
+
+static void
handle_state_modify (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
@@ -502,65 +632,36 @@
const struct StateModifyRequest *req
= (const struct StateModifyRequest *) msg;
- int ret = GNUNET_SYSERR;
- const char *name = (const char *) &req[1];
- uint16_t name_size = ntohs (req->name_size);
+ uint64_t message_id = GNUNET_ntohll (req->message_id);
+ uint64_t state_delta = GNUNET_ntohll (req->state_delta);
+ uint64_t ret_frags = 0;
- if (name_size <= 2 || '\0' != name[name_size - 1])
+ struct StateModifyClosure scls = { 0 };
+
+ if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
+ message_id, state_delta))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Tried to set invalid state variable name!\n"));
- GNUNET_break_op (0);
+ _("Failed to begin modifying state!\n"));
+ GNUNET_break (0);
}
- else
+
+ int ret = db->message_get (db->cls, &req->channel_key,
+ message_id, message_id,
+ &ret_frags, &recv_state_fragment, &scls);
+
+ if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key,
message_id))
{
- ret = GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to end modifying state!\n"));
+ GNUNET_break (0);
+ }
- if (req->flags & STATE_OP_FIRST)
- {
- ret = db->state_modify_begin (db->cls, &req->channel_key,
- GNUNET_ntohll (req->message_id),
- GNUNET_ntohll (req->state_delta));
- }
- if (ret != GNUNET_OK)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to begin modifying state!\n"));
- }
- else
- {
- switch (req->oper)
- {
- case GNUNET_ENV_OP_ASSIGN:
- ret = db->state_modify_set (db->cls, &req->channel_key,
- (const char *) &req[1],
- name + ntohs (req->name_size),
- ntohs (req->header.size) - sizeof (*req)
- - ntohs (req->name_size));
- break;
- default:
-#if TODO
- ret = GNUNET_ENV_operation ((const char *) &req[1],
- current_value, current_value_size,
- req->oper, name + ntohs (req->name_size),
- ntohs (req->header.size) - sizeof (*req)
- - ntohs (req->name_size), &value,
&value_size);
-#endif
- ret = GNUNET_SYSERR;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Unknown operator: %c\n"), req->oper);
- }
- }
+ if (NULL != scls.recv)
+ {
+ GNUNET_PSYC_receive_destroy (scls.recv);
+ }
- if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
- {
- ret = db->state_modify_end (db->cls, &req->channel_key,
- GNUNET_ntohll (req->message_id));
- if (ret != GNUNET_OK)
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Failed to end modifying state!\n"));
- }
- }
send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -600,16 +701,17 @@
}
else
{
- ret = db->state_sync_set (db->cls, &req->channel_key, name,
- name + ntohs (req->name_size),
- ntohs (req->header.size) - sizeof (*req)
- - ntohs (req->name_size));
+ ret = db->state_sync_assign (db->cls, &req->channel_key, name,
+ name + ntohs (req->name_size),
+ ntohs (req->header.size) - sizeof (*req)
+ - ntohs (req->name_size));
}
if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
{
ret = db->state_sync_end (db->cls, &req->channel_key,
- GNUNET_ntohll (req->message_id));
+ GNUNET_ntohll (req->max_state_message_id),
+ GNUNET_ntohll (req->state_hash_message_id));
if (ret != GNUNET_OK)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to end synchronizing state!\n"));
Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c 2015-07-18 00:03:00 UTC
(rev 36095)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c 2015-07-18 00:03:06 UTC
(rev 36096)
@@ -35,6 +35,7 @@
#include "gnunet_psycstore_service.h"
#include "gnunet_multicast_service.h"
#include "gnunet_crypto_lib.h"
+#include "gnunet_env_lib.h"
#include "psycstore.h"
#include <sqlite3.h>
@@ -172,15 +173,9 @@
*/
sqlite3_stmt *update_max_state_message_id;
-
/**
- * Precompiled SQL for message_modify_begin()
+ * Precompiled SQL for state_modify_op()
*/
- sqlite3_stmt *select_message_state_delta;
-
- /**
- * Precompiled SQL for state_modify_set()
- */
sqlite3_stmt *insert_state_current;
/**
@@ -353,8 +348,8 @@
"CREATE TABLE IF NOT EXISTS channels (\n"
" id INTEGER PRIMARY KEY,\n"
" pub_key BLOB UNIQUE,\n"
- " max_state_message_id INTEGER,\n"
- " state_hash_message_id INTEGER\n"
+ " max_state_message_id INTEGER,\n" // last applied state message
ID
+ " state_hash_message_id INTEGER\n" // last message ID with a
state hash
");");
sql_exec (plugin->dbh,
@@ -543,17 +538,6 @@
&plugin->update_state_hash_message_id);
sql_prepare (plugin->dbh,
- "SELECT 1\n"
- "FROM channels AS c\n"
- "LEFT JOIN messages AS m\n"
- "ON c.id = m.channel_id\n"
- "WHERE c.pub_key = ?\n"
- " AND ((? < c.state_hash_message_id AND
c.state_hash_message_id < ?)\n"
- " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
- "LIMIT 1;",
- &plugin->select_message_state_delta);
-
- sql_prepare (plugin->dbh,
"INSERT OR REPLACE INTO state\n"
" (channel_id, name, value_current, value_signed)\n"
"SELECT new.channel_id, new.name,\n"
@@ -1447,14 +1431,14 @@
/**
- * Set a state variable to the given value.
+ * Assign a value to a state variable.
*
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
static int
-state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
- const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- const char *name, const void *value, size_t value_size)
+state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const char *name, const void *value, size_t value_size)
{
int ret = GNUNET_SYSERR;
@@ -1527,50 +1511,25 @@
uint64_t message_id, uint64_t state_delta)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->select_message_state_delta;
if (state_delta > 0)
{
- int ret = GNUNET_SYSERR;
- if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
- message_id - state_delta)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
- message_id)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
- message_id - state_delta)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
-
GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
- {
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_bind");
- }
- else
- {
- switch (sqlite3_step (stmt))
- {
- case SQLITE_DONE:
- ret = GNUNET_NO;
- break;
- case SQLITE_ROW:
- ret = GNUNET_OK;
- break;
- default:
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- }
- }
- if (SQLITE_OK != sqlite3_reset (stmt))
- {
- ret = GNUNET_SYSERR;
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_reset");
- }
+ /**
+ * We can only apply state modifiers in the current message if modifiers in
+ * the previous stateful message (message_id - state_delta) were already
+ * applied.
+ */
+
+ uint64_t max_state_message_id = 0;
+ int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
if (GNUNET_OK != ret)
return ret;
+
+ if (message_id - state_delta != max_state_message_id)
+ return GNUNET_NO;
}
+ // Make sure no other transaction is going on.
if (TRANSACTION_NONE != plugin->transaction)
if (GNUNET_OK != transaction_rollback (plugin))
return GNUNET_SYSERR;
@@ -1587,16 +1546,24 @@
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
static int
-state_modify_set (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- const char *name, const void *value, size_t value_size)
+state_modify_op (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ enum GNUNET_ENV_Operator op,
+ const char *name, const void *value, size_t value_size)
{
struct Plugin *plugin = cls;
GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
- return state_set (plugin, plugin->insert_state_current, channel_key,
- name, value, value_size);
+ switch (op)
+ {
+ case GNUNET_ENV_OP_ASSIGN:
+ return state_assign (plugin, plugin->insert_state_current, channel_key,
+ name, value, value_size);
+ /// @todo implement more state operations
+ default:
+ return GNUNET_SYSERR;
+ }
}
@@ -1634,7 +1601,7 @@
/**
- * Set the current value of state variable.
+ * Assign current value of a state variable.
*
* @see GNUNET_PSYCSTORE_state_modify()
*
@@ -1641,13 +1608,13 @@
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
static int
-state_sync_set (void *cls,
+state_sync_assign (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const char *name, const void *value, size_t value_size)
{
struct Plugin *plugin = cls;
- return state_set (cls, plugin->insert_state_sync, channel_key,
- name, value, value_size);
+ return state_assign (cls, plugin->insert_state_sync, channel_key,
+ name, value, value_size);
}
@@ -1657,7 +1624,8 @@
static int
state_sync_end (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t message_id)
+ uint64_t max_state_message_id,
+ uint64_t state_hash_message_id)
{
struct Plugin *plugin = cls;
int ret = GNUNET_SYSERR;
@@ -1670,7 +1638,10 @@
channel_key)
&& GNUNET_OK == update_message_id (plugin,
plugin->update_state_hash_message_id,
- channel_key, message_id)
+ channel_key, state_hash_message_id)
+ && GNUNET_OK == update_message_id (plugin,
+ plugin->update_max_state_message_id,
+ channel_key, max_state_message_id)
&& GNUNET_OK == transaction_commit (plugin)
? ret = GNUNET_OK
: transaction_rollback (plugin);
@@ -1679,7 +1650,7 @@
/**
- * Reset the state of a channel.
+ * Delete the whole state.
*
* @see GNUNET_PSYCSTORE_state_reset()
*
@@ -1922,10 +1893,10 @@
api->counters_message_get = &counters_message_get;
api->counters_state_get = &counters_state_get;
api->state_modify_begin = &state_modify_begin;
- api->state_modify_set = &state_modify_set;
+ api->state_modify_op = &state_modify_op;
api->state_modify_end = &state_modify_end;
api->state_sync_begin = &state_sync_begin;
- api->state_sync_set = &state_sync_set;
+ api->state_sync_assign = &state_sync_assign;
api->state_sync_end = &state_sync_end;
api->state_reset = &state_reset;
api->state_update_signed = &state_update_signed;
Modified: gnunet/src/psycstore/psycstore.h
===================================================================
--- gnunet/src/psycstore/psycstore.h 2015-07-18 00:03:00 UTC (rev 36095)
+++ gnunet/src/psycstore/psycstore.h 2015-07-18 00:03:06 UTC (rev 36096)
@@ -441,35 +441,24 @@
struct GNUNET_MessageHeader header;
/**
- * Size of name, including NUL terminator.
+ * Operation ID.
*/
- uint16_t name_size GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
- * OR'd StateOpFlags
+ * ID of the message to apply the state changes in.
*/
- uint8_t flags;
+ uint64_t message_id GNUNET_PACKED;
/**
- * enum GNUNET_ENV_Operator
+ * State delta of the message with ID @a message_id.
*/
- uint8_t oper;
+ uint64_t state_delta GNUNET_PACKED;
/**
- * Operation ID.
- */
- uint64_t op_id GNUNET_PACKED;
-
- /**
* Channel's public key.
*/
struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
-
- uint64_t message_id GNUNET_PACKED;
-
- uint64_t state_delta GNUNET_PACKED;
-
- /* Followed by NUL-terminated name, then the value. */
};
@@ -495,8 +484,6 @@
uint8_t reserved;
- uint64_t message_id GNUNET_PACKED;
-
/**
* Operation ID.
*/
@@ -503,6 +490,16 @@
uint64_t op_id GNUNET_PACKED;
/**
+ * ID of the message that contains the state_hash PSYC header variable.
+ */
+ uint64_t state_hash_message_id GNUNET_PACKED;
+
+ /**
+ * ID of the last stateful message before @a state_hash_message_id.
+ */
+ uint64_t max_state_message_id GNUNET_PACKED;
+
+ /**
* Channel's public key.
*/
struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c 2015-07-18 00:03:00 UTC (rev
36095)
+++ gnunet/src/psycstore/psycstore_api.c 2015-07-18 00:03:06 UTC (rev
36096)
@@ -302,16 +302,9 @@
GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
if (NULL != op->res_cb)
{
- const struct StateModifyRequest *smreq;
const struct StateSyncRequest *ssreq;
switch (ntohs (op->msg->type))
{
- case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
- smreq = (const struct StateModifyRequest *) op->msg;
- if (!(smreq->flags & STATE_OP_LAST
- || GNUNET_OK != result_code))
- op->res_cb = NULL;
- break;
case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
ssreq = (const struct StateSyncRequest *) op->msg;
if (!(ssreq->flags & STATE_OP_LAST
@@ -1234,10 +1227,6 @@
* ID of the message that contains the @a modifiers.
* @param state_delta
* Value of the _state_delta PSYC header variable of the message.
- * @param modifier_count
- * Number of elements in the @a modifiers array.
- * @param modifiers
- * List of modifiers to apply.
* @param rcb
* Callback to call with the result of the operation.
* @param rcb_cls
@@ -1250,50 +1239,31 @@
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
uint64_t message_id,
uint64_t state_delta,
- size_t modifier_count,
- const struct GNUNET_ENV_Modifier *modifiers,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *rcb_cls)
{
struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
- size_t i;
+ struct StateModifyRequest *req;
- for (i = 0; i < modifier_count; i++) {
- struct StateModifyRequest *req;
- uint16_t name_size = strlen (modifiers[i].name) + 1;
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
- op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
- modifiers[i].value_size);
- op->h = h;
- op->res_cb = rcb;
- op->cls = rcb_cls;
+ req = (struct StateModifyRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->state_delta = GNUNET_htonll (state_delta);
- req = (struct StateModifyRequest *) &op[1];
- op->msg = (struct GNUNET_MessageHeader *) req;
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
- req->header.size = htons (sizeof (*req) + name_size
- + modifiers[i].value_size);
- req->channel_key = *channel_key;
- req->message_id = GNUNET_htonll (message_id);
- req->state_delta = GNUNET_htonll (state_delta);
- req->oper = modifiers[i].oper;
- req->name_size = htons (name_size);
- req->flags
- = 0 == i
- ? STATE_OP_FIRST
- : modifier_count - 1 == i
- ? STATE_OP_LAST
- : 0;
+ op->op_id = get_next_op_id (h);
+ req->op_id = GNUNET_htonll (op->op_id);
- memcpy (&req[1], modifiers[i].name, name_size);
- memcpy ((char *) &req[1] + name_size, modifiers[i].value,
modifiers[i].value_size);
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
- op->op_id = get_next_op_id (h);
- req->op_id = GNUNET_htonll (op->op_id);
-
- GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
- transmit_next (h);
- }
return op;
/* FIXME: only the last operation is returned,
* operation_cancel() should be able to cancel all of them.
@@ -1308,7 +1278,9 @@
* Handle for the PSYCstore.
* @param channel_key
* The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ * ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
* ID of the message that contains the state_hash PSYC header variable.
* @param modifier_count
* Number of elements in the @a modifiers array.
@@ -1324,7 +1296,8 @@
struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
- uint64_t message_id,
+ uint64_t max_state_message_id,
+ uint64_t state_hash_message_id,
size_t modifier_count,
const struct GNUNET_ENV_Modifier *modifiers,
GNUNET_PSYCSTORE_ResultCallback rcb,
@@ -1349,7 +1322,8 @@
req->header.size = htons (sizeof (*req) + name_size
+ modifiers[i].value_size);
req->channel_key = *channel_key;
- req->message_id = GNUNET_htonll (message_id);
+ req->max_state_message_id = GNUNET_htonll (max_state_message_id);
+ req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
req->name_size = htons (name_size);
req->flags
= (0 == i)
Modified: gnunet/src/psycstore/test_plugin_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_plugin_psycstore.c 2015-07-18 00:03:00 UTC
(rev 36095)
+++ gnunet/src/psycstore/test_plugin_psycstore.c 2015-07-18 00:03:06 UTC
(rev 36096)
@@ -85,7 +85,7 @@
struct GNUNET_PSYCSTORE_PluginFunctions *ret;
char *libname;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' psycstore plugin\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Loading `%s' psycstore plugin\n"),
plugin_name);
GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name);
if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg)))
@@ -306,15 +306,17 @@
message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls,
&channel_pub_key,
- message_id, 1));
+ message_id, 0));
- GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
- "_foo",
- C2ARG("one two three")));
+ GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+ GNUNET_ENV_OP_ASSIGN,
+ "_foo",
+ C2ARG("one two three")));
- GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
- "_foo_bar", slave_key,
- sizeof (*slave_key)));
+ GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+ GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", slave_key,
+ sizeof (*slave_key)));
GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
message_id));
@@ -366,15 +368,16 @@
GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls,
&channel_pub_key));
- GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
- "_sync_bar", scls.value[0],
- scls.value_size[0]));
+ GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+ "_sync_bar",
scls.value[0],
+ scls.value_size[0]));
- GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
- "_sync_foo", scls.value[1],
- scls.value_size[1]));
+ GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+ "_sync_foo",
scls.value[1],
+ scls.value_size[1]));
GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key,
+ max_state_msg_id,
INT64_MAX - 5));
GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key,
@@ -394,11 +397,13 @@
message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls,
&channel_pub_key,
- message_id, 3));
+ message_id,
+ message_id -
max_state_msg_id));
- GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
- "_sync_foo",
- C2ARG("five six seven")));
+ GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+ GNUNET_ENV_OP_ASSIGN,
+ "_sync_foo",
+ C2ARG("five six seven")));
GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
message_id));
Modified: gnunet/src/psycstore/test_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_psycstore.c 2015-07-18 00:03:00 UTC (rev
36095)
+++ gnunet/src/psycstore/test_psycstore.c 2015-07-18 00:03:06 UTC (rev
36096)
@@ -224,8 +224,8 @@
scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
scls.name[1] = "_sync_foo";
- scls.value[1] = "one two three";
- scls.value_size[1] = sizeof ("one two three") - 1;
+ scls.value[1] = "three two one";
+ scls.value_size[1] = sizeof ("three two one") - 1;
op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
&state_result,
@@ -253,11 +253,11 @@
GNUNET_assert (result == 1);
scls.n = 0;
- scls.name[0] = "_bar";
- scls.value[0] = "four five six";
- scls.value_size[0] = sizeof ("four five six") - 1;
+ scls.name[0] = "_sync_bar";
+ scls.value[0] = "ten eleven twelve";
+ scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
- op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz",
+ op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_sync_bar_x_yy_zzz",
&state_result, &state_get_result, &scls);
}
@@ -284,22 +284,9 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
GNUNET_assert (GNUNET_OK == result);
- modifiers[0] = (struct GNUNET_ENV_Modifier) {
- .oper = '=',
- .name = "_sync_foo",
- .value = "one two three",
- .value_size = sizeof ("one two three") - 1
- };
- modifiers[1] = (struct GNUNET_ENV_Modifier) {
- .oper = '=',
- .name = "_bar",
- .value = "four five six",
- .value_size = sizeof ("four five six") - 1
- };
-
op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
- GNUNET_ntohll
(fcls->msg[0]->message_id), 0,
- 2, modifiers, state_modify_result, fcls);
+ GNUNET_ntohll (fcls->msg[0]->message_id),
+ 0, state_modify_result, fcls);
}
@@ -356,6 +343,7 @@
op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
GNUNET_ntohll (fcls->msg[0]->message_id) +
1,
+ GNUNET_ntohll (fcls->msg[0]->message_id) +
2,
2, modifiers, state_sync_result, fcls);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r36096 - in gnunet/src: include psyc psycstore,
gnunet <=