guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[shepherd] 15/15: WIP: shepherd: Port core service actor.


From: Juliana Sims
Subject: [shepherd] 15/15: WIP: shepherd: Port core service actor.
Date: Tue, 26 Nov 2024 13:27:20 -0500 (EST)

juli pushed a commit to branch wip-goblinsify
in repository shepherd.

commit 2739fea85d63049f1f4c304e93770a897288a6c4
Author: Juliana Sims <juli@incana.org>
AuthorDate: Tue Nov 26 13:10:56 2024 -0500

    WIP: shepherd: Port core service actor.
    
    This is a living commit and will change over time.  All "final" commits will
    be atomic.
    
    Summary of major architectural changes:
     - wrap the <service> record in a Goblins actor
     - remove the service controller, moving its messages into the service actor
     - move stop logic inside the service actor
    
    * modules/shepherd/service.scm: Port the core service actor.
---
 modules/shepherd/service.scm | 379 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 313 insertions(+), 66 deletions(-)

diff --git a/modules/shepherd/service.scm b/modules/shepherd/service.scm
index 13ca230..bb35a06 100644
--- a/modules/shepherd/service.scm
+++ b/modules/shepherd/service.scm
@@ -32,6 +32,9 @@
   #:use-module (fibers conditions)
   #:use-module (fibers scheduler)
   #:use-module (fibers timers)
+  #:use-module (goblins)
+  #:use-module (goblins actor-lib cell)
+  #:use-module (goblins actor-lib methods)
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-9)
   #:use-module (srfi srfi-9 gnu)
@@ -175,6 +178,11 @@
             get-message*                      ;XXX: for lack of a better place
             essential-task-thunk))
 
