emacs-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 2/3] process changes


From: Tom Tromey
Subject: [PATCH 2/3] process changes
Date: Wed, 15 Aug 2012 08:49:14 -0600
User-agent: Gnus/5.13 (Gnus v5.13) Emacs/24.1 (gnu/linux)

This changes wait_reading_process_output to handle threads better.  It
introduces a wrapper for select that releases the global lock, and it
ensures that only a single thread can select a given file descriptor
at a time.

This also adds the thread-locking feature to processes.  By default a
process can only have its output accepted by the thread that created
it.  This can be changed using set-process-thread.  (If the thread
exits, the process is again available for waiting by any thread.)

Note that thread-signal will not currently interrupt a thread blocked
on select.  I'll fix this later.
---
 src/process.c |  175 +++++++++++++++++++++++++++++++++++++++++++++++---------
 src/process.h |    5 ++
 src/thread.c  |   47 +++++++++++++++
 src/thread.h  |   22 +++++++
 4 files changed, 221 insertions(+), 28 deletions(-)

diff --git a/src/process.c b/src/process.c
index a79592c..8331f80 100644
--- a/src/process.c
+++ b/src/process.c
@@ -335,6 +335,13 @@ static struct fd_callback_data
   void *data;
   /* Flags from enum fd_bits.  */
   int flags;
+  /* If this fd is locked to a certain thread, this points to it.
+     Otherwise, this is NULL.  If an fd is locked to a thread, then
+     only that thread is permitted to wait on it.  */
+  struct thread_state *thread;
+  /* If this fd is currently being selected on by a thread, this
+     points to the thread.  Otherwise it is NULL.  */
+  struct thread_state *waiting_thread;
 } fd_callback_info[MAXDESC];
 
 
@@ -451,8 +458,17 @@ compute_input_wait_mask (SELECT_TYPE *mask)
   FD_ZERO (mask);
   for (fd = 0; fd < max (max_process_desc, max_input_desc); ++fd)
     {
+      if (fd_callback_info[fd].thread != NULL
+         && fd_callback_info[fd].thread != current_thread)
+       continue;
+      if (fd_callback_info[fd].waiting_thread != NULL
+         && fd_callback_info[fd].waiting_thread != current_thread)
+       continue;
       if ((fd_callback_info[fd].flags & FOR_READ) != 0)
-       FD_SET (fd, mask);
+       {
+         FD_SET (fd, mask);
+         fd_callback_info[fd].waiting_thread = current_thread;
+       }
     }
 }
 
@@ -464,9 +480,18 @@ compute_non_process_wait_mask (SELECT_TYPE *mask)
   FD_ZERO (mask);
   for (fd = 0; fd < max (max_process_desc, max_input_desc); ++fd)
     {
+      if (fd_callback_info[fd].thread != NULL
+         && fd_callback_info[fd].thread != current_thread)
+       continue;
+      if (fd_callback_info[fd].waiting_thread != NULL
+         && fd_callback_info[fd].waiting_thread != current_thread)
+       continue;
       if ((fd_callback_info[fd].flags & FOR_READ) != 0
          && (fd_callback_info[fd].flags & PROCESS_FD) == 0)
-       FD_SET (fd, mask);
+       {
+         FD_SET (fd, mask);
+         fd_callback_info[fd].waiting_thread = current_thread;
+       }
     }
 }
 
@@ -478,9 +503,18 @@ compute_non_keyboard_wait_mask (SELECT_TYPE *mask)
   FD_ZERO (mask);
   for (fd = 0; fd < max (max_process_desc, max_input_desc); ++fd)
     {
+      if (fd_callback_info[fd].thread != NULL
+         && fd_callback_info[fd].thread != current_thread)
+       continue;
+      if (fd_callback_info[fd].waiting_thread != NULL
+         && fd_callback_info[fd].waiting_thread != current_thread)
+       continue;
       if ((fd_callback_info[fd].flags & FOR_READ) != 0
          && (fd_callback_info[fd].flags & KEYBOARD_FD) == 0)
-       FD_SET (fd, mask);
+       {
+         FD_SET (fd, mask);
+         fd_callback_info[fd].waiting_thread = current_thread;
+       }
     }
 }
 
