gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: -simplify libgnunetpq to only support si


From: gnunet
Subject: [gnunet] branch master updated: -simplify libgnunetpq to only support single-threaded applications that do use the scheduler (when using event API)
Date: Mon, 23 Aug 2021 00:03:35 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new ea901fb49 -simplify libgnunetpq to only support single-threaded 
applications that do use the scheduler (when using event API)
ea901fb49 is described below

commit ea901fb4978ee7e9cfd2f74c810f2146bdf9d46b
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Aug 22 23:59:54 2021 +0200

    -simplify libgnunetpq to only support single-threaded applications that do 
use the scheduler (when using event API)
---
 src/include/gnunet_pq_lib.h |  49 +--------------
 src/pq/pq.h                 |  40 ++++++------
 src/pq/pq_connect.c         |  17 ++----
 src/pq/pq_event.c           | 145 ++++++++++++++++++++------------------------
 src/pq/test_pq.c            |  64 +------------------
 src/util/os_priority.c      |   4 +-
 6 files changed, 101 insertions(+), 218 deletions(-)

diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h
index ecc2b9719..ff4498938 100644
--- a/src/include/gnunet_pq_lib.h
+++ b/src/include/gnunet_pq_lib.h
@@ -852,33 +852,6 @@ void
 GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db);
 
 
-/**
- * Function called whenever the socket needed for
- * notifications from postgres changes.
- *
- * @param cls closure
- * @param fd socket to listen on, -1 for none
- */
-typedef void
-(*GNUNET_PQ_SocketCallback)(void *cls,
-                            int fd);
-
-
-/**
- * Obtain the file descriptor to poll on for notifications.
- * Useful if the GNUnet scheduler is NOT to be used for
- * such notifications.
- *
- * @param db database handle
- * @param sc function to call with the socket
- * @param sc_cls closure for @a sc
- */
-void
-GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
-                                     GNUNET_PQ_SocketCallback sc,
-                                     void *sc_cls);
-
-
 /**
  * Poll for database events now.  Used if the event FD
  * is ready and the application wants to trigger applicable
@@ -892,24 +865,6 @@ void
 GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db);
 
 
-/**
- * Run poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db);
-
-
-/**
- * Stop running poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db);
-
-
 /**
  * Register callback to be invoked on events of type @a es.
  *
@@ -921,14 +876,16 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context 
*db);
  *
  * @param db database context to use
  * @param es specification of the event to listen for
+ * @param timeout when to trigger @a cb based on timeout
  * @param cb function to call when the event happens, possibly
- *         multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked)
+ *         multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked), 
including on timeout
  * @param cb_cls closure for @a cb
  * @return handle useful to cancel the listener
  */
 struct GNUNET_DB_EventHandler *
 GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
                         const struct GNUNET_DB_EventHeaderP *es,
+                        struct GNUNET_TIME_Relative timeout,
                         GNUNET_DB_EventCallback cb,
                         void *cb_cls);
 
diff --git a/src/pq/pq.h b/src/pq/pq.h
index 107fd116c..950d38220 100644
--- a/src/pq/pq.h
+++ b/src/pq/pq.h
@@ -28,6 +28,7 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_pq_lib.h"
 
+
 /**
  * Handle to Postgres database.
  */
@@ -58,26 +59,11 @@ struct GNUNET_PQ_Context
    */
   char *load_path;
 
-  /**
-   * Function to call on Postgres FDs.
-   */
-  GNUNET_PQ_SocketCallback sc;
-
-  /**
-   * Closure for @e sc.
-   */
-  void *sc_cls;
-
   /**
    * Map managing event subscriptions.
    */
   struct GNUNET_CONTAINER_MultiShortmap *channel_map;
 
-  /**
-   * Lock to access @e channel_map.
-   */
-  pthread_mutex_t notify_lock;
-
   /**
    * Task responsible for processing events.
    */
@@ -87,7 +73,7 @@ struct GNUNET_PQ_Context
    * File descriptor wrapper for @e event_task.
    */
   struct GNUNET_NETWORK_Handle *rfd;
-  
+
   /**
    * Is scheduling via the GNUnet scheduler desired?
    */
@@ -100,9 +86,29 @@ struct GNUNET_PQ_Context
  * after a disconnect.
  *
  * @param db the DB handle
