[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
branch main updated: remote: Keep ‘last-seen’ info in memory rather than in the database.
From: Ludovic Courtès
Subject: branch main updated: remote: Keep ‘last-seen’ info in memory rather than in the database.
Date: Tue, 09 Jul 2024 06:09:58 -0400
This is an automated email from the git hooks/post-receive script.
civodul pushed a commit to branch main
in repository guix-cuirass.
The following commit(s) were added to refs/heads/main by this push:
new e9f83e4 remote: Keep ‘last-seen’ info in memory rather than in the database.
e9f83e4 is described below
commit e9f83e43f066cdc8bb4bec6ba221ade4ef7cab7b
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Tue Jul 9 11:32:35 2024 +0200
remote: Keep ‘last-seen’ info in memory rather than in the database.
Previously, the ‘last_seen’ bit was kept in the database: for each and
every ping received, ‘remote-server’ would run a SQL query to update it.
This per-ping overhead could slow down processing of incoming messages in
‘remote-server’, especially with large numbers of workers.
Suggested by Chris Baines <guix@cbaines.net>.
* src/cuirass/scripts/remote-server.scm (worker-directory)
(spawn-worker-directory): New procedures.
(serve-build-requests): Add ‘worker-directory’ parameter.
[update-worker!]: Rename to…
[update-worker]: … this. Rewrite to send a message to WORKER-DIRECTORY.
Remove fiber that periodically calls ‘db-remove-unresponsive-workers’.
(cuirass-remote-server): Call ‘spawn-worker-directory’ and update
‘serve-build-requests’ call.
* src/cuirass/remote.scm (<worker>)[last-seen]: Remove.
(worker->sexp): Remove reference to ‘worker-last-seen’.
(sexp->worker): Remove ‘last-seen’ field.
* src/cuirass/database.scm (db-add-or-update-worker): Set ‘last_seen’ to
zero.
(db-get-worker): Remove reference to ‘last_seen’.
(db-get-workers): Likewise.
(db-remove-unresponsive-workers): Rename to…
(db-remove-workers): … this. Adjust SQL queries accordingly.
* src/cuirass/templates.scm (machine-status): Remove “Last seen” column.
* src/schema.sql: Mark ‘last_seen’ as unused.
* tests/database.scm (%dummy-worker): Remove ‘last-seen’ field.
("db-remove-unresponsive-workers"): Rename to…
("db-remove-workers"): … this. Adjust accordingly.
* tests/remote.scm ("no workers", "one worker"): New tests.
---
src/cuirass/database.scm | 49 +++++++++--------
src/cuirass/remote.scm | 15 ++----
src/cuirass/scripts/remote-server.scm | 99 ++++++++++++++++++++++++++---------
src/cuirass/templates.scm | 7 +--
src/schema.sql | 2 +-
tests/database.scm | 9 ++--
tests/remote.scm | 17 ++++++
7 files changed, 127 insertions(+), 71 deletions(-)
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index 686ae1c..5d5aad7 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -216,7 +216,7 @@
db-get-worker
db-get-workers
db-worker-current-builds
- db-remove-unresponsive-workers
+ db-remove-workers
db-clear-workers
db-clear-build-queue
db-get-log-from-output
@@ -2170,10 +2170,10 @@ VALUES ("
(worker-name worker) ", "
(worker-address worker) ", "
(worker-machine worker) ", "
- (string-join (worker-systems worker) ",") ", "
- (worker-last-seen worker) ")
+ (string-join (worker-systems worker) ",")
+ ", 0)
ON CONFLICT(name) DO UPDATE
-SET last_seen = " (worker-last-seen worker) ";"))
+SET last_seen = 0;"))
#:threshold 2))
(define (db-get-worker name)
@@ -2181,34 +2181,32 @@ SET last_seen = " (worker-last-seen worker) ";"))
(with-db-connection db
(match (expect-one-row
(exec-query/bind db "
-SELECT name, address, machine, systems, last_seen from Workers
+SELECT name, address, machine, systems FROM Workers
WHERE name = " name ";"))
- ((name address machine systems last-seen)
+ ((name address machine systems)
(worker
(name name)
(address address)
(machine machine)
- (systems (string-split systems #\,))
- (last-seen (string->number last-seen))))
- (else #f))))
+ (systems (string-split systems #\,))))
+ (_ #f))))
(define (db-get-workers)
"Return the workers in Workers table."
(with-db-connection db
(let loop ((rows (exec-query db "
-SELECT name, address, machine, systems, last_seen from Workers"))
+SELECT name, address, machine, systems FROM Workers"))
(workers '()))
(match rows
(() (reverse workers))
- (((name address machine systems last-seen)
+ (((name address machine systems)
. rest)
(loop rest
(cons (worker
(name name)
(address address)
(machine machine)
- (systems (string-split systems #\,))
- (last-seen (string->number last-seen)))
+ (systems (string-split systems #\,)))
workers)))))))
(define (db-worker-current-builds)
@@ -2233,18 +2231,19 @@ Builds.starttime DESC, Builds.id DESC;"))
;; switched back to "scheduled".
(* 30 60))
-(define (db-remove-unresponsive-workers timeout)
- "Remove the workers that are unresponsive since at least TIMEOUT seconds.
-Also restart the builds that are started on those workers."
+(define (db-remove-workers names)
+ "Remove workers with any of the given NAMES.
+Also restart the builds that were started on those workers."
+ (define name-array
+ (string-append "{"
+ (string-join (map object->string names) ",")
+ "}"))
+
(with-db-connection db
;; Restart the builds that are marked as started on those workers.
- (let ((restarted (exec-query/bind db "
-UPDATE Builds SET status = -2, worker = null FROM
-(SELECT id FROM Workers LEFT JOIN Builds
-ON builds.worker = workers.name
-WHERE status = -1 AND
-(extract(epoch from now())::int - last_seen) > " timeout
-") AS expired WHERE builds.id = expired.id")))
+ (let* ((restarted (exec-query/bind db "\
+UPDATE Builds SET status = -2, worker = null
+WHERE worker = ANY(" name-array ");")))
(unless (zero? restarted)
(log-info "restarted ~a builds that were on unresponsive workers"
restarted)))
@@ -2258,8 +2257,8 @@ WHERE status = " (build-status submitted) " AND
(log-info "rescheduled ~a builds that were submitted more than ~as ago"
rescheduled %build-submission-timeout)))
- (let ((removed (exec-query/bind db "DELETE FROM Workers WHERE
-(extract(epoch from now())::int - last_seen) > " timeout ";")))
+ (let ((removed (exec-query/bind db "DELETE FROM Workers
+WHERE name = ANY(" name-array ");")))
(unless (zero? removed)
(log-info "removed ~a unresponsive workers" removed)))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 1f7b0ba..52efbd9 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -55,7 +55,6 @@
worker-machine
worker-publish-url
worker-systems
- worker-last-seen
worker->sexp
sexp->worker
generate-worker-name
@@ -114,23 +113,20 @@
(machine worker-machine)
(publish-url worker-publish-url
(default #f))
- (systems worker-systems)
- (last-seen worker-last-seen
- (default 0)))
+ (systems worker-systems))
(define (worker->sexp worker)
"Return an sexp describing WORKER."
(let ((name (worker-name worker))
(address (worker-address worker))
(machine (worker-machine worker))
- (systems (worker-systems worker))
- (last-seen (worker-last-seen worker)))
+ (systems (worker-systems worker)))
`(worker
(name ,name)
(address ,address)
(machine ,machine)
(systems ,systems)
- (last-seen ,last-seen))))
+ (last-seen 0)))) ;unused
(define (sexp->worker sexp)
"Turn SEXP, an sexp as returned by 'worker->sexp', into a <worker> record."
@@ -139,13 +135,12 @@
('address address)
('machine machine)
('systems systems)
- ('last-seen last-seen))
+ _ ...)
(worker
(name name)
(address address)
(machine machine)
- (systems systems)
- (last-seen last-seen)))))
+ (systems systems)))))
(define (generate-worker-name)
"Return the service name of the server."
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index 4bf3da9..341362c 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -377,6 +377,65 @@ requested received on its channel."
(loop)))))
+;;;
+;;; Keeping track of active workers.
+;;;
+
+(define* (worker-directory channel
+ #:optional (timeout (%worker-timeout)))
+ (lambda ()
+ ;; This directory accesses the 'Workers' database table only when strictly
+ ;; necessary (when adding or removing workers) to reduce the overhead when
+ ;; processing pings sent by workers.
+
+ (define (stale?)
+ (let ((now (current-time)))
+ (match-lambda
+ ((name . last-seen)
+ (> (- now last-seen) timeout)))))
+
+ (define (cleanup workers)
+ (let ((gone alive (partition (stale?) workers)))
+ (log-debug "~a workers are up and running" (length alive))
+ (match gone
+ (() #t)
+ (((names . _) ...)
+ (log-info "removing ~a unresponsive workers:~{ ~a~}"
+ (length names) names)
+ (db-remove-workers names)))
+ alive))
+
+ (log-info "starting worker directory with ~as timeout"
+ timeout)
+ (let loop ((workers (map (lambda (worker)
+ (cons (worker-name worker) 0))
+ (db-get-workers))))
+ (match (get-message* channel (quotient timeout 2)
+ 'timeout)
+ ('timeout
+ (loop (cleanup workers)))
+ (`(ping ,worker ,properties)
+ (let ((name (worker-name worker)))
+ (log-debug "worker ~a is up and running (properties: ~s)"
+ name properties)
+ (match (assoc name workers)
+ (#f
+ (db-add-or-update-worker worker)
+ (loop (alist-cons name (current-time)
+ (cleanup workers))))
+ (name+time
+ (set-cdr! name+time (current-time))
+ (loop (cleanup workers))))))))))
+
+(define* (spawn-worker-directory #:optional (timeout (%worker-timeout)))
+ "Spawn an actor that acts as a directory of running workers. The directory
+keeps track of all the running workers and the last time they were seen; it
+removes those that have not been seen since more than TIMEOUT seconds."
+ (let ((channel (make-channel)))
+ (spawn-fiber (worker-directory channel timeout))
+ channel))
+
+
;;;
;;; ZMQ connection.
;;;
@@ -389,24 +448,19 @@ requested received on its channel."
all network interfaces."
(string-append "tcp://*:" (number->string backend-port)))
-(define (serve-build-requests backend-port fetch-worker)
+(define (serve-build-requests backend-port fetch-worker worker-directory)
"Open a zmq socket on BACKEND-PORT and listen for messages coming from
'cuirass remote-worker' processes, and reply to 'worker-request-work'
messages: this is a \"work stealing\" strategy.
When a message denoting a successful build is received, pass it on to
-FETCH-WORKER to download the build's output(s)."
- (define (update-worker! base-worker properties)
- ;; Mark BASE-WORKER as alive in the database. Do it in a separate fiber
- ;; so the main 'receive-message' loop is reactive.
- (spawn-fiber
- (lambda ()
- (let* ((worker* (worker
- (inherit (sexp->worker base-worker))
- (last-seen (current-time)))))
- (log-debug (G_ "worker ~a is up and running (properties: ~s)")
- (worker-name worker*) properties)
- (db-add-or-update-worker worker*)))))
+FETCH-WORKER to download the build's output(s).
+
+Use WORKER-DIRECTORY to maintain the list of active workers."
+ (define (update-worker worker properties)
+ ;; Update the directory to mark WORKER as alive.
+ (put-message worker-directory
+ `(ping ,(sexp->worker worker) ,properties)))
(let ((build-socket (zmq-create-socket %zmq-context ZMQ_ROUTER)))
@@ -420,14 +474,6 @@ FETCH-WORKER to download the build's output(s)."
(zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
- (spawn-fiber
- (lambda ()
- (let loop ()
- (sleep (quotient (%worker-timeout) 2))
- (log-debug (G_ "updating list of live workers"))
- (db-remove-unresponsive-workers (%worker-timeout))
- (loop))))
-
;; Do not use the built-in zmq-proxy as we want to edit the envelope of
;; frontend messages before forwarding them to the backend.
(let loop ()
@@ -452,7 +498,7 @@ FETCH-WORKER to download the build's output(s)."
(put-message fetch-worker command)
#t)
(('worker-ready worker properties ...)
- (update-worker! worker properties))
+ (update-worker worker properties))
(`(worker-request-info)
(catch 'zmq-error
(lambda ()
@@ -507,7 +553,7 @@ FETCH-WORKER to download the build's output(s)."
(reply-worker (no-build-message)))
(const #f)))))))
(('worker-ping worker properties ...)
- (update-worker! worker properties))
+ (update-worker worker properties))
(`(build-started (drv ,drv) (worker ,name))
(let ((log-file (log-path (%cache-directory) drv))
(worker (db-get-worker name)))
@@ -711,10 +757,13 @@ exiting."
(spawn-periodic-updates-fiber)
(spawn-build-log-cleaner (assoc-ref opts 'build-log-expiry))
- (let ((fetch-worker (spawn-fetch-worker)))
+ (let ((fetch-worker (spawn-fetch-worker))
+ (worker-directory (spawn-worker-directory)))
(catch 'zmq-error
(lambda ()
- (serve-build-requests backend-port fetch-worker))
+ (serve-build-requests backend-port
+ fetch-worker
+ worker-directory))
(lambda (key errno message . _)
(log-error (G_ "ZeroMQ error in build server: ~a")
message)
diff --git a/src/cuirass/templates.scm b/src/cuirass/templates.scm
index e2151f2..b1a0268 100644
--- a/src/cuirass/templates.scm
+++ b/src/cuirass/templates.scm
@@ -2167,8 +2167,7 @@ text-dark d-flex position-absolute w-100"))
(tr
(th (@ (scope "col")) "Name")
(th (@ (scope "col")) "Systems")
- (th (@ (scope "col")) "Building")
- (th (@ (scope "col")) "Last seen")))
+ (th (@ (scope "col")) "Building")))
(tbody
,@(map
(lambda (worker build)
@@ -2183,9 +2182,7 @@ text-dark d-flex position-absolute w-100"))
(href "/build/"
,(build-id build)
"/details"))
- ,(build-job-name build)))))
- (td ,(time->string
- (worker-last-seen worker)))))
+ ,(build-job-name build)))))))
workers builds)))))
,@(if (null? info)
'((div (@ (class "alert alert-danger"))
diff --git a/src/schema.sql b/src/schema.sql
index 4b52daa..e98eea3 100644
--- a/src/schema.sql
+++ b/src/schema.sql
@@ -117,7 +117,7 @@ CREATE TABLE Workers (
address TEXT NOT NULL,
machine TEXT NOT NULL,
systems TEXT NOT NULL,
- last_seen INTEGER NOT NULL
+ last_seen INTEGER NOT NULL -- no longer used
);
CREATE TABLE Dashboards (
diff --git a/tests/database.scm b/tests/database.scm
index 88faafe..2f20cf5 100644
--- a/tests/database.scm
+++ b/tests/database.scm
@@ -118,8 +118,7 @@
(name "worker")
(address "address")
(machine "machine")
- (systems '("a" "b"))
- (last-seen 1)))
+ (systems '("a" "b"))))
(define-syntax-rule (with-fibers exp ...)
"Evaluate EXP... in a Fiber context with a database connection pool."
@@ -584,13 +583,13 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0,
0, 0);")
(with-fibers
(db-get-workers)))
- (test-assert "db-remove-unresponsive-workers"
+ (test-assert "db-remove-workers"
(with-fibers
(let ((drv "/foo.drv"))
(db-update-build-worker! drv "worker")
(db-update-build-status! drv (build-status started))
- (db-remove-unresponsive-workers 50)
- (and (eq? (db-get-workers) '())
+ (db-remove-workers '("worker"))
+ (and (null? (db-get-workers))
(let* ((build (db-get-build drv))
(worker (build-worker build))
(status (build-current-status build)))
diff --git a/tests/remote.scm b/tests/remote.scm
index 104349b..a284310 100644
--- a/tests/remote.scm
+++ b/tests/remote.scm
@@ -23,6 +23,7 @@
'make-output
symbol)))
(cuirass specification)
+ ((cuirass remote) #:select (worker-systems))
(gnu packages base)
(guix build utils)
(guix channels)
@@ -31,6 +32,7 @@
(guix monads)
(guix packages)
((guix store) #:hide (build))
+ ((guix utils) #:select (%current-system))
(tests common)
(avahi)
(avahi client)
@@ -196,11 +198,26 @@
#vu8(4 5 6 7 8)))
(zmq-close-socket socket)))
+ (test-equal "no workers" ;initially no workers
+ '()
+ (db-get-workers))
+
(test-assert "remote-worker"
(begin
(start-worker)
#t))
+ (test-equal "one worker" ;the new worker should show up
+ (list (list (%current-system)))
+ (let loop ((i 0))
+ (or (> i 9)
+ (match (db-get-workers)
+ (() ;not ready yet?
+ (sleep 1)
+ (loop (+ i 1)))
+ (lst
+ (map worker-systems lst))))))
+
(test-assert "build done"
(retry
(lambda ()
[Prev in Thread] [Current Thread] [Next in Thread]
- branch main updated: remote: Keep ‘last-seen’ info in memory rather than in the database., Ludovic Courtès <=