@@ -492,12 +526,31 @@ compute_write_mask (SELECT_TYPE *mask)
   FD_ZERO (mask);
   for (fd = 0; fd < max (max_process_desc, max_input_desc); ++fd)
     {
+      if (fd_callback_info[fd].thread != NULL
+         && fd_callback_info[fd].thread != current_thread)
+       continue;
+      if (fd_callback_info[fd].waiting_thread != NULL
+         && fd_callback_info[fd].waiting_thread != current_thread)
+       continue;
       if ((fd_callback_info[fd].flags & FOR_WRITE) != 0)
-       FD_SET (fd, mask);
+       {
+         FD_SET (fd, mask);
+         fd_callback_info[fd].waiting_thread = current_thread;
+       }
     }
 }
 
+static void
+clear_waiting_thread_info (void)
+{
+  int fd;
 
+  for (fd = 0; fd < max (max_process_desc, max_input_desc); ++fd)
+    {
+      if (fd_callback_info[fd].waiting_thread == current_thread)
+       fd_callback_info[fd].waiting_thread = NULL;
+    }
+}
 
 
 /* Compute the Lisp form of the process status, p->status, from
@@ -709,6 +762,7 @@ make_process (Lisp_Object name)
      Lisp data to nil, so do it only for slots which should not be nil.  */
   PSET (p, status, Qrun);
   PSET (p, mark, Fmake_marker ());
+  PSET (p, thread, Fcurrent_thread ());
 
   /* Initialize non-Lisp data.  Note that allocate_process zeroes out all
      non-Lisp data, so do it only for slots which should not be zero.  */
@@ -746,6 +800,27 @@ remove_process (register Lisp_Object proc)
   deactivate_process (proc);
 }
 
+void
+update_processes_for_thread_death (Lisp_Object dying_thread)
+{
+  Lisp_Object pair;
+
+  for (pair = Vprocess_alist; !NILP (pair); pair = XCDR (pair))
+    {
+      Lisp_Object process = XCDR (XCAR (pair));
+      if (EQ (XPROCESS (process)->thread, dying_thread))
+       {
+         struct Lisp_Process *proc = XPROCESS (process);
+
+         proc->thread = Qnil;
+         if (proc->infd >= 0)
+           fd_callback_info[proc->infd].thread = NULL;
+         if (proc->outfd >= 0)
+           fd_callback_info[proc->outfd].thread = NULL;
+       }
+    }
+}
+
 
 DEFUN ("processp", Fprocessp, Sprocessp, 1, 1, 0,
        doc: /* Return t if OBJECT is a process.  */)
