guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

02/20: Increase parallelism when loading revisions


From: Christopher Baines
Subject: 02/20: Increase parallelism when loading revisions
Date: Mon, 4 Nov 2024 12:27:26 -0500 (EST)

cbaines pushed a commit to branch master
in repository data-service.

commit b6551842d10e0a36cc9dd38d8beabe37b89459c7
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Thu Oct 17 17:10:25 2024 +0200

    Increase parallelism when loading revisions
---
 guix-data-service/jobs/load-new-guix-revision.scm | 442 ++++++++++++----------
 1 file changed, 240 insertions(+), 202 deletions(-)

diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm
index f4968de..f52b8e1 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -107,7 +107,7 @@
   missing-store-item-error?
   (item missing-store-item-error-item))
 
-(define (retry-on-missing-store-item thunk)
+(define* (retry-on-missing-store-item thunk #:key on-exception)
   (with-exception-handler
       (lambda (exn)
         (if (missing-store-item-error? exn)
@@ -116,6 +116,7 @@
                              "missing store item ~A, retrying ~A\n"
                              (missing-store-item-error-item exn)
                              thunk)
+              (when on-exception (on-exception))
               (retry-on-missing-store-item thunk))
             (raise-exception exn)))
     thunk
@@ -1691,7 +1692,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
 
        inf))))
 
-(define* (extract-information-from db-conn guix-revision-id commit
+(define* (extract-information-from db-conn guix-revision-id-promise
+                                   commit
                                    guix-source store-item
                                    guix-derivation
                                    utility-thread-channel
@@ -1885,9 +1887,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
 
         (let ((package-ids (fibers-force package-ids-promise)))
           (with-resource-from-pool postgresql-connection-pool conn
-            (insert-guix-revision-lint-checkers conn
-                                                guix-revision-id
-                                                lint-checker-ids)
+            (insert-guix-revision-lint-checkers
+             conn
+             (fibers-force guix-revision-id-promise)
+             lint-checker-ids)
 
             (let ((lint-warning-ids
                    (insert-lint-warnings
@@ -1897,9 +1900,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
                     lint-warnings-data)))
               (chunk-for-each!
                (lambda (lint-warning-ids-chunk)
-                 (insert-guix-revision-lint-warnings conn
-                                                     guix-revision-id
-                                                     lint-warning-ids-chunk))
+                 (insert-guix-revision-lint-warnings
+                  conn
+                  (fibers-force guix-revision-id-promise)
+                  lint-warning-ids-chunk))
                5000
                lint-warning-ids)))))))
 
@@ -1913,62 +1917,66 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
 
     (define chunk-size 1000)
 