+ * @param fd socket to listen on
+ */
+void
+GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
+                            int fd);
+
+
+/**
+ * Run poll event loop using the GNUnet scheduler.
+ *
+ * @param db database handle
+ */
+void
+GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db);
+
+
+/**
+ * Stop running poll event loop using the GNUnet scheduler.
+ *
+ * @param db database handle
  */
 void
-GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db);
+GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db);
 
 
 #endif
diff --git a/src/pq/pq_connect.c b/src/pq/pq_connect.c
index 275fd7450..05e787939 100644
--- a/src/pq/pq_connect.c
+++ b/src/pq/pq_connect.c
@@ -103,9 +103,6 @@ GNUNET_PQ_connect (const char *config_str,
   }
   db->channel_map = GNUNET_CONTAINER_multishortmap_create (16,
                                                            GNUNET_YES);
-  GNUNET_assert (0 ==
-                 pthread_mutex_init (&db->notify_lock,
-                                     NULL));
   GNUNET_PQ_reconnect (db);
   if (NULL == db->conn)
   {
@@ -294,9 +291,8 @@ GNUNET_PQ_reconnect_if_down (struct GNUNET_PQ_Context *db)
 void
 GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db)
 {
-  if (NULL != db->sc)
-    db->sc (db->sc_cls,
-            -1);
+  GNUNET_PQ_event_reconnect_ (db,
+                              -1);
   if (NULL != db->conn)
     PQfinish (db->conn);
   db->conn = PQconnectdb (db->config_str);
@@ -416,11 +412,8 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db)
     db->conn = NULL;
     return;
   }
-  GNUNET_PQ_event_reconnect_ (db);
-  if ( (NULL != db->sc) &&
-       (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
-    db->sc (db->sc_cls,
-            PQsocket (db->conn));
+  GNUNET_PQ_event_reconnect_ (db,
+                              PQsocket (db->conn));
 }
 
 
@@ -473,8 +466,6 @@ GNUNET_PQ_disconnect (struct GNUNET_PQ_Context *db)
   GNUNET_assert (0 ==
                  GNUNET_CONTAINER_multishortmap_size (db->channel_map));
   GNUNET_CONTAINER_multishortmap_destroy (db->channel_map);
-  GNUNET_assert (0 ==
-                 pthread_mutex_destroy (&db->notify_lock));
   GNUNET_free (db->es);
   GNUNET_free (db->ps);
   GNUNET_free (db->load_path);
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 2890869a3..3a0bfcde3 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler
    * Database context this event handler is with.
    */
   struct GNUNET_PQ_Context *db;
+
+  /**
+   * Task to run on timeout.
+   */
+  struct GNUNET_SCHEDULER_Task *timeout_task;
 };
 
 
@@ -162,36 +167,11 @@ do_notify (void *cls,
 }
 
 
