[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: ZONEMASTER: Use parallel worker thread for GNS block signing
From: gnunet
Subject: [gnunet] branch master updated: ZONEMASTER: Use parallel worker thread for GNS block signing
Date: Thu, 20 Oct 2022 10:01:55 +0200
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 64aefd7b6 ZONEMASTER: Use parallel worker thread for GNS block signing
64aefd7b6 is described below
commit 64aefd7b6fb27b8625af12783201f3c87da41f47
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Oct 20 17:01:48 2022 +0900
ZONEMASTER: Use parallel worker thread for GNS block signing
---
src/gnsrecord/gnsrecord_crypto.c | 233 +++++++++++++++------
src/include/gnunet_gnsrecord_lib.h | 40 +++-
src/zonemaster/gnunet-service-zonemaster.c | 311 ++++++++++++++++++++++++-----
src/zonemaster/zonemaster.conf.in | 1 +
4 files changed, 472 insertions(+), 113 deletions(-)
diff --git a/src/gnsrecord/gnsrecord_crypto.c b/src/gnsrecord/gnsrecord_crypto.c
index 6c1bc6045..d794c9cb4 100644
--- a/src/gnsrecord/gnsrecord_crypto.c
+++ b/src/gnsrecord/gnsrecord_crypto.c
@@ -95,7 +95,8 @@ eddsa_symmetric_decrypt (
if (ctlen < 0)
return GNUNET_SYSERR;
if (0 != crypto_secretbox_open_detached (result,
- ((unsigned char*) block) + crypto_secretbox_MACBYTES, // Ciphertext
+ ((unsigned char*) block)
+ + crypto_secretbox_MACBYTES,
// Ciphertext
block, // Tag
ctlen,
nonce, key))
@@ -193,6 +194,116 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd,
return len;
}
+enum GNUNET_GenericReturnValue
+block_sign_ecdsa (const struct
+ GNUNET_CRYPTO_EcdsaPrivateKey *key,
+ const struct
+ GNUNET_CRYPTO_EcdsaPublicKey *pkey,
+ const char *label,
+ struct GNUNET_GNSRECORD_Block *block)
+{
+ struct GNRBlockPS *gnr_block;
+ struct GNUNET_GNSRECORD_EcdsaBlock *ecblock;
+ size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block);
+
+ gnr_block = GNUNET_malloc (size);
+ ecblock = &(block)->ecdsa_block;
+ gnr_block->purpose.size = htonl (size);
+ gnr_block->purpose.purpose =
+ htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
+ gnr_block->expiration_time = ecblock->expiration_time;
+ /* encrypt and sign */
+ GNUNET_memcpy (&gnr_block[1], &ecblock[1],
+ size - sizeof (*gnr_block));
+ GNUNET_CRYPTO_ecdsa_public_key_derive (pkey,
+ label,
+ "gns",
+ &ecblock->derived_key);
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_ecdsa_sign_derived (key,
+ label,
+ "gns",
+ &gnr_block->purpose,
+ &ecblock->signature))
+ {
+ GNUNET_break (0);
+ GNUNET_free (gnr_block);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_free (gnr_block);
+ return GNUNET_OK;
+}
+
+
+enum GNUNET_GenericReturnValue
+block_sign_eddsa (const struct
+ GNUNET_CRYPTO_EddsaPrivateKey *key,
+ const struct
+ GNUNET_CRYPTO_EddsaPublicKey *pkey,
+ const char *label,
+ struct GNUNET_GNSRECORD_Block *block)
+{
+ struct GNRBlockPS *gnr_block;
+ struct GNUNET_GNSRECORD_EddsaBlock *edblock;
+ size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block);
+ gnr_block = GNUNET_malloc (size);
+ edblock = &(block)->eddsa_block;
+ gnr_block->purpose.size = htonl (size);
+ gnr_block->purpose.purpose =
+ htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
+ gnr_block->expiration_time = edblock->expiration_time;
+ GNUNET_memcpy (&gnr_block[1], &edblock[1],
+ size - sizeof (*gnr_block));
+ /* encrypt and sign */
+ GNUNET_CRYPTO_eddsa_public_key_derive (pkey,
+ label,
+ "gns",
+ &edblock->derived_key);
+ GNUNET_CRYPTO_eddsa_sign_derived (key,
+ label,
+ "gns",
+ &gnr_block->purpose,
+ &edblock->signature);
+ GNUNET_free (gnr_block);
+ return GNUNET_OK;
+}
+
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_sign (const struct
+ GNUNET_IDENTITY_PrivateKey *key,
+ const char *label,
+ struct GNUNET_GNSRECORD_Block *block)
+{
+ struct GNUNET_IDENTITY_PublicKey pkey;
+ enum GNUNET_GenericReturnValue res = GNUNET_SYSERR;
+ char *norm_label;
+
+ GNUNET_IDENTITY_key_get_public (key,
+ &pkey);
+ norm_label = GNUNET_GNSRECORD_string_normalize (label);
+
+ switch (ntohl (key->type))
+ {
+ case GNUNET_GNSRECORD_TYPE_PKEY:
+ res = block_sign_ecdsa (&key->ecdsa_key,
+ &pkey.ecdsa_key,
+ norm_label,
+ block);
+ break;
+ case GNUNET_GNSRECORD_TYPE_EDKEY:
+ res = block_sign_eddsa (&key->eddsa_key,
+ &pkey.eddsa_key,
+ norm_label,
+ block);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ GNUNET_free (norm_label);
+ return res;
+}
+
/**
* Sign name and records
@@ -204,6 +315,7 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd,
* @param rd record data
* @param rd_count number of records
* @param block the block result. Must be allocated sufficiently.
+ * @param sign sign the block GNUNET_NO if block will be signed later.
* @return GNUNET_SYSERR on error (otherwise GNUNET_OK)
*/
static enum GNUNET_GenericReturnValue
@@ -213,12 +325,12 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
- struct GNUNET_GNSRECORD_Block **block)
+ struct GNUNET_GNSRECORD_Block **block,
+ int sign)
{
ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count,
rd);
struct GNUNET_GNSRECORD_EcdsaBlock *ecblock;
- struct GNRBlockPS *gnr_block;
unsigned char ctr[GNUNET_CRYPTO_AES_KEY_LENGTH / 2];
unsigned char skey[GNUNET_CRYPTO_AES_KEY_LENGTH];
struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)];
@@ -251,7 +363,7 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
}
/* serialize */
*block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
- (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
+ (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
{
char payload[payload_len];
@@ -260,19 +372,9 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
rdc,
payload_len,
payload));
- gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS) + payload_len);
ecblock = &(*block)->ecdsa_block;
(*block)->type = htonl (GNUNET_GNSRECORD_TYPE_PKEY);
- gnr_block->purpose.size = htonl (sizeof(struct GNRBlockPS) + payload_len);
- gnr_block->purpose.purpose =
- htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
- gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire);
- ecblock->expiration_time = gnr_block->expiration_time;
- /* encrypt and sign */
- GNUNET_CRYPTO_ecdsa_public_key_derive (pkey,
- label,
- "gns",
- &ecblock->derived_key);
+ ecblock->expiration_time = GNUNET_TIME_absolute_hton (expire);
GNR_derive_block_aes_key (ctr,
skey,
label,
@@ -284,21 +386,16 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
skey,
ctr,
&ecblock[1]));
- GNUNET_memcpy (&gnr_block[1], &ecblock[1], payload_len);
}
+ if (GNUNET_YES != sign)
+ return GNUNET_OK;
if (GNUNET_OK !=
- GNUNET_CRYPTO_ecdsa_sign_derived (key,
- label,
- "gns",
- &gnr_block->purpose,
- &ecblock->signature))
+ block_sign_ecdsa (key, pkey, label, *block))
{
GNUNET_break (0);
GNUNET_free (*block);
- GNUNET_free (gnr_block);
return GNUNET_SYSERR;
}
- GNUNET_free (gnr_block);
return GNUNET_OK;
}
@@ -327,6 +424,7 @@ block_get_size_eddsa (const struct GNUNET_GNSRECORD_Data *rd,
* @param rd record data
* @param rd_count number of records
* @param block where to store the block. Must be allocated sufficiently.
+ * @param sign GNUNET_YES if block shall be signed as well
* @return GNUNET_SYSERR on error (otherwise GNUNET_OK)
*/
enum GNUNET_GenericReturnValue
@@ -336,12 +434,12 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
- struct GNUNET_GNSRECORD_Block **block)
+ struct GNUNET_GNSRECORD_Block **block,
+ int sign)
{
ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count,
rd);
struct GNUNET_GNSRECORD_EddsaBlock *edblock;
- struct GNRBlockPS *gnr_block;
unsigned char nonce[crypto_secretbox_NONCEBYTES];
unsigned char skey[crypto_secretbox_KEYBYTES];
struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)];
@@ -375,8 +473,8 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
/* serialize */
*block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block)
+ payload_len + crypto_secretbox_MACBYTES);
- (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block)
- + payload_len + crypto_secretbox_MACBYTES);
+ (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block)
+ + payload_len + crypto_secretbox_MACBYTES);
{
char payload[payload_len];
@@ -385,24 +483,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
rdc,
payload_len,
payload));
- gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS)
- + payload_len
- + crypto_secretbox_MACBYTES);
edblock = &(*block)->eddsa_block;
(*block)->type = htonl (GNUNET_GNSRECORD_TYPE_EDKEY);
- gnr_block->purpose.size =
- htonl (sizeof(struct GNRBlockPS)
- + payload_len
- + crypto_secretbox_MACBYTES);
- gnr_block->purpose.purpose =
- htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
- gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire);
- edblock->expiration_time = gnr_block->expiration_time;
- /* encrypt and sign */
- GNUNET_CRYPTO_eddsa_public_key_derive (pkey,
- label,
- "gns",
- &edblock->derived_key);
+ edblock->expiration_time = GNUNET_TIME_absolute_hton (expire);
GNR_derive_block_xsalsa_key (nonce,
skey,
label,
@@ -414,14 +497,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
skey,
nonce,
&edblock[1]));
- GNUNET_memcpy (&gnr_block[1], &edblock[1],
- payload_len + crypto_secretbox_MACBYTES);
-
- GNUNET_CRYPTO_eddsa_sign_derived (key,
- label,
- "gns",
- &gnr_block->purpose,
- &edblock->signature);
+ if (GNUNET_YES != sign)
+ return GNUNET_OK;
+ block_sign_eddsa (key, pkey, label, *block);
}
return GNUNET_OK;
}
@@ -477,7 +555,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
norm_label,
rd,
rd_count,
- result);
+ result,
+ GNUNET_YES);
break;
case GNUNET_GNSRECORD_TYPE_EDKEY:
res = block_create_eddsa (&key->eddsa_key,
@@ -486,7 +565,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
norm_label,
rd,
rd_count,
- result);
+ result,
+ GNUNET_YES);
break;
default:
GNUNET_assert (0);
@@ -513,13 +593,14 @@ struct KeyCacheLine
};
-enum GNUNET_GenericReturnValue
-GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
- struct GNUNET_TIME_Absolute expire,
- const char *label,
- const struct GNUNET_GNSRECORD_Data *rd,
- unsigned int rd_count,
- struct GNUNET_GNSRECORD_Block **result)
+static enum GNUNET_GenericReturnValue
+block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
+ struct GNUNET_TIME_Absolute expire,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_GNSRECORD_Block **result,
+ int sign)
{
const struct GNUNET_CRYPTO_EcdsaPrivateKey *key;
struct GNUNET_CRYPTO_EddsaPublicKey edpubkey;
@@ -552,7 +633,8 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
norm_label,
rd,
rd_count,
- result);
+ result,
+ sign);
}
else if (GNUNET_IDENTITY_TYPE_EDDSA == ntohl (pkey->type))
{
@@ -564,13 +646,40 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
norm_label,
rd,
rd_count,
- result);
+ result,
+ sign);
}
GNUNET_free (norm_label);
return res;
}
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create_unsigned (const struct
+ GNUNET_IDENTITY_PrivateKey *pkey,
+ struct GNUNET_TIME_Absolute expire,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_GNSRECORD_Block **result)
+{
+ return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_NO);
+}
+
+
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
+ struct GNUNET_TIME_Absolute expire,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_GNSRECORD_Block **result)
+{
+ return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_YES);
+}
+
/**
* Check if a signature is valid. This API is used by the GNS Block
* to validate signatures received from the network.
diff --git a/src/include/gnunet_gnsrecord_lib.h b/src/include/gnunet_gnsrecord_lib.h
index 357f87587..85a42d459 100644
--- a/src/include/gnunet_gnsrecord_lib.h
+++ b/src/include/gnunet_gnsrecord_lib.h
@@ -150,7 +150,7 @@ enum GNUNET_GNSRECORD_Filter
* Filter public records.
* FIXME: Not implemented
*/
- //GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4,
+ // GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4,
};
@@ -554,6 +554,19 @@ GNUNET_GNSRECORD_block_calculate_size (const struct
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count);
+/**
+ * Sign a block create with #GNUNET_GNSRECORD_block_create_unsigned
+ *
+ * @param key the private key
+ * @param label the label of the block
+ * @param block the unsigned block
+ * @return GNUNET_OK on success
+ */
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_sign (const struct
+ GNUNET_IDENTITY_PrivateKey *key,
+ const char *label,
+ struct GNUNET_GNSRECORD_Block *block);
/**
* Sign name and records
@@ -575,6 +588,31 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
struct GNUNET_GNSRECORD_Block **block);
+/**
+ * Create name and records but do not sign!
+ * Sign later with #GNUNET_GNSRECORD_block_sign().
+ * Cache derived public key (also keeps the
+ * private key in static memory, so do not use this function if
+ * keeping the private key in the process'es RAM is a major issue).
+ *
+ * @param key the private key
+ * @param expire block expiration
+ * @param label the name for the records
+ * @param rd record data
+ * @param rd_count number of records in @a rd
+ * @param result the block buffer. Will be allocated.
+ * @return GNUNET_OK on success.
+ */
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create_unsigned (const struct
+ GNUNET_IDENTITY_PrivateKey *key,
+ struct GNUNET_TIME_Absolute expire,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_GNSRECORD_Block **result);
+
+
/**
* Sign name and records, cache derived public key (also keeps the
* private key in static memory, so do not use this function if
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index f5c1d781b..42b3abf91 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -96,6 +96,81 @@
*/
#define DHT_GNS_REPLICATION_LEVEL 5
+/**
+ * Our workers
+ */
+static pthread_t * worker;
+
+/**
+ * Lock for the open jobs queue.
+ */
+static pthread_mutex_t jobs_lock;
+
+/**
+ * Lock for the finished results queue.
+ */
+static pthread_mutex_t results_lock;
+
+/**
+ * For threads to know we are shutting down
+ */
+static int in_shutdown = GNUNET_NO;
+
+/**
+ * Our notification pipe
+ */
+static struct GNUNET_DISK_PipeHandle *notification_pipe;
+
+/**
+ * Pipe read task
+ */
+static struct GNUNET_SCHEDULER_Task *pipe_read_task;
+
+struct OpenSignJob
+{
+
+ struct OpenSignJob *next;
+
+ struct OpenSignJob *prev;
+
+ struct GNUNET_IDENTITY_PrivateKey zone;
+
+ struct GNUNET_GNSRECORD_Block *block;
+
+ struct GNUNET_GNSRECORD_Block *block_priv;
+
+ struct DhtPutActivity *ma;
+
+ size_t block_size;
+
+ struct GNUNET_TIME_Absolute expire_pub;
+
+ char *label;
+
+};
+
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *jobs_head;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *jobs_tail;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *results_head;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *results_tail;
+
+
/**
* Handle for DHT PUT activity triggered from the namestore monitor.
*/
@@ -319,8 +394,13 @@ shutdown_task (void *cls)
struct CacheOperation *cop;
(void) cls;
+ in_shutdown == GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down!\n");
+ if (NULL != notification_pipe)
+ GNUNET_DISK_pipe_close (notification_pipe);
+ if (NULL != pipe_read_task)
+ GNUNET_SCHEDULER_cancel (pipe_read_task);
while (NULL != (cop = cop_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -332,7 +412,8 @@ shutdown_task (void *cls)
while (NULL != (ma = it_head))
{
- GNUNET_DHT_put_cancel (ma->ph);
+ if (NULL != ma->ph)
+ GNUNET_DHT_put_cancel (ma->ph);
dht_queue_length--;
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
@@ -682,6 +763,16 @@ dht_put_continuation (void *cls)
GNUNET_free (ma);
}
+static void
+free_job (struct OpenSignJob *job)
+{
+ if (job->block != job->block_priv)
+ GNUNET_free (job->block_priv);
+ GNUNET_free (job->block);
+ if (NULL != job->label)
+ GNUNET_free (job->label);
+ GNUNET_free (job);
+}
/**
@@ -760,35 +851,86 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
- GNUNET_GNSRECORD_query_from_private_key (key,
- label,
+ GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+ struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+ job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
+ memcpy (job->block, block, block_size);
+ job->block_size = block_size;
+ job->block_priv = block_priv;
+ job->zone = *key;
+ job->ma = ma;
+ job->label = GNUNET_strdup (label);
+ job->expire_pub = expire_pub;
+ GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing %u record(s) for label `%s' in DHT with expiration `%s'\n",
+ rd_public_count,
+ label,
+ GNUNET_STRINGS_absolute_time_to_string (expire));
+ num_public_records++;
+}
+
+static void
+notification_pipe_cb (void *cls);
+
+static void
+initiate_put_from_pipe_trigger (void *cls)
+{
+ struct GNUNET_HashCode query;
+ struct OpenSignJob *job;
+
+ pipe_read_task = NULL;
+ GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
+ job = results_head;
+ if (NULL == job)
+ {
+ GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
+ notification_pipe,
+ GNUNET_DISK_PIPE_END_READ);
+ pipe_read_task =
+ GNUNET_SCHEDULER_add_read_file (
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ np_fh,
+ notification_pipe_cb,
+ NULL);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ GNUNET_GNSRECORD_query_from_private_key (&job->zone,
+ job->label,
&query);
GNUNET_STATISTICS_update (statistics,
"DHT put operations initiated",
1,
GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n",
- rd_public_count,
- label,
- GNUNET_STRINGS_absolute_time_to_string (expire),
+ "Storing record(s) for label `%s' in DHT under key %s\n",
+ job->label,
GNUNET_h2s (&query));
- num_public_records++;
- ret = GNUNET_DHT_put (dht_handle,
- &query,
- DHT_GNS_REPLICATION_LEVEL,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
- GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
- block_size,
- block,
- expire_pub,
- &dht_put_continuation,
- ma);
- refresh_block (block_priv);
- if (block != block_priv)
- GNUNET_free (block_priv);
- GNUNET_free (block);
- return ret;
+ job->ma->ph = GNUNET_DHT_put (dht_handle,
+ &query,
+ DHT_GNS_REPLICATION_LEVEL,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
+ job->block_size,
+ job->block,
+ job->expire_pub,
+ &dht_put_continuation,
+ job->ma);
+ if (NULL == job->ma->ph)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Could not perform DHT PUT, is the DHT running?\n");
+ GNUNET_free (job->ma);
+ free_job (job);
+ return;
+ }
+ refresh_block (job->block_priv);
+ free_job (job);
+ return;
}
@@ -907,45 +1049,29 @@ put_gns_record (void *cls,
/* We got a set of records to publish */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT PUT\n");
-
- ma = GNUNET_new (struct DhtPutActivity);
- ma->start_date = GNUNET_TIME_absolute_get ();
- ma->ph = perform_dht_put (key,
- label,
- rd,
- rd_count,
- expire,
- ma);
put_cnt++;
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity (DELTA_INTERVAL);
check_zone_namestore_next ();
- if (NULL == ma->ph)
+ if (dht_queue_length >= DHT_QUEUE_LIMIT)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Could not perform DHT PUT, is the DHT running?\n");
- GNUNET_free (ma);
+ "DHT PUT queue length exceeded (%u), aborting PUT\n",
+ DHT_QUEUE_LIMIT);
return;
}
+
+ ma = GNUNET_new (struct DhtPutActivity);
+ perform_dht_put (key,
+ label,
+ rd,
+ rd_count,
+ expire,
+ ma);
dht_queue_length++;
GNUNET_CONTAINER_DLL_insert_tail (it_head,
it_tail,
ma);
- if (dht_queue_length > DHT_QUEUE_LIMIT)
- {
- ma = it_head;
- GNUNET_CONTAINER_DLL_remove (it_head,
- it_tail,
- ma);
- GNUNET_DHT_put_cancel (ma->ph);
- dht_queue_length--;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "DHT PUT unconfirmed after %s, aborting PUT\n",
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_duration (ma->start_date),
- GNUNET_YES));
- GNUNET_free (ma);
- }
}
/**
@@ -1075,9 +1201,18 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
+ GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+ struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+ job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
+ memcpy (job->block, block, block_size);
+ job->zone = *key;
+ job->label = GNUNET_strdup (label);
+ GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
GNUNET_GNSRECORD_query_from_private_key (key,
label,
&query);
+ GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
GNUNET_STATISTICS_update (statistics,
"DHT put operations initiated",
1,
@@ -1196,6 +1331,48 @@ handle_monitor_error (void *cls)
GNUNET_NO);
}
+static void*
+sign_worker (void *)
+{
+ struct OpenSignJob *job;
+ const struct GNUNET_DISK_FileHandle *fh;
+
+ fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
+ while (GNUNET_YES != in_shutdown)
+ {
+ GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+ if (NULL != jobs_head)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Taking on Job for %s\n", jobs_head->label);
+ job = jobs_head;
+ GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
+ }
+ GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ if (NULL != job)
+ {
+ GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
+ GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ job = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Done, notifying main thread throug pipe!\n");
+ GNUNET_DISK_file_write (fh, "!", 1);
+ }
+ else {
+ sleep (1);
+ }
+ }
+ return NULL;
+}
+
+static void
+notification_pipe_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received wake up notification through pipe, checking results\n");
+ GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL);
+}
/**
* Perform zonemaster duties: watch namestore, publish records.
@@ -1305,6 +1482,40 @@ run (void *cls,
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
+
+ notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE);
+ const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
+ notification_pipe,
+ GNUNET_DISK_PIPE_END_READ);
+ pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+ np_fh,
+ notification_pipe_cb, NULL);
+
+ long long unsigned int worker_count = 1;
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (c,
+ "zonemaster",
+ "WORKER_COUNT",
+ &worker_count))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Number of workers not defined falling back to 1\n");
+ }
+ worker = GNUNET_malloc (sizeof (pthread_t) * worker_count);
+ /** Start worker */
+ for (int i = 0; i < worker_count; i++)
+ {
+ if (0 !=
+ pthread_create (&worker[i],
+ NULL,
+ &sign_worker,
+ NULL))
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+ "pthread_create");
+ GNUNET_SCHEDULER_shutdown ();
+ }
+ }
}
diff --git a/src/zonemaster/zonemaster.conf.in b/src/zonemaster/zonemaster.conf.in
index 560239944..9c920c476 100644
--- a/src/zonemaster/zonemaster.conf.in
+++ b/src/zonemaster/zonemaster.conf.in
@@ -6,6 +6,7 @@ HOSTNAME = localhost
BINARY = gnunet-service-zonemaster
UNIXPATH = $GNUNET_USER_RUNTIME_DIR/gnunet-service-zonemaster.sock
@JAVAPORT@PORT = 2123
+WORKER_COUNT = 10
# Do we require users that want to access GNS to run this process
# (usually not a good idea)
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] Current Thread [Next in Thread]
- [gnunet] branch master updated: ZONEMASTER: Use parallel worker thread for GNS block signing, gnunet <=