@@ -1094,6 +1169,42 @@ See `set-process-sentinel' for more info on sentinels.  
*/)
   return XPROCESS (process)->sentinel;
 }
 
+DEFUN ("set-process-thread", Fset_process_thread, Sset_process_thread,
+       2, 2, 0,
+       doc: /* FIXME */)
+  (Lisp_Object process, Lisp_Object thread)
+{
+  struct Lisp_Process *proc;
+  struct thread_state *tstate;
+
+  CHECK_PROCESS (process);
+  if (NILP (thread))
+    tstate = NULL;
+  else
+    {
+      CHECK_THREAD (thread);
+      tstate = XTHREAD (thread);
+    }
+
+  proc = XPROCESS (process);
+  proc->thread = thread;
+  if (proc->infd >= 0)
+    fd_callback_info[proc->infd].thread = tstate;
+  if (proc->outfd >= 0)
+    fd_callback_info[proc->outfd].thread = tstate;
+
+  return thread;
+}
+
+DEFUN ("process-thread", Fprocess_thread, Sprocess_thread,
+       1, 1, 0,
+       doc: /* FIXME */)
+  (Lisp_Object process)
+{
+  CHECK_PROCESS (process);
+  return XPROCESS (process)->thread;
+}
+
 DEFUN ("set-process-window-size", Fset_process_window_size,
        Sset_process_window_size, 3, 3, 0,
        doc: /* Tell PROCESS that it has logical window size HEIGHT and WIDTH.  
*/)
@@ -3993,7 +4104,17 @@ Return non-nil if we received any output before the 
timeout expired.  */)
   int nsecs;
 
   if (! NILP (process))
-    CHECK_PROCESS (process);
+    {
+      struct Lisp_Process *procp;
+
+      CHECK_PROCESS (process);
+      procp = XPROCESS (process);
+
+      /* Can't wait for a process that is dedicated to a different
+        thread.  */
+      if (!EQ (procp->thread, Qnil) && !EQ (procp->thread, Fcurrent_thread ()))
+       error ("FIXME");
+    }
   else
     just_this_one = Qnil;
 
@@ -4249,20 +4370,10 @@ server_accept_connection (Lisp_Object server, int 
channel)
                            build_string ("\n")));
 }
 
-/* This variable is different from waiting_for_input in keyboard.c.
-   It is used to communicate to a lisp process-filter/sentinel (via the
-   function Fwaiting_for_user_input_p below) whether Emacs was waiting
-   for user-input when that process-filter was called.
-   waiting_for_input cannot be used as that is by definition 0 when
-   lisp code is being evalled.
-   This is also used in record_asynch_buffer_change.
-   For that purpose, this must be 0
-   when not inside wait_reading_process_output.  */
-static int waiting_for_user_input_p;
-
 static Lisp_Object
 wait_reading_process_output_unwind (Lisp_Object data)
 {
+  clear_waiting_thread_info ();
   waiting_for_user_input_p = XINT (data);
   return Qnil;
 }
@@ -4329,6 +4440,10 @@ wait_reading_process_output (intmax_t time_limit, int 
nsecs, int read_kbd,
   int got_some_input = 0;
   ptrdiff_t count = SPECPDL_INDEX ();
 
+  eassert (wait_proc == NULL
+          || EQ (wait_proc->thread, Qnil)
+          || XTHREAD (wait_proc->thread) == current_thread);
+
   FD_ZERO (&Available);
   FD_ZERO (&Writeok);
 
@@ -4484,14 +4599,15 @@ wait_reading_process_output (intmax_t time_limit, int 
nsecs, int read_kbd,
          compute_write_mask (&Ctemp);
 
          timeout = make_emacs_time (0, 0);
-         if ((pselect (max (max_process_desc, max_input_desc) + 1,
-                       &Atemp,
+         if ((thread_select (pselect,
+                             max (max_process_desc, max_input_desc) + 1,
+                             &Atemp,
 #ifdef NON_BLOCKING_CONNECT
-                       (num_pending_connects > 0 ? &Ctemp : NULL),
+                             (num_pending_connects > 0 ? &Ctemp : NULL),
 #else
-                       NULL,
+                             NULL,
 #endif
-                       NULL, &timeout, NULL)
+                             NULL, &timeout, NULL)
               <= 0))
            {
              /* It's okay for us to do this and then continue with
@@ -4639,17 +4755,18 @@ wait_reading_process_output (intmax_t time_limit, int 
nsecs, int read_kbd,
              process_output_skip = 0;
            }
 #endif
+         nfds = thread_select (
 #if defined (USE_GTK) || defined (HAVE_GCONF) || defined (HAVE_GSETTINGS)
-          nfds = xg_select
+                               xg_select
 #elif defined (HAVE_NS)
-         nfds = ns_select
+                               ns_select
 #else
-         nfds = pselect
+                               pselect
 #endif
-            (max (max_process_desc, max_input_desc) + 1,
-             &Available,
-             (check_write ? &Writeok : (SELECT_TYPE *)0),
-             NULL, &timeout, NULL);
+                               , max (max_process_desc, max_input_desc) + 1,
+                               &Available,
+                               (check_write ? &Writeok : (SELECT_TYPE *)0),
+                               NULL, &timeout, NULL);
 
 #ifdef HAVE_GNUTLS
           /* GnuTLS buffers data internally.  In lowat mode it leaves
@@ -7597,6 +7714,8 @@ The variable takes effect when `start-process' is called. 
 */);
   defsubr (&Sprocess_filter);
   defsubr (&Sset_process_sentinel);
   defsubr (&Sprocess_sentinel);
+  defsubr (&Sset_process_thread);
+  defsubr (&Sprocess_thread);
   defsubr (&Sset_process_window_size);
   defsubr (&Sset_process_inherit_coding_system_flag);
   defsubr (&Sset_process_query_on_exit_flag);
diff --git a/src/process.h b/src/process.h
index 43cc7ea..1ddfe91 100644
--- a/src/process.h
+++ b/src/process.h
@@ -103,6 +103,9 @@ struct Lisp_Process
     Lisp_Object gnutls_cred_type;
 #endif
 
+    /* The thread a process is linked to, or nil for any thread.  */
+    Lisp_Object thread;
+
     /* After this point, there are no Lisp_Objects any more.  */
     /* alloc.c assumes that `pid' is the first such non-Lisp slot.  */
 
@@ -208,3 +211,5 @@ extern void add_read_fd (int fd, fd_callback func, void 
*data);
 extern void delete_read_fd (int fd);
 extern void add_write_fd (int fd, fd_callback func, void *data);
 extern void delete_write_fd (int fd);
