gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libmicrohttpd] 01/02: Fixed thread-safety for externally added connecti


From: gnunet
Subject: [libmicrohttpd] 01/02: Fixed thread-safety for externally added connections Fully re-implemented scheme of adding connections from external thread (application)
Date: Sun, 25 Oct 2020 15:26:24 +0100

This is an automated email from the git hooks/post-receive script.

karlson2k pushed a commit to branch master
in repository libmicrohttpd.

commit 19b038f68272a423ae74c89427c8b5dcfc7ce1ae
Author: Evgeny Grin (Karlson2k) <k2k@narod.ru>
AuthorDate: Thu Oct 22 16:38:59 2020 +0300

    Fixed thread-safety for externally added connections
    Fully re-implemented scheme of adding connections
    from external thread (application)
---
 src/microhttpd/daemon.c   | 237 ++++++++++++++++++++++++++++++++++++----------
 src/microhttpd/internal.h |  21 ++++
 2 files changed, 210 insertions(+), 48 deletions(-)

diff --git a/src/microhttpd/daemon.c b/src/microhttpd/daemon.c
index 7ffd93cd..c0be6bc0 100644
--- a/src/microhttpd/daemon.c
+++ b/src/microhttpd/daemon.c
@@ -2606,35 +2606,56 @@ new_connection_prepare_ (struct MHD_Daemon *daemon,
 }
 
 
+/**
+ * Close prepared, but not yet processed connection.
+ * @param daemon     the daemon
+ * @param connection the connection to close
+ */
+static void
+new_connection_close_ (struct MHD_Daemon *daemon,
+                       struct MHD_Connection *connection)
+{
+  mhd_assert (connection->daemon == daemon);
+  mhd_assert (! connection->in_cleanup);
+  mhd_assert (NULL == connection->next);
+  mhd_assert (NULL == connection->nextX);
+#ifdef EPOLL_SUPPORT
+  mhd_assert (NULL == connection->nextE);
+#endif /* EPOLL_SUPPORT */
+
+#ifdef HTTPS_SUPPORT
+  if (NULL != connection->tls_session)
+  {
+    mhd_assert (0 != (daemon->options & MHD_USE_TLS));
+    gnutls_deinit (connection->tls_session);
+  }
+#endif /* HTTPS_SUPPORT */
+  MHD_socket_close_chk_ (connection->socket_fd);
+  MHD_ip_limit_del (daemon,
+                    connection->addr,
+                    connection->addr_len);
+  free (connection->addr);
+  free (connection);
+}
+
+
 /**
  * Finally insert the new connection to the list of connections
- * served by the daemon.
+ * served by the daemon and start processing.
  * @remark To be called only from thread that process
  * daemon's select()/poll()/etc.
  *
  * @param daemon daemon that manages the connection
- * @param client_socket socket to manage (MHD will expect
- *        to receive an HTTP request from this socket next).
- * @param addr IP address of the client
- * @param addrlen number of bytes in @a addr
- * @param external_add perform additional operations needed due
- *        to the application calling us directly
  * @param connection the newly created connection
- * @return #MHD_YES on success, #MHD_NO if this daemon could
- *        not handle the connection (i.e. malloc failed, etc).
- *        The socket will be closed in any case; 'errno' is
- *        set to indicate further details about the error.
+ * @return #MHD_YES on success, #MHD_NO on error
  */
 static enum MHD_Result
