[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[shepherd] 15/15: WIP: shepherd: Port core service actor.
From: |
Juliana Sims |
Subject: |
[shepherd] 15/15: WIP: shepherd: Port core service actor. |
Date: |
Tue, 26 Nov 2024 13:27:20 -0500 (EST) |
juli pushed a commit to branch wip-goblinsify
in repository shepherd.
commit 2739fea85d63049f1f4c304e93770a897288a6c4
Author: Juliana Sims <juli@incana.org>
AuthorDate: Tue Nov 26 13:10:56 2024 -0500
WIP: shepherd: Port core service actor.
This is a living commit and will change over time. All "final" commits will
be atomic.
Summary of major architectural changes:
- wrap the <service> record in a Goblins actor
- remove the service controller, moving its messages into the service actor
- move stop logic inside the service actor
* modules/shepherd/service.scm: Port the core service actor.
---
modules/shepherd/service.scm | 379 +++++++++++++++++++++++++++++++++++--------
1 file changed, 313 insertions(+), 66 deletions(-)
diff --git a/modules/shepherd/service.scm b/modules/shepherd/service.scm
index 13ca230..bb35a06 100644
--- a/modules/shepherd/service.scm
+++ b/modules/shepherd/service.scm
@@ -32,6 +32,9 @@
#:use-module (fibers conditions)
#:use-module (fibers scheduler)
#:use-module (fibers timers)
+ #:use-module (goblins)
+ #:use-module (goblins actor-lib cell)
+ #:use-module (goblins actor-lib methods)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
@@ -175,6 +178,11 @@
get-message* ;XXX: for lack of a better place
essential-task-thunk))
+;; (if-on PRED B1 B2): wait on PRED with `on'; once it resolves, evaluate B1
+;; when the resolution is true and B2 otherwise.  The `on' call is made with
+;; #:promise? #t.
+(define-syntax if-on
+  (syntax-rules ()
+    ((_ pred b1 b2)
+     (on pred
+         (lambda (resolved)
+           (if resolved b1 b2))
+         #:promise? #t))))
+
+;; (match-on EXPR CLAUSE CLAUSE* ...): wait on EXPR with `on' and hand its
+;; resolution to a `match-lambda' built from the given clauses.  The `on'
+;; call is made with #:promise? #t.
+(define-syntax match-on
+  (syntax-rules ()
+    ((_ expr clause clause* ...)
+     (on expr
+         (match-lambda clause clause* ...)
+         #:promise? #t))))
(define sleep (@ (fibers) sleep))
@@ -390,15 +398,306 @@ Log abnormal termination reported by @var{status}."
denoting what the service provides."
(match provision
(((? symbol?) ..1)
- (make-service provision requirement one-shot? transient?
- respawn? respawn-limit respawn-delay
- start stop actions termination-handler
- documentation #f))
+ (selfish-spawn ^service
+ (make-service provision requirement one-shot? transient?
+ respawn? respawn-limit respawn-delay
+ start stop actions termination-handler
+ documentation #f)))
(_
(raise (condition
(&message (message "invalid service provision list")))))))
-(define (service-control service) ;internal
+(define-actor (^service bcom self service-record)
+  "Constructor for the Goblins actor representing a service.
+
+@var{bcom} is used by the @code{terminate} method to replace this actor's
+behavior, @var{self} refers to this actor, and @var{service-record} is the
+@code{<service>} record read by the immutable-state methods.  Mutable state
+is kept in the cells defined below."
+  #:frozen
+  (define (pid? obj)
+    "Return #t if @var{obj} looks like a PID"
+    (and (integer? obj) (exact? obj) (> obj 1)))
+
+  (define (update-status-changes status)
+    "Return the content of the @var{changes} cell with the pair of
+@var{status} and the current time inserted.  The caller is responsible for
+storing the result back into @var{changes}."
+    (ring-buffer-insert (cons status (current-time))
+                        ($ changes)))
+
+  (define (stop)
+    "Attempt to stop this service, waiting if it is already being stopped or
+started.  Return @code{#f}, or a promise resolving to @code{#f}, if this
+service was already stopped or being stopped; otherwise return a promise
+resolver to be fulfilled with the outcome of the service's stop action."
+    (match ($ status)
+      ('stopping
+       ;; We are being stopped; wait until we are stopped then return #f.
+       (on ($ stopped-promise)
+           (lambda _
+             ;; Install a fresh promise/resolver pair for the next waiter.
+             (let-values (((promise resolver) (spawn-promise-values)))
+               ($ stopped-promise promise)
+               ($ stopped-promise-resolver resolver))
+             #f)
+           #:promise? #t))
+      ('stopped #f)
+      ('starting
+       ;; We are being started; wait until we are started, then try to stop.
+       (local-output (l10n "Waiting for ~a to start...")
+                     ($ self 'canonical-name))
+       (on ($ started-promise)
+           (lambda _
+             ;; Install a fresh promise/resolver pair for the next waiter.
+             (let-values (((promise resolver) (spawn-promise-values)))
+               ($ started-promise promise)
+               ($ started-promise-resolver resolver))
+             (stop))
+           #:promise? #t))
+      ('running
+       ;; Stop this service.
+       (let-values (((notification-promise notification-promise-resolver)
+                     (spawn-promise-values)))
+         (on notification-promise
+             (lambda (stopped?)
+               ;; The STOPPED? boolean is supposed to indicate success
+               ;; or failure, but sometimes 'stop' method might return a
+               ;; truth value even though the service was successfully
+               ;; stopped, hence "might have failed" below.
+               (if stopped?
+                   (local-output (l10n "Service ~a stopped.")
+                                 ($ self 'canonical-name))
+                   (local-output
+                    (l10n "Service ~a might have failed to stop.")
+                    ($ self 'canonical-name)))
+               (<-np self 'service-stopped!)))
+         (local-output (l10n "Stopping service ~a...")
+                       ($ self 'canonical-name))
+         ($ status 'stopping)
+         ($ changes (update-status-changes 'stopping))
+         notification-promise-resolver))))
+
+  ;; Cells holding the promise (and its resolver) that is fulfilled when the
+  ;; service has started, respectively stopped.
+  (define-values (started-promise started-promise-resolver)
+    (let-values (((promise resolver) (spawn-promise-values)))
+      (values (spawn ^cell promise)
+              (spawn ^cell resolver))))
+  (define-values (stopped-promise stopped-promise-resolver)
+    (let-values (((promise resolver) (spawn-promise-values)))
+      (values (spawn ^cell promise)
+              (spawn ^cell resolver))))
+  (define-cell status 'stopped)
+  (define-cell value #f)
+  (define-cell enabled? #t)
+  ;; NOTE(review): the names `%max-recorded-startup-changes' and
+  ;; `%max-recorded-exist-statuses' ("exist" vs. "exit") used below look
+  ;; like typos; their definitions are not visible here -- confirm.
+  (define-cell changes                  ;list of status/timestamp pairs
+    (ring-buffer %max-recorded-startup-changes))
+  (define-cell failures                 ;list of timestamps
+    (ring-buffer %max-recorded-startup-failures))
+  (define-cell respawns '())            ;list of timestamps
+  (define-cell exit-statuses
+    (ring-buffer %max-recorded-exist-statuses))
+  (define-cell replacement #f)
+  ;; TODO: port to Goblins
+  (define-cell logger #f)               ;logger actor
+  (define-cell log-file)
+  (methods
+   ;; Immutable state
+   ((canonical-name) (car (service-provision service-record)))
+   ((provision) (service-provision service-record))
+   ((requirement) (service-requirement service-record))
+   ((one-shot?) (one-shot-service? service-record))
+   ((transient?) (transient-service? service-record))
+   ((respawn?) (respawn-service? service-record))
+   ((respawn-limit) (service-respawn-limit service-record))
+   ((respawn-delay) (service-respawn-delay service-record))
+   ((start-action) (service-start service-record))
+   ((stop-action) (service-stop service-record))
+   ((actions) (service-actions service-record))
+   ((termination-handler) (service-termination-handler service-record))
+   ((documentation) (service-documentation service-record))
+   ((%control) (%service-control service-record))
+   ((%control new-control) (set-service-control! service-record new-control))
+
+   ;; Readers for the mutable state held in cells.
+   ((running) ($ value))
+   ((status) ($ status))
+   ((enabled?) ($ enabled?))
+   ((respawn-times) ($ respawns))
+   ((startup-failures) ($ failures))
+   ((status-changes) ($ changes))
+   ((exit-statuses) ($ exit-statuses))
+
+   ((enable) ($ enabled? #t))           ;no reply
+   ((disable) ($ enabled? #f))          ;no reply
+
+   ((stopped?) (eq? 'stopped ($ status)))
+   ((running?) (not ($ self 'stopped?)))
+
+   ((start)
+    ...)
+   ((service-started! new-value)        ;no reply
+    ;; When NEW-VALUE is a procedure, call it to get the actual value and
+    ;; pass it a callback so it can eventually change it.
+    (let ((new-value (cond ((procedure? new-value)
+                            (new-value
+                             (lambda (value)
+                               (<- self 'change-value! value))))
+                           ((pid? new-value) ;backward compatibility
+                            (pid->process new-value))
+                           (else new-value))))
+      (on new-value
+          (lambda (new-value)
+            (when new-value
+              (local-output (l10n "Service ~a running with value ~s.")
+                            ($ self 'canonical-name) new-value))
+            (when (process? new-value)
+              ;; TODO: service monitoring
+              ;; NOTE(review): `service' is not bound by this constructor
+              ;; (its parameters are BCOM, SELF and SERVICE-RECORD);
+              ;; presumably SELF or SERVICE-RECORD is meant -- confirm.
+              (monitor-service-process service (process-id new-value)))
+
+            ;; Waiters on STARTED-PROMISE ignore the resolution value.
+            (<-np ($ started-promise-resolver) 'fulfill #t)
+            (let ((new-status (if (and new-value
+                                       (not ($ self 'one-shot?)))
+                                  'running
+                                  'stopped)))
+              ($ status new-status)
+              ;; Only keep a running value when the service is running.
+              ($ value (and (eq? new-status 'running) new-value))
+              ($ changes (update-status-changes new-status))
+              ;; A false NEW-VALUE denotes a startup failure: record it.
+              ($ failures (if new-value
+                              ($ failures)
+                              (ring-buffer-insert (current-time)
+                                                  ($ failures)))))))))
+   ((change-value! new-value)
+    (let ((new-value (if (pid? new-value) ;backward compatibility
+                         (pid->process new-value)
+                         new-value)))
+      (local-output (l10n "Running value of service ~a changed to ~s.")
+                    ($ self 'canonical-name) new-value)
+      (when (process? new-value)
+        ;; TODO: service monitoring
+        ;; NOTE(review): `service' is not a parameter of this constructor;
+        ;; confirm the intended referent.
+        (monitor-service-process service (process-id new-value)))
+      ($ value new-value)))
+   ;; Stop the service, including services that depend on it.  If the
+   ;; latter fails, continue anyway.  Return `#f' if it could be stopped.
+   ((stop . args)
+    "Stop this service and any service that depends on it. Return the list of
+names of services that have been stopped (including transitive dependent
+services).
+
+If this service is not running, print a warning and return its canonical name
+in a list."
+    (if ($ self 'stopped?)
+        (let ((name ($ self 'canonical-name)))
+          (local-output (l10n "Service ~a is not running.")
+                        name)
+          (list name))
+        ;; NOTE(review): `(<- other 'stop)' below yields a promise that is
+        ;; passed to `append'; confirm that this fold produces what
+        ;; `all-of*' expects.
+        (on (all-of*
+             (fold-services
+              (lambda (other acc)
+                (let-on* ((other-running? (<- other 'running?))
+                          (dependencies (<- other 'requirement))
+                          (required?
+                           (and (find (lambda (dependency)
+                                        (memq dependency ($ self 'provision)))
+                                      dependencies)
+                                #t)))
+                  (if (and other-running? required?)
+                      (append (<- other 'stop) acc)
+                      acc)))
+              '()))
+            (lambda (stopped-dependents)
+              ;; Stop the service itself.
+              (on (stop)
+                  (match-lambda
+                    (#f #f)
+                    ((? procedure? promise-resolver)
+                     (catch #t
+                       (lambda ()
+                         (define stopped?
+                           (parameterize ((current-service self))
+                             (not (apply ($ self 'stop-action)
+                                         ($ self 'running)
+                                         args))))
+                         (<-np promise-resolver 'fulfill stopped?))
+                       (lambda (key . args)
+                         ;; Special case: 'root' may quit.
+                         ;; NOTE(review): `service' is not a parameter of
+                         ;; this constructor; confirm the intended referent
+                         ;; here and in the calls below.
+                         (and (eq? root-service service)
+                              (eq? key 'quit)
+                              (apply quit args))
+                         ;; Report failure as a false value rather than a
+                         ;; broken promise: the listener installed by the
+                         ;; internal `stop' helper has no #:catch handler,
+                         ;; so a break would leave the status at `stopping'.
+                         (<-np promise-resolver 'fulfill #f)
+                         (report-exception 'stop service key args)))))
+                  #:finally
+                  (lambda ()
+                    (when ($ self 'transient?)
+                      (put-message (current-registry-channel)
+                                   `(unregister ,(list service)))
+                      (local-output (l10n "Transient service ~a unregistered.")
+                                    ($ self 'canonical-name)))
+                    ;; Replace the service with its replacement, if it has one.
+                    (let ((replacement ($ self 'replacement)))
+                      (when replacement
+                        (register-services (list replacement)))
+                      (cons (or replacement self) stopped-dependents))))))))
+   ((service-stopped!)                  ;no reply
+    (local-output (l10n "Service ~a is now stopped.")
+                  ($ self 'canonical-name))
+    ;; Waiters on STOPPED-PROMISE ignore the resolution value.
+    (<-np ($ stopped-promise-resolver) 'fulfill #t)
+    (when ($ logger)
+      (put-message ($ logger) 'terminate))
+
+    ($ status 'stopped)
+    ($ changes (update-status-changes 'stopped))
+    ($ value #f) ($ logger #f)
+    ($ respawns '())
+    ($ failures (ring-buffer %max-recorded-startup-failures)))
+   ((handle-termination pid exit-status) ;no reply
+    ;; Handle premature termination of this service's process, possibly by
+    ;; respawning it, unless STATUS is 'stopping' or 'stopped' or PID
+    ;; doesn't match VALUE (which happens with notifications of processes
+    ;; terminated while stopping the service or shortly after).
+    (unless (or (memq ($ status) '(stopping stopped))
+                (not (process? ($ value)))
+                (not (= (process-id ($ value)) pid)))
+      ;; Read the handler, running value and logger now: VALUE and LOGGER
+      ;; are reset at the end of this method, which may happen before the
+      ;; fiber below gets to run.
+      (let ((handler ($ self 'termination-handler))
+            (terminated-value ($ value))
+            (current-logger ($ logger)))
+        (spawn-fiber
+         (lambda ()
+           (false-if-exception
+            (handler service-record terminated-value exit-status))
+           (when current-logger
+             (put-message current-logger 'terminate)))))
+      ($ status 'stopped)
+      ($ changes (update-status-changes 'stopped))
+      ($ exit-statuses
+         (ring-buffer-insert (cons exit-status (current-time))
+                             ($ exit-statuses)))
+      ($ value #f) ($ logger #f)))
+   ((record-process-exit-status pid status)
+    ($ exit-statuses
+       (ring-buffer-insert (cons status (current-time))
+                           ($ exit-statuses))))
+   ((record-respawn-time)               ;no reply
+    ($ respawns (cons (current-time) ($ respawns))))
+   ((replace-if-running new-service)
+    ;; Record NEW-SERVICE as the replacement and return #t when running;
+    ;; otherwise clear any recorded replacement and return #f.
+    (if (eq? ($ status) 'running)
+        (begin
+          (local-output (l10n "Recording replacement for ~a.")
+                        ($ self 'canonical-name))
+          ($ replacement new-service)
+          #t)
+        (begin
+          ($ replacement #f)
+          #f)))
+   ((replacement)
+    ($ replacement))
+   ((register-logger new-logger)        ;no reply
+    (when ($ logger)
+      ;; This happens when, for example, the 'start' procedure calls
+      ;; 'fork+exec-command' several times: each call creates a new logger.
+      (local-output
+       (l10n "Registering new logger for ~a.")
+       ($ self 'canonical-name))
+      (put-message ($ logger) 'terminate))
+    ($ logger new-logger))
+   ((terminate)                         ;no reply
+    ;; Once stopped, become a behavior that answers every message with
+    ;; '*service-stopped*; otherwise just log the attempt.
+    (if (eq? ($ status) 'stopped)
+        (bcom (lambda _ '*service-stopped*))
+        (local-output
+         (l10n "Attempt to terminate controller of ~a in ~a state!")
+         ($ self 'canonical-name) ($ status))))))
+
+(define (service-control service) ;internal
"Return the controlling channel of @var{service}."
;; Spawn the controlling fiber lazily, hopefully once Fibers has actually
;; been initialized.
@@ -1024,68 +1323,16 @@ found in the service registry."
running)))))))
(define (required-by? service dependent)
- "Returns #t if DEPENDENT directly requires SERVICE in order to run. Returns
-#f otherwise."
- (and (find (lambda (dependency)
- (memq dependency (service-provision service)))
- (service-requirement dependent))
- #t))
-
-;; Stop the service, including services that depend on it. If the
-;; latter fails, continue anyway. Return `#f' if it could be stopped.
+ "Asynchronously send the @code{required-by?} message to @var{service},
+passing @var{dependent} as its argument, and return the resulting promise.
+
+Type: Service Service -> (Promise Boolean)"
+ ;; NOTE(review): no `required-by?' method is visible among the methods of
+ ;; the `^service' actor in this file -- confirm that it is defined.
+ (<- service 'required-by? dependent))
+
(define (stop-service service . args)
- "Stop @var{service} and any service that depends on it. Return the list of
-services that have been stopped (including transitive dependent services).
+  "Asynchronously send the @code{stop} message to @var{service}, passing each
+element of @var{args} as a separate message argument, and return the
+resulting promise.
-If @var{service} is not running, print a warning and return its canonical name
-in a list."
- (if (service-stopped? service)
- (begin
- (local-output (l10n "Service ~a is not running.")
- (service-canonical-name service))
- (list service))
- (let ((stopped-dependents
- (fold-services (lambda (other acc)
- (if (and (service-running? other)
- (required-by? service other))
- (append (stop-service other) acc)
- acc))
- '())))
- ;; Stop the service itself.
- (let ((reply (make-channel)))
- (put-message (service-control service) `(stop ,reply))
- (match (get-message reply)
- (#f
- #f)
- ((? channel? notification)
- (catch #t
- (lambda ()
- (define stopped?
- (parameterize ((current-service service))
- (not (apply (service-stop service)
- (service-running-value service)
- args))))
- (put-message notification stopped?))
- (lambda (key . args)
- ;; Special case: 'root' may quit.
- (and (eq? root-service service)
- (eq? key 'quit)
- (apply quit args))
- (put-message notification #f)
- (report-exception 'stop service key args))))))
-
- (when (transient-service? service)
- (put-message (current-registry-channel)
- `(unregister ,(list service)))
- (local-output (l10n "Transient service ~a unregistered.")
- (service-canonical-name service)))
-
- ;; Replace the service with its replacement, if it has one.
- (let ((replacement (service-replacement service)))
- (when replacement
- (register-services (list replacement)))
-
- (cons (or replacement service) stopped-dependents)))))
+Type: Service . Any -> (Promise (Listof String))"
+  ;; ARGS is a rest list: spread it with `apply' so that the actor's
+  ;; `(stop . args)' method receives the individual arguments instead of a
+  ;; single list argument.
+  (apply <- service 'stop args))
(define (perform-service-action service the-action . args)
"Perform @var{the-action} (a symbol such as @code{'restart} or
@code{'status})
@@ -1150,7 +1397,7 @@ the action."
(apply quit args))
;; Re-throw service errors that the caller will handle.
- (cond ((and (eq? key '%exception) ;Guile 3.x
+ (cond ((and (eq? key '%exception) ;Guile 3.x
(service-error? (car args)))
(raise-exception (car args)))
(else
- [shepherd] 13/15: Incorporate Spritely feedback into design doc, (continued)
- [shepherd] 13/15: Incorporate Spritely feedback into design doc, Juliana Sims, 2024/11/26
- [shepherd] 07/15: scratch: First pass at service startup code., Juliana Sims, 2024/11/26
- [shepherd] 04/15: scratch: Begin prototyping process monitoring., Juliana Sims, 2024/11/26
- [shepherd] 05/15: scratch: Stub out timeout support., Juliana Sims, 2024/11/26
- [shepherd] 06/15: scratch: Cleanup comments somewhat., Juliana Sims, 2024/11/26
- [shepherd] 08/15: goblins port manifest: Update dependency commits, fix inputs., Juliana Sims, 2024/11/26
- [shepherd] 11/15: Update design doc., Juliana Sims, 2024/11/26
- [shepherd] 09/15: scratch: Return demo to working state., Juliana Sims, 2024/11/26
- [shepherd] 12/15: Incorporate more feedback into design doc, Juliana Sims, 2024/11/26
- [shepherd] 14/15: dir-locals: Add indentation for Goblins forms., Juliana Sims, 2024/11/26
- [shepherd] 15/15: WIP: shepherd: Port core service actor.,
Juliana Sims <=