+
+extern void update_processes_for_thread_death (Lisp_Object);
diff --git a/src/thread.c b/src/thread.c
index 40c8be9..be98b4a 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -22,6 +22,7 @@ along with GNU Emacs.  If not, see 
<http://www.gnu.org/licenses/>.  */
 #include "lisp.h"
 #include "character.h"
 #include "buffer.h"
+#include "process.h"
 
 /* FIXME */
 extern void unbind_for_thread_switch (void);
@@ -176,6 +177,50 @@ acquire_global_lock (struct thread_state *self)
 
 
 
+struct select_args
+{
+  select_func *func;
+  int max_fds;
+  SELECT_TYPE *rfds;
+  SELECT_TYPE *wfds;
+  SELECT_TYPE *efds;
+  EMACS_TIME *timeout;
+  sigset_t *sigmask;
+  int result;
+};
+
+static void
+really_call_select (void *arg)
+{
+  struct select_args *sa = arg;
+  struct thread_state *self = current_thread;
+
+  release_global_lock ();
+  sa->result = (sa->func) (sa->max_fds, sa->rfds, sa->wfds, sa->efds,
+                          sa->timeout, sa->sigmask);
+  acquire_global_lock (self);
+}
+
+int
+thread_select (select_func *func, int max_fds, SELECT_TYPE *rfds,
+              SELECT_TYPE *wfds, SELECT_TYPE *efds, EMACS_TIME *timeout,
+              sigset_t *sigmask)
+{
+  struct select_args sa;
+
+  sa.func = func;
+  sa.max_fds = max_fds;
+  sa.rfds = rfds;
+  sa.wfds = wfds;
+  sa.efds = efds;
+  sa.timeout = timeout;
+  sa.sigmask = sigmask;
+  flush_stack_call_func (really_call_select, &sa);
+  return sa.result;
+}
+
+
+
 static void
 mark_one_thread (struct thread_state *thread)
 {
@@ -315,6 +360,8 @@ run_thread (void *state)
 
   unbind_for_thread_switch ();
 
+  update_processes_for_thread_death (Fcurrent_thread ());
+
   /* Unlink this thread from the list of all threads.  */
   for (iter = &all_threads; *iter != self; iter = &(*iter)->next_thread)
     ;
diff --git a/src/thread.h b/src/thread.h
index d21887a..9db3c79 100644
--- a/src/thread.h
+++ b/src/thread.h
@@ -21,6 +21,9 @@ along with GNU Emacs.  If not, see 
<http://www.gnu.org/licenses/>.  */
 
 #include "regex.h"
 
+#include "sysselect.h"         /* FIXME */
+#include "systime.h"           /* FIXME */
+
 struct thread_state
 {
   struct vectorlike_header header;
@@ -156,6 +159,18 @@ struct thread_state
   /*re_char*/ unsigned char *m_whitespace_regexp;
 #define whitespace_regexp (current_thread->m_whitespace_regexp)
 
+  /* This variable is different from waiting_for_input in keyboard.c.
+     It is used to communicate to a lisp process-filter/sentinel (via the
+     function Fwaiting_for_user_input_p) whether Emacs was waiting
+     for user-input when that process-filter was called.
+     waiting_for_input cannot be used as that is by definition 0 when
+     lisp code is being evalled.
+     This is also used in record_asynch_buffer_change.
+     For that purpose, this must be 0
+     when not inside wait_reading_process_output.  */
+  int m_waiting_for_user_input_p;
+#define waiting_for_user_input_p (current_thread->m_waiting_for_user_input_p)
+
   /* The OS identifier for this thread.  */
   sys_thread_t thread_id;
 
@@ -194,4 +209,11 @@ extern void init_threads_once (void);
 extern void init_threads (void);
 extern void syms_of_threads (void);
 
+typedef int select_func (int, SELECT_TYPE *, SELECT_TYPE *, SELECT_TYPE *,
+                        EMACS_TIME *, sigset_t *);
+
+int thread_select  (select_func *func, int max_fds, SELECT_TYPE *rfds,
+                   SELECT_TYPE *wfds, SELECT_TYPE *efds, EMACS_TIME *timeout,
+                   sigset_t *sigmask);
+
 #endif /* THREAD_H */
-- 
1.7.7.6




reply via email to

[Prev in Thread] Current Thread [Next in Thread]