-new_connection_insert_ (struct MHD_Daemon *daemon,
-                        MHD_socket client_socket,
-                        const struct sockaddr *addr,
-                        socklen_t addrlen,
-                        bool external_add,
-                        struct MHD_Connection *connection)
+new_connection_process_ (struct MHD_Daemon *daemon,
+                         struct MHD_Connection *connection)
 {
   int eno = 0;
 
+  mhd_assert (connection->daemon == daemon);
   /* Allocate memory pool in the processing thread so
    * intensively used memory area is allocated in "good"
    * (for the thread) memory region. It is important with
@@ -2647,10 +2668,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
               _ ("Error allocating memory: %s\n"),
               MHD_strerror_ (errno));
 #endif
-    MHD_socket_close_chk_ (client_socket);
+    MHD_socket_close_chk_ (connection->socket_fd);
     MHD_ip_limit_del (daemon,
-                      addr,
-                      addrlen);
+                      connection->addr,
+                      connection->addr_len);
     free (connection);
 #if ENOMEM
     errno = ENOMEM;
@@ -2721,15 +2742,15 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
 #ifdef EPOLL_SUPPORT
   if (0 != (daemon->options & MHD_USE_EPOLL))
   {
-    if ((0 == (daemon->options & MHD_USE_TURBO)) || (external_add))
-    { /* Do not manipulate EReady DL-list in 'external_add' mode. */
+    if (0 == (daemon->options & MHD_USE_TURBO))
+    {
       struct epoll_event event;
 
       event.events = EPOLLIN | EPOLLOUT | EPOLLPRI | EPOLLET;
       event.data.ptr = connection;
       if (0 != epoll_ctl (daemon->epoll_fd,
                           EPOLL_CTL_ADD,
-                          client_socket,
+                          connection->socket_fd,
                           &event))
       {
         eno = errno;
@@ -2752,20 +2773,10 @@ new_connection_insert_ (struct MHD_Daemon *daemon,
                    connection);
     }
   }
-  else /* This 'else' is combined with next 'if'. */
-#endif
-  if ( (0 == (daemon->options & MHD_USE_THREAD_PER_CONNECTION)) &&
-       (external_add) &&
-       (MHD_ITC_IS_VALID_ (daemon->itc)) &&
-       (! MHD_itc_activate_ (daemon->itc, "n")) )
-  {
-#ifdef HAVE_MESSAGES
-    MHD_DLOG (daemon,
-              _ (
-                "Failed to signal new connection via inter-thread 
communication channel.\n"));
 #endif
-  }
+
   return MHD_YES;
+
 cleanup:
   if (NULL != daemon->notify_connection)
     daemon->notify_connection (daemon->notify_connection_cls,
@@ -2776,10 +2787,10 @@ cleanup:
   if (NULL != connection->tls_session)
     gnutls_deinit (connection->tls_session);
 #endif /* HTTPS_SUPPORT */
-  MHD_socket_close_chk_ (client_socket);
+  MHD_socket_close_chk_ (connection->socket_fd);
   MHD_ip_limit_del (daemon,
-                    addr,
-                    addrlen);
+                    connection->addr,
+                    connection->addr_len);
 #if defined(MHD_USE_POSIX_THREADS) || defined(MHD_USE_W32_THREADS)
   MHD_mutex_lock_chk_ (&daemon->cleanup_connection_mutex);
 #endif
@@ -2880,8 +2891,82 @@ internal_add_connection (struct MHD_Daemon *daemon,
                                          non_blck, &connection))
     return MHD_NO;
 
-  return new_connection_insert_ (daemon, client_socket, addr, addrlen,
-                                 external_add, connection);
+  if ((external_add) &&
+      (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD)))
+  {
+    /* Connection is added externally and MHD is handling its own threads. */
+    MHD_mutex_lock_chk_ (&daemon->new_connections_mutex);
+    DLL_insert (daemon->new_connections_head,
+                daemon->new_connections_tail,
+                connection);
+    daemon->have_new = true;
+    MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex);
+
+    /* The rest of connection processing must be handled in
+     * the daemon thread. */
+    if ((MHD_ITC_IS_VALID_ (daemon->itc)) &&
+        (! MHD_itc_activate_ (daemon->itc, "n")))
+    {
+ #ifdef HAVE_MESSAGES
+      MHD_DLOG (daemon,
+                _ ("Failed to signal new connection via inter-thread " \
+                   "communication channel.\n"));
+ #endif
+    }
+    return MHD_YES;
+  }
+
+  return new_connection_process_ (daemon, connection);
+}
+
+
+static void
+new_connections_list_process_ (struct MHD_Daemon *daemon)
+{
+  struct MHD_Connection *local_head;
+  struct MHD_Connection *local_tail;
+  struct MHD_Connection *c;   /**< Currently processed connection */
+  mhd_assert (daemon->have_new);
+  mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD));
+
+  local_head = NULL;
+  local_tail = NULL;
+
+  /* Move all new connections to the local DL-list to release the mutex
+   * as quick as possible. */
+  MHD_mutex_lock_chk_ (&daemon->new_connections_mutex);
+  mhd_assert (NULL != daemon->new_connections_head);
+  do
+  { /* Move connection in FIFO order. */
+    c = daemon->new_connections_tail;
+    DLL_remove (daemon->new_connections_head,
+                daemon->new_connections_tail,
+                c);
+    DLL_insert (local_head,
+                local_tail,
+                c);
+  } while (NULL != daemon->new_connections_tail);
+  daemon->have_new = false;
+  MHD_mutex_unlock_chk_ (&daemon->new_connections_mutex);
+
+  /* Process new connections in FIFO order. */
+  do
+  {
+    c = local_tail;
+    DLL_remove (local_head,
+                local_tail,
+                c);
+    mhd_assert (daemon == c->daemon);
+    if (MHD_NO == new_connection_process_ (daemon, c))
+    {
+#ifdef HAVE_MESSAGES
+      MHD_DLOG (daemon,
+                _ ("Failed to start serving new connection.\n"));
+#endif
+      (void) 0;
+    }
+  } while (NULL != local_tail);
+
 }
 
 