+(define-syntax-rule (if-on pred b1 b2)
+  (on pred (lambda (true?) (if true? b1 b2)) #:promise? #t))
+
+(define-syntax-rule (match-on expr body body* ...)
+  (on expr (match-lambda body body* ...) #:promise? #t))
 
 (define sleep (@ (fibers) sleep))
 
@@ -390,15 +398,306 @@ Log abnormal termination reported by @var{status}."
 denoting what the service provides."
   (match provision
     (((? symbol?) ..1)
-     (make-service provision requirement one-shot? transient?
-                   respawn? respawn-limit respawn-delay
-                   start stop actions termination-handler
-                   documentation #f))
+     (selfish-spawn ^service
+                    (make-service provision requirement one-shot? transient?
+                                  respawn? respawn-limit respawn-delay
+                                  start stop actions termination-handler
+                                  documentation #f)))
     (_
      (raise (condition
              (&message (message "invalid service provision list")))))))
 
-(define (service-control service)                 ;internal
+(define-actor (^service _bcom self service-record)
+  "Constructor for Goblins actor representing a service"
+  #:frozen
+  (define (pid? obj)
+    "Return #t if @var{obj} looks like a PID"
+    (and (integer? obj) (exact? obj) (> obj 1)))
+
+  (define (update-status-changes status)
+    "Add @var{status} to @var{changes}, the ring buffer of status changes"
+    (ring-buffer-insert (cons status (current-time))
+                        ($ changes)))
+  (define (stop)
+    "Attempt to stop this service, blocking if it is already being stopped.
+Return a promise resolving to @code{#f} if this service was already running or
+being stopped; otherwise return a promise resolving to a promise resolver to be
+fulfilled when this service stops."
+    (match ($ status)
+      ('stopping
+       ;; We are being stopped; wait until we are stopped then return #f.
+       (on ($ stopped-promise)
+           (lambda _
+             (let-values ((promise resolver) (spawn-promise-values))
+               ($ stopped-promise promise)
+               ($ stopped-promise-resolver resolver))
+             #f)
+           #:promise? #t))
+      ('stopped #f)
+      ('starting
+       ;; We are being started; wait until we are started, then try to stop.
+       (local-output (l10n "Waiting for ~a to start...")
+                     ($ self 'canonical-name))
+       (on ($ started-promise)
+           (lambda _
+             (let-values ((promise resolver) (spawn-promise-values))
+               ($ started-promise promise)
+               ($ started-promise-resolver resolver))
+             (stop))
+           #:promise? #t))
+      ('running
+       ;; Stop this service.
+       (let-values (((notification-promise notification-promise-resolver)
+                     (spawn-promise-values)))
+         (on notification-promise
+             (lambda (stopped?)
+               ;; The STOPPED? boolean is supposed to indicate success
+               ;; or failure, but sometimes 'stop' method might return a
+               ;; truth value even though the service was successfully
+               ;; stopped, hence "might have failed" below.
+               (if stopped?
+                   (local-output (l10n "Service ~a stopped.")
+                                 ($ self 'canonical-name))
+                   (local-output
+                    (l10n "Service ~a might have failed to stop.")
+                    ($ self 'canonical-name)))
+               (<-np self 'service-stopped!)))
+         (local-output (l10n "Stopping service ~a...")
+                       ($ self 'canonical-name))
+         ($ status 'stopping)
+         ($ changes (update-status-changes 'stopping))
+         notification-promise-resolver))))
+
+  (define-values (started-promise started-promise-resolver)
+    (let-values ((promise resolver)
+                 (spawn-promise-values))
+      (values
+       (spawn ^cell promise)
+       (spawn ^cell resolver))))
+  (define-values (stopped-promise stopped-promise-resolver)
+    (let-values ((promise resolver)
+                 (spawn-promise-values))
+      (values
+       (spawn ^cell promise)
+       (spawn ^cell resolver))))
+  (define-cell status 'stopped)
+  (define-cell value #f)
+  (define-cell enabled? #t)
+  (define-cell changes                  ;list of status/timestamp pairs
+    (ring-buffer %max-recorded-startup-changes))
+  (define-cell failures                 ; list of timestamps
+    (ring-buffer %max-recorded-startup-failures))
+  (define-cell respawns '())            ; list of timestamps
+  (define-cell exit-statuses
+    (ring-buffer %max-recorded-exist-statuses))
+  (define-cell replacement #f)
+  ;; TODO: port to Goblins
+  (define-cell logger #f)               ;logger actor
+  (define-cell log-file)
+  (methods
+   ;; Immutable state
+   ((canonical-name) (car (service-provision service-record)))
+   ((provision) (service-provision service-record))
+   ((requirement) (service-requirement service-record))
+   ((one-shot?) (one-shot-service? service-record))
+   ((transient?) (transient-service service-record))
+   ((respawn?) (respawn-service? service-record))
+   ((respawn-limit) (service-respawn-limit service-record))
+   ((respawn-delay) (service-respawn-delay service-record))
+   ((start-action) (service-start service-record))
+   ((stop-action) (service-stop service-record))
+   ((actions) (service-actions service-record))
+   ((termination-handler) (service-termination-handler service-record))
+   ((documentation) (service-documentation service-record))
+   ((%control) (%service-control service-record))
+   ((%control new-control) (set-service-control! service-record new-control))
+
+   ((running) ($ value))
+   ((status) ($ status))
+   ((enabled?) ($ enabled?))
+   ((respawn-times) ($ respawns))
+   ((startup-failures) ($ failures))
+   ((status-changes) ($ changes))
+   ((exit-statuses) ($ exit-statuses))
+
+   ((enable) ($ enabled? #t))           ;no reply
+   ((disable) ($ enabled? #f))          ;no reply
+
+   ((stopped?) (eq? 'stopped ($ status)))
+   ((running?) (not ($ self 'stopped?)))
+
+   ((start)
+    ...)
+   ((service-started! new-value)        ;no reply
+    ;; When NEW-VALUE is a procedure, call it to get the actual value and
+    ;; pass it a callback so it can eventually change it.
+    (let ((new-value (cond ((procedure? new-value)
+                            (new-value
+                             (lambda (value)
+                               (<- self 'change-value! value))))
+                           ((pid? new-value) ;backward compatibility
+                            (pid->process new-value))
+                           (else new-value))))
+      (on new-value
+          (lambda (new-value)
+            (when new-value
+              (local-output (l10n "Service ~a running with value ~s.")
+                            ($ self 'canonical-name) new-value))
+            (when (process? new-value)
+              ;; TODO: service monitoring
+              (monitor-service-process service (process-id new-value)))
+
+            (<-np ($ started-promise-resolver) 'fulfill)
+            (let ((new-status (if (and new-value
+                                       (not ($ self 'one-shot?)))
+                                  'running
+                                  'stopped)))
+              ($ status new-status)
+              ($ value (and (eq? new-value 'running) new-value))
+              ($ changes (update-status-changes new-status))
+              ($ failures (if new-value
+                              failures
+                              (ring-buffer-insert (current-time)
+                                                  ($ failures)))))))))
+   ((change-value! new-value)
+    (let ((new-value (if (pid? new-value) ;backward compatibility
+                         (pid->process new-value)
+                         new-value)))
+      (local-output (l10n "Running value of service ~a changed to ~s.")
+                    ($ self 'canonical-name) new-value)
+      (when (process? new-value)
+        ;; TODO: service monitoring
+        (monitor-service-process service (process-id new-value)))
+      ($ value new-value)))
+   ;; Stop the service, including services that depend on it.  If the
+   ;; latter fails, continue anyway.  Return `#f' if it could be stopped.
+   ((stop . args)
+    "Stop this service and any service that depends on it.  Return the list of
+names of services that have been stopped (including transitive dependent
+services).
+
+If this service is not running, print a warning and return its canonical name
+in a list."
+    (if ($ self 'stopped?)
+        (let ((name ($ self 'canonical-name)))
+          (local-output (l10n "Service ~a is not running.")
+                        name)
+          (list name))
+        (on (all-of*
+             (fold-services
+              (lambda (other acc)
+                (let-on* ((other-running? (<- other 'running?))
+                          (dependencies (<- other 'requirement))
+                          (required?
+                           (and (find (lambda (dependency)
+                                        (memq dependency ($ self 'provision)))
+                                      dependencies)
+                                #t)))
+                  (if (and other-running? required?)
+                      (append (<- other 'stop) acc)
+                      acc)))
+              '()))
+            (lambda (stopped-dependents)
+              ;; Stop the service itself.
+              (on (stop)
+                  (match-lambda
+                    (#f #f)
+                    ((? procedure? promise-resolver)
+                     (catch #t
+                       (lambda ()
+                         (define stopped?
+                           (parameterize ((current-service self))
+                             (not (apply ($ self 'stop-action)
+                                         ($ self 'running)
+                                         args))))
+                         (<-np promise-resolver 'fulfill stopped?))
+                       (lambda (key . args)
+                         ;; Special case: 'root' may quit.
+                         (and (eq? root-service service)
+                              (eq? key 'quit)
+                              (apply quit args))
+                         (<-np promise-resolver 'break key)
+                         (report-exception 'stop service key args)))))
+                  #:finally
+                  (lambda ()
+                    (when ($ self 'transient?)
+                      (put-message (current-registry-channel)
+                                   `(unregister ,(list service)))
+                      (local-output (l10n "Transient service ~a unregistered.")
+                                    ($ self 'canonical-name)))
+                    ;; Replace the service with its replacement, if it has one.
+                    (let ((replacement ($ self 'replacement)))
+                      (when replacement
+                        (register-services (list replacement)))
+                      (cons (or replacement self) stopped-dependents))))))))
+   ((service-stopped!)                  ;no reply
+    (local-output (l10n "Service ~a is now stopped.")
+                  ($ self 'canonical-name))
+    (<-np ($ stopped-promise-resolver) 'fulfill)
+    (when ($ logger)
+      (put-message ($ logger) 'terminate))
+
+    ($ status 'stopped)
+    ($ changes (update-status-changes 'stopped))
+    ($ value #f) ($ logger #f)
+    ($ respawns '())
+    ($ failures (ring-buffer %max-recorded-startup-failures)))
+   ((handle-termination pid exit-status) ;no reply
+    ;; Handle premature termination of this service's process, possibly by
+    ;; respawning it, unless STATUS is 'stopping' or 'stopped' or PID
+    ;; doesn't match VALUE (which happens with notifications of processes
+    ;; terminated while stopping the service or shortly after).
+    (unless (or (memq ($ status) '(stopping stopped))
+                (not (process? ($ value)))
+                (not (= (process-id ($ value)) pid)))
+      (spawn-fiber
+       (lambda ()
+         (false-if-exception
+          (($ self 'termination-handler)
+           service-record ($ value) exit-status))
+         (when ($ logger)
+           (put-message ($ logger) 'terminate))))
+      ($ status 'stopped)
+      ($ changes (update-status-changes 'stopped))
+      ($ exit-statuses
+         (ring-buffer-insert (cons exit-status (current-time))
+                             exit-statuses))
+      ($ value #f) ($ logger #f)))
+   ((record-process-exit-status pid status)
+    ($ exit-statuses
+       (ring-buffer-insert (cons status (current-time))
+                           ($ exit-statuses))))
+   ((record-respawn-time)               ;no reply
+    ($ respawns (cons (current-time) ($ respawns))))
+   ((replace-if-running new-serivce)
+    (if (eq? ($ status) 'running)
+        (begin
+          (local-output (l10n "Recording replacement for ~a.")
+                        ($ self 'canonical-name))
+          ($ replacement new-service)
+          #t)
+        (begin
+          ($ replacement #f)
+          #f)))
+   ((replacement)
+    ($ replacement))
+   ((register-logger new-logger)        ;no reply
+    (when logger
+      ;; This happens when, for example, the 'start' procedure calls
+      ;; 'fork+exec-command' several times: each call creates a new logger.
+      (local-output
+       (l10n "Registering new logger for ~a.")
+       ($ self 'canonical-name))
+      (put-message ($ logger) 'terminate))
+    ($ logger new-logger))
+   ((terminate)                         ;no reply
+    (if (eq? ($ status) 'stopped)
+        (bcom (lambda _ '*service-stopped*))
+        (local-output
+         (l10n "Attempt to terminate controller of ~a in ~a state!")
+         ($ self 'canonical-name) ($ status))))))
+
+(define (service-control service)       ;internal
   "Return the controlling channel of @var{service}."
   ;; Spawn the controlling fiber lazily, hopefully once Fibers has actually
   ;; been initialized.
@@ -1024,68 +1323,16 @@ found in the service registry."
                running)))))))
 
 (define (required-by? service dependent)
-  "Returns #t if DEPENDENT directly requires SERVICE in order to run.  Returns
-#f otherwise."
-  (and (find (lambda (dependency)
-               (memq dependency (service-provision service)))
-             (service-requirement dependent))
-       #t))
-
-;; Stop the service, including services that depend on it.  If the
-;; latter fails, continue anyway.  Return `#f' if it could be stopped.
+  "Wrapper for @var{service} required-by? message
+
+Type: Service Service -> (Promise Boolean)"
+  (<- service 'required-by? dependent))
+
 (define (stop-service service . args)
-  "Stop @var{service} and any service that depends on it.  Return the list of
-services that have been stopped (including transitive dependent services).
+  "Wrapper for @var{service} stop message
 
-If @var{service} is not running, print a warning and return its canonical name
-in a list."
-  (if (service-stopped? service)
-      (begin
-        (local-output (l10n "Service ~a is not running.")
-                      (service-canonical-name service))
-        (list service))
-      (let ((stopped-dependents
-             (fold-services (lambda (other acc)
-                              (if (and (service-running? other)
-                                       (required-by? service other))
-                                  (append (stop-service other) acc)
-                                  acc))
-                            '())))
-        ;; Stop the service itself.
-        (let ((reply (make-channel)))
-          (put-message (service-control service) `(stop ,reply))
-          (match (get-message reply)
-            (#f
-             #f)
-            ((? channel? notification)
-             (catch #t
-               (lambda ()
-                 (define stopped?
-                   (parameterize ((current-service service))
-                     (not (apply (service-stop service)
-                                 (service-running-value service)
-                                 args))))
-                 (put-message notification stopped?))
-               (lambda (key . args)
-                 ;; Special case: 'root' may quit.
-                 (and (eq? root-service service)
-                      (eq? key 'quit)
-                      (apply quit args))
-                 (put-message notification #f)
-                 (report-exception 'stop service key args))))))
-
-        (when (transient-service? service)
-          (put-message (current-registry-channel)
-                       `(unregister ,(list service)))
-          (local-output (l10n "Transient service ~a unregistered.")
-                        (service-canonical-name service)))
-
-        ;; Replace the service with its replacement, if it has one.
-        (let ((replacement (service-replacement service)))
-          (when replacement
-            (register-services (list replacement)))
-
-          (cons (or replacement service) stopped-dependents)))))
+Type: Service . Any -> (Promise (Listof String))"
+  (<- service 'stop args))
 
 (define (perform-service-action service the-action . args)
   "Perform @var{the-action} (a symbol such as @code{'restart} or 
@code{'status})
@@ -1150,7 +1397,7 @@ the action."
              (apply quit args))
 
         ;; Re-throw service errors that the caller will handle.
-        (cond ((and (eq? key '%exception)         ;Guile 3.x
+        (cond ((and (eq? key '%exception) ;Guile 3.x
                     (service-error? (car args)))
                (raise-exception (car args)))
               (else



reply via email to

[Prev in Thread] Current Thread [Next in Thread]