-void
-GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
-                                     GNUNET_PQ_SocketCallback sc,
-                                     void *sc_cls)
-{
-  int fd;
-
-  db->sc = sc;
-  db->sc_cls = sc_cls;
-  if (NULL == sc)
-    return;
-  GNUNET_assert (0 ==
-                 pthread_mutex_lock (&db->notify_lock));
-  fd = PQsocket (db->conn);
-  if ( (-1 != fd) &&
-       (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
-    sc (sc_cls,
-        fd);
-  GNUNET_assert (0 ==
-                 pthread_mutex_unlock (&db->notify_lock));
-}
-
-
 void
 GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
 {
   PGnotify *n;
 
-  GNUNET_assert (0 ==
-                 pthread_mutex_lock (&db->notify_lock));
   if (1 !=
       PQconsumeInput (db->conn))
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
       .extra = NULL
     };
 
+    if ('X' != toupper ((int) n->relname[0]))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Ignoring notification for unsupported channel identifier 
`%s'\n",
+                  n->relname);
+      PQfreemem (n);
+      continue;
+    }
     if (GNUNET_OK !=
-        GNUNET_STRINGS_string_to_data (n->relname,
-                                       strlen (n->relname),
+        GNUNET_STRINGS_string_to_data (&n->relname[1],
+                                       strlen (&n->relname[1]),
                                        &sh,
                                        sizeof (sh)))
     {
@@ -236,23 +224,9 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
     GNUNET_free (ctx.extra);
     PQfreemem (n);
   }
-  GNUNET_assert (0 ==
-                 pthread_mutex_unlock (&db->notify_lock));
 }
 
 
-/**
- * Function called when the Postgres FD changes and we need
- * to update the scheduler event loop task.
- *
- * @param cls a `struct GNUNET_PQ_Context *`
- * @param fd the file descriptor, possibly -1
- */
-static void
-scheduler_fd_cb (void *cls,
-                 int fd);
-
-
 /**
  * The GNUnet scheduler notifies us that we need to
  * trigger the DB event poller.
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls,
 
 
 void
-GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
 {
-  int fd;
-
-  GNUNET_assert (! db->scheduler_on);
-  GNUNET_assert (NULL == db->sc);
+  if (db->scheduler_on)
+    return;
   db->scheduler_on = true;
-  db->sc = &scheduler_fd_cb;
-  db->sc_cls = db;
-  fd = PQsocket (db->conn);
   scheduler_fd_cb (db,
-                   fd);
+                   PQsocket (db->conn));
 }
 
 
 void
-GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
 {
   GNUNET_assert (db->scheduler_on);
   GNUNET_free (db->rfd);
-  db->sc = NULL;
   db->scheduler_on = false;
   if (NULL != db->event_task)
   {
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context 
*db)
 }
 
 
+/**
+ * Helper function to trigger an SQL @a cmd on @a db
+ *
+ * @param db database to send command to
+ * @param cmd prefix of the command to send
+ * @param eh details about the event
+ */
 static void
 manage_subscribe (struct GNUNET_PQ_Context *db,
                   const char *cmd,
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
                 cmd);
   end = sh_to_channel (&eh->sh,
                        end);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Executing PQ command `%s'\n",
+              sql);
   result = PQexec (db->conn,
                    sql);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -395,21 +373,41 @@ register_notify (void *cls,
 
 
 void
-GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
+                            int fd)
 {
-  GNUNET_assert (0 ==
-                 pthread_mutex_lock (&db->notify_lock));
+  if (! db->scheduler_on)
+    return;
+  scheduler_fd_cb (db,
+                   fd);
   GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
                                           &register_notify,
                                           db);
-  GNUNET_assert (0 ==
-                 pthread_mutex_unlock (&db->notify_lock));
+}
+
+
+/**
+ * Function run on timeout for an event. Triggers
+ * the notification, but does NOT clear the handler.
+ *
+ * @param cls a `struct GNUNET_DB_EventHandler *`
+ */
+static void
+event_timeout (void *cls)
+{
+  struct GNUNET_DB_EventHandler *eh = cls;
+
+  eh->timeout_task = NULL;
+  eh->cb (eh->cb_cls,
+          NULL,
+          0);
 }
 
 
 struct GNUNET_DB_EventHandler *
 GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
                         const struct GNUNET_DB_EventHeaderP *es,
+                        struct GNUNET_TIME_Relative timeout,
                         GNUNET_DB_EventCallback cb,
                         void *cb_cls)
 {
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
             &eh->sh);
   eh->cb = cb;
   eh->cb_cls = cb_cls;
-  GNUNET_assert (0 ==
-                 pthread_mutex_lock (&db->notify_lock));
   was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multishortmap_put (db->channel_map,
                                                      &eh->sh,
                                                      eh,
                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  if ( (NULL != db->sc) &&
-       was_zero)
-  {
-    int fd = PQsocket (db->conn);
-
-    if (-1 != fd)
-      db->sc (db->sc_cls,
-              fd);
-  }
+  if (was_zero)
+    GNUNET_PQ_event_scheduler_start_ (db);
   manage_subscribe (db,
                     "LISTEN X",
                     eh);
-  GNUNET_assert (0 ==
-                 pthread_mutex_unlock (&db->notify_lock));
+  eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+                                                   &event_timeout,
+                                                   eh);
   return eh;
 }
 
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct 
GNUNET_DB_EventHandler *eh)
 {
   struct GNUNET_PQ_Context *db = eh->db;
 
-  GNUNET_assert (0 ==
-                 pthread_mutex_lock (&db->notify_lock));
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
                                                         &eh->sh,
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct 
GNUNET_DB_EventHandler *eh)
   manage_subscribe (db,
                     "UNLISTEN X",
                     eh);