@@ -3710,6 +3795,10 @@ internal_run_from_select (struct MHD_Daemon *daemon,
                   read_fd_set)) )
     MHD_itc_clear_ (daemon->itc);
 
+  /* Process externally added connection if any */
+  if (daemon->have_new)
+    new_connections_list_process_ (daemon);
+
   /* select connection thread handling type */
   if ( (MHD_INVALID_SOCKET != (ds = daemon->listen_fd)) &&
        (! daemon->was_quiesced) &&
@@ -4141,9 +4230,6 @@ MHD_poll_all (struct MHD_Daemon *daemon,
       return MHD_NO;
     }
 
-    /* Reset. New value will be set when connections are processed. */
-    daemon->data_already_pending = false;
-
     /* handle ITC FD */
     /* do it before any other processing so
        new signals will be processed in next loop */
@@ -4157,6 +4243,19 @@ MHD_poll_all (struct MHD_Daemon *daemon,
       free (p);
       return MHD_NO;
     }
+
+    /* Process externally added connection if any */
+    if (daemon->have_new)
+      new_connections_list_process_ (daemon);
+
+    /* handle 'listen' FD */
+    if ( (-1 != poll_listen) &&
+         (0 != (p[poll_listen].revents & POLLIN)) )
+      (void) MHD_accept_connection (daemon);
+
+    /* Reset. New value will be set when connections are processed. */
+    daemon->data_already_pending = false;
+
     i = 0;
     prev = daemon->connections_tail;
     while (NULL != (pos = prev))
@@ -4209,10 +4308,6 @@ MHD_poll_all (struct MHD_Daemon *daemon,
       }
     }
 #endif /* HTTPS_SUPPORT && UPGRADE_SUPPORT */
-    /* handle 'listen' FD */
-    if ( (-1 != poll_listen) &&
-         (0 != (p[poll_listen].revents & POLLIN)) )
-      (void) MHD_accept_connection (daemon);
 
     free (p);
   }
@@ -4294,6 +4389,11 @@ MHD_poll_listen_socket (struct MHD_Daemon *daemon,
   /* handle shutdown */
   if (daemon->shutdown)
     return MHD_NO;
+
+  /* Process externally added connection if any */
+  if (daemon->have_new)
+    new_connections_list_process_ (daemon);
+
   if ( (-1 != poll_listen) &&
        (0 != (p[poll_listen].revents & POLLIN)) )
     (void) MHD_accept_connection (daemon);
@@ -4739,6 +4839,10 @@ MHD_epoll (struct MHD_Daemon *daemon,
     }
   }
 