-    (define (process-system-and-target system target)
+    (define (get-derivations system target)
+      (let ((derivations-vector (make-vector packages-count)))
+        (with-time-logging
+            (simple-format #f "getting derivations for ~A" (cons system target))
+          (let loop ((start-index 0))
+            (let* ((count
+                    (if (>= (+ start-index chunk-size) packages-count)
+                        (- packages-count start-index)
+                        chunk-size))
+                   (chunk
+                    (call-with-inferior
+                     (lambda (inferior inferior-store)
+                       (ensure-gds-inferior-packages-defined! inferior)
+
+                       (inferior-package-derivations
+                        inferior-store
+                        inferior
+                        system
+                        target
+                        start-index
+                        count)))))
+              (vector-copy! derivations-vector
+                            start-index
+                            chunk)
+              (unless (>= (+ start-index chunk-size) packages-count)
+                (loop (+ start-index chunk-size))))))
+        derivations-vector))
+
+    (define (process-system-and-target system target get-derivations)
       (with-time-logging
           (simple-format #f "processing derivations for ~A" (cons system target))
-        (let ((derivations-vector (make-vector packages-count)))
-          (with-time-logging
-              (simple-format #f "getting derivations for ~A" (cons system target))
-            (let loop ((start-index 0))
-              (let* ((count
-                      (if (>= (+ start-index chunk-size) packages-count)
-                          (- packages-count start-index)
-                          chunk-size))
-                     (chunk
-                      (call-with-inferior
-                       (lambda (inferior inferior-store)
-                         (ensure-gds-inferior-packages-defined! inferior)
-
-                         (inferior-package-derivations
-                          inferior-store
-                          inferior
-                          system
-                          target
-                          start-index
-                          count)))))
-                (vector-copy! derivations-vector
-                              start-index
-                              chunk)
-                (unless (>= (+ start-index chunk-size) packages-count)
-                  (loop (+ start-index chunk-size))))))
-
-          (let* ((derivation-ids
+        (let* ((derivations-vector (get-derivations system target))
+               (derivation-ids
+                (with-time-logging
+                    (simple-format #f "derivation-file-names->derivation-ids (~A ~A)"
+                                   system target)
+                  (derivation-file-names->derivation-ids/fiberized
+                   derivations-vector)))
+               (guix-revision-id
+                (fibers-force guix-revision-id-promise))
+               (package-ids (fibers-force package-ids-promise))
+               (package-derivation-ids
+                (with-resource-from-pool postgresql-connection-pool conn
                   (with-time-logging
-                      (simple-format #f "derivation-file-names->derivation-ids (~A ~A)"
+                      (simple-format #f "insert-package-derivations (~A ~A)"
                                      system target)
-                    (derivation-file-names->derivation-ids/fiberized
-                     derivations-vector))))
-
-            (let* ((package-ids (fibers-force package-ids-promise))
-                   (package-derivation-ids
-                    (with-resource-from-pool postgresql-connection-pool conn
-                      (with-time-logging
-                          (simple-format #f "insert-package-derivations (~A ~A)"
-                                         system target)
-                        (insert-package-derivations conn
-                                                    system
-                                                    (or target "")
-                                                    package-ids
-                                                    derivation-ids)))))
-              (chunk-for-each!
-               (lambda (package-derivation-ids-chunk)
-                 (with-resource-from-pool postgresql-connection-pool conn
-                   (insert-guix-revision-package-derivations
-                    conn
-                    guix-revision-id
-                    package-derivation-ids-chunk)))
-               2000
-               package-derivation-ids)))))
+                    (insert-package-derivations conn
+                                                system
+                                                (or target "")
+                                                package-ids
+                                                derivation-ids)))))
+          (chunk-for-each!
+           (lambda (package-derivation-ids-chunk)
+             (with-resource-from-pool postgresql-connection-pool conn
+               (insert-guix-revision-package-derivations
+                conn
+                guix-revision-id
+                package-derivation-ids-chunk)))
+           2000
+           package-derivation-ids)))
 
       (with-resource-from-pool postgresql-connection-pool conn
         (with-time-logging
@@ -1977,23 +1985,24 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
              system target)
           (insert-guix-revision-package-derivation-distribution-counts
            conn
-           guix-revision-id
+           (fibers-force guix-revision-id-promise)
            (number->string
             (system->system-id conn system))
            (or target "")))))
 
-    (let ((process-system-and-target/fiberized
-           (fiberize process-system-and-target
+    (let ((get-derivations/fiberized
+           (fiberize get-derivations
                      #:parallelism parallelism)))
       (par-map&
        (match-lambda
          ((system . target)
           (retry-on-missing-store-item
            (lambda ()
-             (process-system-and-target/fiberized system target)))))
-      (call-with-inferior
-       (lambda (inferior inferior-store)
-         (inferior-fetch-system-target-pairs inferior))))))
+             (process-system-and-target system target
+                                        get-derivations/fiberized)))))
+       (call-with-inferior
+        (lambda (inferior inferior-store)
+          (inferior-fetch-system-target-pairs inferior))))))
 
   (define (extract-and-store-system-tests)
     (if skip-system-tests?
@@ -2027,7 +2036,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
               (with-resource-from-pool postgresql-connection-pool conn
                 (insert-system-tests-for-guix-revision
                  conn
-                 guix-revision-id
+                 (fibers-force guix-revision-id-promise)
                  data-with-derivation-ids)))))))
 
   (with-time-logging
@@ -2124,34 +2133,48 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
          (channel-for-commit
           (channel (name 'guix)
                    (url git-repository-url)
-                   (commit commit)))
-         (guix-source
-          channel-derivations-by-system
-          guix-revision-id
-          (retry-on-missing-store-item
-           (lambda ()
-             (let ((guix-source
-                    channel-derivations-by-system
-                    (channel->source-and-derivations-by-system
-                     conn
-                     channel-for-commit
-                     fetch-with-authentication?
-                     #:parallelism parallelism)))
-               (let ((guix-revision-id
-                      (load-channel-instances utility-thread-channel
-                                              git-repository-id commit
-                                              channel-derivations-by-system)))
-                 (values guix-source
-                         channel-derivations-by-system
-                         guix-revision-id)))))))
-    (let ((store-item
-           guix-derivation
-           (channel-derivations-by-system->guix-store-item
-            channel-derivations-by-system)))
+                   (commit commit))))
+
+    (define channel-derivations-by-system-promise
+      (fibers-delay
+       (lambda ()
+         (channel->source-and-derivations-by-system
+          conn
+          channel-for-commit
+          fetch-with-authentication?
+          #:parallelism parallelism))))
+
+    (define guix-revision-id-promise
+      (fibers-delay
+       (lambda ()
+         (retry-on-missing-store-item
+          (lambda ()
+            (let ((guix-source
+                   channel-derivations-by-system
+                   (fibers-force channel-derivations-by-system-promise)))
+              (load-channel-instances utility-thread-channel
+                                      git-repository-id commit
+                                      channel-derivations-by-system)))
+          #:on-exception
+          (lambda ()
+            (fibers-promise-reset channel-derivations-by-system-promise))))))
+
+    ;; Prompt getting the guix-revision-id as soon as possible
+    (spawn-fiber
+     (lambda ()
+       (fibers-force guix-revision-id-promise)))
+
+    (let* ((guix-source
+            channel-derivations-by-system
+            (fibers-force channel-derivations-by-system-promise))
+           (store-item
+            guix-derivation
+            (channel-derivations-by-system->guix-store-item
+             channel-derivations-by-system)))
       (if store-item
           (and
            (extract-information-from conn
-                                     guix-revision-id
+                                     guix-revision-id-promise
                                      commit guix-source store-item
                                      guix-derivation
                                      utility-thread-channel
@@ -2166,21 +2189,22 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
                (with-time-logging "inserting channel news entries"
                  (insert-channel-news-entries-for-guix-revision
                   conn
-                  guix-revision-id
+                  (fibers-force guix-revision-id-promise)
                   (channel-news-for-commit channel-for-commit commit)))
                (begin
                  (simple-format
                   #t "debug: importing channel news not supported\n")
                  #t))
 
-           (update-package-derivations-table conn
-                                             git-repository-id
-                                             guix-revision-id
-                                             commit)
+           (update-package-derivations-table
+            conn
+            git-repository-id
+            (fibers-force guix-revision-id-promise)
+            commit)
            (with-time-logging "updating builds.derivation_output_details_set_id"
              (update-builds-derivation-output-details-set-id
               conn
-              (string->number guix-revision-id))))
+              (string->number (fibers-force guix-revision-id-promise)))))
           (begin
             (simple-format #t "Failed to generate store item for ~A\n"
                            commit)
@@ -2572,109 +2596,123 @@ SKIP LOCKED")
 (define* (process-load-new-guix-revision-job id #:key skip-system-tests?
                                              extra-inferior-environment-variables
                                              parallelism)
-  (with-postgresql-connection
-   (simple-format #f "load-new-guix-revision ~A" id)
-   (lambda (conn)
-     ;; Fix the hash encoding of derivation_output_details. This'll only run
-     ;; once on any given database, but is kept here just to make sure any
-     ;; instances have the data updated.
-     (fix-derivation-output-details-hash-encoding conn)
+  (define result
+    (with-postgresql-connection
+     (simple-format #f "load-new-guix-revision ~A" id)
+     (lambda (conn)
+       ;; Fix the hash encoding of derivation_output_details. This'll only run
+       ;; once on any given database, but is kept here just to make sure any
+       ;; instances have the data updated.
+       (fix-derivation-output-details-hash-encoding conn)
+
+       (exec-query conn "BEGIN")
+
+       (spawn-fiber
+        (lambda ()
+          (while #t
+            (sleep 30)
+
+            (let ((stats (gc-stats)))
+              (simple-format
+               (current-error-port)
+               "process-job heap: ~a MiB used (~a MiB heap)~%"
+               (round
+                (/ (- (assoc-ref stats 'heap-size)
+                      (assoc-ref stats 'heap-free-size))
+                   (expt 2. 20)))
+               (round
+                (/ (assoc-ref stats 'heap-size)
+                   (expt 2. 20))))))))
 
-     (exec-query conn "BEGIN")
+       (match (select-job-for-update conn id)
+         (((id commit source git-repository-id))
 
-     (spawn-fiber
-      (lambda ()
-        (while #t
-          (sleep 30)
+          ;; With a separate connection, outside of the transaction so the event
+          ;; gets persisted regardless.
+          (with-postgresql-connection
+           (simple-format #f "load-new-guix-revision ~A start-event" id)
+           (lambda (start-event-conn)
+             (record-job-event start-event-conn id "start")))
 
-          (let ((stats (gc-stats)))
-            (simple-format
-             (current-error-port)
-             "process-job heap: ~a MiB used (~a MiB heap)~%"
-             (round
-              (/ (- (assoc-ref stats 'heap-size)
-                    (assoc-ref stats 'heap-free-size))
-                 (expt 2. 20)))
-             (round
-              (/ (assoc-ref stats 'heap-size)
-                 (expt 2. 20))))))))
-
-     (match (select-job-for-update conn id)
-       (((id commit source git-repository-id))
-
-        ;; With a separate connection, outside of the transaction so the event
-        ;; gets persisted regardless.
-        (with-postgresql-connection
-         (simple-format #f "load-new-guix-revision ~A start-event" id)
-         (lambda (start-event-conn)
-           (record-job-event start-event-conn id "start")))
-
-        (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
-                       id commit source)
-
-        (if (eq?
-             (with-time-logging (string-append "processing revision " commit)
-               (with-exception-handler
-                   (const #f)
-                 (lambda ()
-                   (with-throw-handler #t
-                     (lambda ()
-                       (load-new-guix-revision
-                        conn
-                        git-repository-id
-                        commit
-                        #:skip-system-tests? #t
-                        #:extra-inferior-environment-variables
-                        extra-inferior-environment-variables
-                        #:parallelism parallelism))
-                     (lambda (key . args)
-                       (simple-format (current-error-port)
-                                      "error: load-new-guix-revision: ~A ~A\n"
-                                      key args)
-                       (backtrace))))
-                 #:unwind? #t))
-             #t)
-            (begin
-              (record-job-succeeded conn id)
-              (record-job-event conn id "success")
-              (exec-query conn "COMMIT")
-
-              (with-time-logging
-                  "vacuuming package derivations by guix revision range table"
-                (vacuum-package-derivations-table conn))
-
-              (with-time-logging
-                  "vacuum-derivation-inputs-table"
-                (vacuum-derivation-inputs-table conn))
-
-              (match (exec-query
-                      conn
-                      "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'")
-                (((rows))
-                 ;; Don't attempt counting distinct values if there are too
-                 ;; many rows, as that is far to slow and could use up all the
-                 ;; disk space.
-                 (when (< (string->number rows)
-                          1000000000)
-                   (with-time-logging
-                       "update-derivation-inputs-statistics"
-                     (update-derivation-inputs-statistics conn)))))
-
-              (with-time-logging
-                  "vacuum-derivation-outputs-table"
-                (vacuum-derivation-outputs-table conn))
-
-              (with-time-logging
-                  "update-derivation-outputs-statistics"
-                (update-derivation-outputs-statistics conn))
-
-              #t)
-            (begin
-              (exec-query conn "ROLLBACK")
-              (record-job-event conn id "failure")
-
-              #f)))
-       (()
-        (exec-query conn "ROLLBACK")
-        (simple-format #t "job ~A not found to be processed\n"
-                       id))))))
+          (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
+                         id commit source)
+
+          (if (eq?
+               (with-time-logging (string-append "processing revision " commit)
+                 (with-exception-handler
+                     (const #f)
+                   (lambda ()
+                     (with-throw-handler #t
+                       (lambda ()
+                         (load-new-guix-revision
+                          conn
+                          git-repository-id
+                          commit
+                          #:skip-system-tests? #t
+                          #:extra-inferior-environment-variables
+                          extra-inferior-environment-variables
+                          #:parallelism parallelism))
+                       (lambda (key . args)
+                         (simple-format (current-error-port)
+                                        "error: load-new-guix-revision: ~A ~A\n"
+                                        key args)
+                         (backtrace))))
+                   #:unwind? #t))
+               #t)
+              (begin
+                (record-job-succeeded conn id)
+                (record-job-event conn id "success")
+                (exec-query conn "COMMIT")
+
+                #t)
+              (begin
+                (exec-query conn "ROLLBACK")
+                (record-job-event conn id "failure")
+
+                #f)))
+         (()
+          (exec-query conn "ROLLBACK")
+          (simple-format #t "job ~A not found to be processed\n"
+                         id))))))
+
+  (when result
+    (parallel-via-fibers
+     (with-postgresql-connection
+      (simple-format #f "post load-new-guix-revision ~A" id)
+      (lambda (conn)
+        (with-time-logging
+            "vacuuming package derivations by guix revision range table"
+          (vacuum-package-derivations-table conn))))
+
+     (with-postgresql-connection
+      (simple-format #f "post load-new-guix-revision ~A" id)
+      (lambda (conn)
+        (with-time-logging
+            "vacuum-derivation-inputs-table"
+          (vacuum-derivation-inputs-table conn))
+
+        (match (exec-query
+                conn
+                "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'")
+          (((rows))
+           ;; Don't attempt counting distinct values if there are too
+           ;; many rows, as that is far to slow and could use up all the
+           ;; disk space.
+           (when (< (string->number rows)
+                    1000000000)
+             (with-time-logging
+                 "update-derivation-inputs-statistics"
+               (update-derivation-inputs-statistics conn)))))))
+
+     (with-postgresql-connection
+      (simple-format #f "post load-new-guix-revision ~A" id)
+      (lambda (conn)
+        (with-time-logging
+            "vacuum-derivation-outputs-table"
+          (vacuum-derivation-outputs-table conn))
+
+        (with-time-logging
+            "update-derivation-outputs-statistics"
+          (update-derivation-outputs-statistics conn))))))
+
+  result)



reply via email to

[Prev in Thread] Current Thread [Next in Thread]