-  if ( (NULL != db->sc) &&
-       (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
+  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
+  {
+    GNUNET_PQ_event_scheduler_stop_ (db);
+  }
+  if (NULL != eh->timeout_task)
   {
-    db->sc (db->sc_cls,
-            -1);
+    GNUNET_SCHEDULER_cancel (eh->timeout_task);
+    eh->timeout_task = NULL;
   }
-  GNUNET_assert (0 ==
-                 pthread_mutex_unlock (&db->notify_lock));
   GNUNET_free (eh);
 }
 
diff --git a/src/pq/test_pq.c b/src/pq/test_pq.c
index ffbb4d129..90b5c6489 100644
--- a/src/pq/test_pq.c
+++ b/src/pq/test_pq.c
@@ -240,63 +240,6 @@ run_queries (struct GNUNET_PQ_Context *db)
 }
 
 
-static void
-event_cb (void *cls,
-          const void *extra,
-          size_t extra_size)
-{
-  unsigned int *cnt = cls;
-
-  GNUNET_assert (5 == extra_size);
-  GNUNET_assert (0 == memcmp ("world",
-                              extra,
-                              5));
-  (*cnt)++;
-}
-
-
-/**
- * Run subscribe/notify tests.
- *
- * @param db database handle
- * @return 0 on success
- */
-static int
-test_notify (struct GNUNET_PQ_Context *db)
-{
-  struct GNUNET_DB_EventHeaderP e1 = {
-    .size = htons (sizeof (e1)),
-    .type = htons (1)
-  };
-  struct GNUNET_DB_EventHeaderP e2 = {
-    .size = htons (sizeof (e2)),
-    .type = htons (2)
-  };
-  unsigned int called = 0;
-  struct GNUNET_DB_EventHandler *eh;
-
-  eh = GNUNET_PQ_event_listen (db,
-                               &e1,
-                               &event_cb,
-                               &called);
-  GNUNET_assert (NULL != eh);
-  GNUNET_PQ_event_notify (db,
-                          &e2,
-                          "hello",
-                          5);
-  GNUNET_PQ_event_do_poll (db);
-  GNUNET_assert (0 == called);
-  GNUNET_PQ_event_notify (db,
-                          &e1,
-                          "world",
-                          5);
-  GNUNET_PQ_event_do_poll (db);
-  GNUNET_assert (1 == called);
-  GNUNET_PQ_event_listen_cancel (eh);
-  return 0;
-}
-
-
 /**
  * Task called on shutdown.
  *
@@ -305,7 +248,6 @@ test_notify (struct GNUNET_PQ_Context *db)
 static void
 event_end (void *cls)
 {
-  GNUNET_PQ_event_scheduler_stop (db);
   GNUNET_PQ_event_listen_cancel (eh);
   eh = NULL;
   if (NULL != tt)
@@ -368,9 +310,9 @@ sched_tests (void *cls)
   tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
                                      &timeout_cb,
                                      NULL);
-  GNUNET_PQ_event_scheduler_start (db);
   eh = GNUNET_PQ_event_listen (db,
                                &es,
+                               GNUNET_TIME_UNIT_FOREVER_REL,
                                &event_sched_cb,
                                NULL);
   GNUNET_PQ_reconnect (db);
@@ -404,7 +346,7 @@ main (int argc,
   };
 
   GNUNET_log_setup ("test-pq",
-                    "WARNING",
+                    "INFO",
                     NULL);
   db = GNUNET_PQ_connect ("postgres:///gnunetcheck",
                           NULL,
@@ -433,8 +375,6 @@ main (int argc,
     return 1;
   }
   ret = run_queries (db);
-  ret |= test_notify (db);
-  ret |= test_notify (db);
   if (0 != ret)
   {
     GNUNET_break (0);
diff --git a/src/util/os_priority.c b/src/util/os_priority.c
index dc2f0f97e..08320b291 100644
--- a/src/util/os_priority.c
+++ b/src/util/os_priority.c
@@ -47,7 +47,6 @@ struct GNUNET_OS_Process
    */
   pid_t pid;
 
-
   /**
    * Pipe we use to signal the process.
    * NULL if unused, or if process was deemed uncontrollable.
@@ -301,7 +300,8 @@ GNUNET_OS_process_destroy (struct GNUNET_OS_Process *proc)
  * @param flags open flags (O_RDONLY, O_WRONLY)
  */
 static void
-open_dev_null (int target_fd, int flags)
+open_dev_null (int target_fd,
+               int flags)
 {
   int fd;
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]