+  /* Process externally added connection if any */
+  if (daemon->have_new)
+    new_connections_list_process_ (daemon);
+
   if (need_to_accept)
   {
     unsigned int series_length = 0;
@@ -6633,6 +6737,18 @@ MHD_start_daemon_va (unsigned int flags,
 #endif /* ! HAVE_LISTEN_SHUTDOWN */
     if (0 == daemon->worker_pool_size)
     {
+      if (! MHD_mutex_init_ (&daemon->new_connections_mutex))
+      {
+#ifdef HAVE_MESSAGES
+        MHD_DLOG (daemon,
+                  _ ("Failed to initialise mutex.\n"));
+#endif
+        MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex);
+        MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex);
+        if (MHD_INVALID_SOCKET != listen_fd)
+          MHD_socket_close_chk_ (listen_fd);
+        goto free_and_fail;
+      }
       if (! MHD_create_named_thread_ (&daemon->pid,
                                       (*pflags
                                        & MHD_USE_THREAD_PER_CONNECTION) ?
@@ -6646,6 +6762,7 @@ MHD_start_daemon_va (unsigned int flags,
                   _ ("Failed to create listen thread: %s\n"),
                   MHD_strerror_ (errno));
 #endif
+        MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex);
         MHD_mutex_destroy_chk_ (&daemon->cleanup_connection_mutex);
         MHD_mutex_destroy_chk_ (&daemon->per_ip_connection_mutex);
         if (MHD_INVALID_SOCKET != listen_fd)
@@ -6685,7 +6802,14 @@ MHD_start_daemon_va (unsigned int flags,
         d->master = daemon;
         d->worker_pool_size = 0;
         d->worker_pool = NULL;
-
+        if (! MHD_mutex_init_ (&d->new_connections_mutex))
+        {
+  #ifdef HAVE_MESSAGES
+          MHD_DLOG (daemon,
+                    _ ("Failed to initialise mutex.\n"));
+  #endif
+          goto thread_failed;
+        }
         if (0 != (*pflags & MHD_USE_ITC))
         {
           if (! MHD_itc_init_ (d->itc))
@@ -6696,6 +6820,7 @@ MHD_start_daemon_va (unsigned int flags,
                         "Failed to create worker inter-thread communication 
channel: %s\n"),
                       MHD_itc_last_strerror_ () );
 #endif
+            MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
             goto thread_failed;
           }
           if ( (0 == (*pflags & (MHD_USE_POLL | MHD_USE_EPOLL))) &&
@@ -6707,6 +6832,7 @@ MHD_start_daemon_va (unsigned int flags,
                       _ (
                         "File descriptor for worker inter-thread communication 
channel exceeds maximum value.\n"));
 #endif
+            MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
             MHD_itc_destroy_chk_ (d->itc);
             goto thread_failed;
           }
@@ -6733,6 +6859,7 @@ MHD_start_daemon_va (unsigned int flags,
         {
           if (MHD_ITC_IS_VALID_ (d->itc))
             MHD_itc_destroy_chk_ (d->itc);
+          MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
           goto thread_failed;
         }
 #endif
@@ -6745,6 +6872,7 @@ MHD_start_daemon_va (unsigned int flags,
 #endif
           if (MHD_ITC_IS_VALID_ (d->itc))
             MHD_itc_destroy_chk_ (d->itc);
+          MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
           goto thread_failed;
         }
 
@@ -6765,6 +6893,7 @@ MHD_start_daemon_va (unsigned int flags,
           MHD_mutex_destroy_chk_ (&d->cleanup_connection_mutex);
           if (MHD_ITC_IS_VALID_ (d->itc))
             MHD_itc_destroy_chk_ (d->itc);
+          MHD_mutex_destroy_chk_ (&d->new_connections_mutex);
           goto thread_failed;
         }
       }
@@ -6879,6 +7008,17 @@ close_all_connections (struct MHD_Daemon *daemon)
   mhd_assert (NULL == daemon->worker_pool);
 #endif
   mhd_assert (daemon->shutdown);
+
+  /* Remove externally added new connections that are
+   * not processed by the daemon thread. */
+  while (NULL != (pos = daemon->new_connections_tail))
+  {
+    mhd_assert (0 != (daemon->options & MHD_USE_INTERNAL_POLLING_THREAD));
+    DLL_remove (daemon->new_connections_head,
+                daemon->new_connections_tail,
+                pos);
+    new_connection_close_ (daemon, pos);
+  }
   /* give upgraded HTTPS connections a chance to finish */
   /* 'daemon->urh_head' is not used in thread-per-connection mode. */
   for (urh = daemon->urh_tail; NULL != urh; urh = urhn)
@@ -7126,6 +7266,7 @@ MHD_stop_daemon (struct MHD_Daemon *daemon)
     }
     if (MHD_ITC_IS_VALID_ (daemon->itc))
       MHD_itc_destroy_chk_ (daemon->itc);
+    MHD_mutex_destroy_chk_ (&daemon->new_connections_mutex);
 
 #ifdef EPOLL_SUPPORT
     if ( (0 != (daemon->options & MHD_USE_EPOLL)) &&
diff --git a/src/microhttpd/internal.h b/src/microhttpd/internal.h
index 5fb3da3d..3b044984 100644
--- a/src/microhttpd/internal.h
+++ b/src/microhttpd/internal.h
@@ -1301,6 +1301,16 @@ struct MHD_Daemon
    */
   void *default_handler_cls;
 
+  /**
+   * Head of doubly-linked list of new, externally added connections.
+   */
+  struct MHD_Connection *new_connections_head;
+
+  /**
+   * Tail of doubly-linked list of new, externally added connections.
+   */
+  struct MHD_Connection *new_connections_tail;
+
   /**
    * Head of doubly-linked list of our current, active connections.
    */
@@ -1516,6 +1526,11 @@ struct MHD_Daemon
    * "manual_timeout" DLLs.
    */
   MHD_mutex_ cleanup_connection_mutex;
+
+  /**
+   * Mutex for any access to the "new connections" DL-list.
+   */
+  MHD_mutex_ new_connections_mutex;
 #endif
 
   /**
@@ -1600,6 +1615,12 @@ struct MHD_Daemon
    */
   volatile bool resuming;
 
+  /**
+   * Indicate that new connections in @e new_connections_head list
+   * need to be processed.
+   */
+  volatile bool have_new;
+
   /**
    * 'True' if some data is already waiting to be processed.
    * If set to 'true' - zero timeout for select()/poll*()

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]