guix-devel

Re: RPC pipelining


From: Roel Janssen
Subject: Re: RPC pipelining
Date: Tue, 11 Jul 2017 13:18:47 +0200
User-agent: mu4e 0.9.18; emacs 25.2.1

Hello Ludo’!

Thanks for working so hard on this.
I ran into trouble with my test setup:

address@hidden ~]$ time ./guixr environment --ad-hoc coreutils -- true

;;; (flush-pending-rpcs 170)

;;; (flush-pending-rpcs 4)
substitute: guix substitute: warning: ACL for archive imports seems to be uninitialized, substitutes may be unavailable
substitute: ;;; Failed to autoload make-session in (gnutls):
substitute: ;;; ERROR: missing interface for module (gnutls)
substitute: Backtrace:
substitute:            1 (primitive-load "/gnu/repositories/guix/scripts/guix")
substitute: In guix/ui.scm:
substitute:   1352:12  0 (run-guix-command _ . _)
substitute: 
substitute: guix/ui.scm:1352:12: In procedure run-guix-command:
substitute: guix/ui.scm:1352:12: In procedure module-lookup: Unbound variable: make-session
guix environment: error: build failed: writing to file: Broken pipe

real    0m8.679s
user    0m1.199s
sys     0m0.202s


But FWIW, the wait before the first "substitute: ..." line appears seems
dramatically shorter.

I'll report back when I have a better testing environment ready.

Kind regards,
Roel Janssen


Ludovic Courtès writes:

> Hello Guix!
>
> One of the main sources of slowness when talking to a remote daemon, as
> with GUIX_DAEMON_SOCKET=guix://…, is the sheer number of RPCs, which
> translate into lots of network round trips:
>
> --8<---------------cut here---------------start------------->8---
> $ GUIX_PROFILING=rpc ./pre-inst-env guix build inkscape -d --no-grafts
> /gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv
> Remote procedure call summary: 1006 RPCs
>   built-in-builders              ...     1
>   add-to-store                   ...   136
>   add-text-to-store              ...   869
> --8<---------------cut here---------------end--------------->8---
>
> In this example we’re making ~1,000 round trips; not good!
>
> Before changing the protocol, an idea that came to mind is to do “RPC
> pipelining”: send as many RPC requests as possible at once, then read all
> the corresponding responses.
>
> It turns out to require a small change in the daemon, but the attached
> patch demonstrates the idea: the client buffers all
> ‘add-text-to-store’ RPCs, and writes them all at once when another RPC
> is made (because other RPCs, which are not buffered, might depend on the
> effect of those ‘add-text-to-store’ RPCs) or when the connection is
> closed.  In practice, on the example above, it manages to buffer all 869
> RPCs and send them all at once.
>
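To make the buffering policy concrete, here is a minimal C++ sketch of it, assuming a hypothetical `FakeTransport` that merely counts round trips instead of talking to a real guix-daemon; `PipeliningClient` and its methods are illustrative names, not part of Guix. Like the patch, it queues 'add-text-to-store' requests, then writes them in one batch and reads the replies in order, either before any other RPC or when the connection is closed.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical transport: each call writes all REQUESTS at once, then reads
// one reply per request, in order.  It only counts round trips.
struct FakeTransport {
    std::size_t roundTrips = 0;

    std::vector<std::string> roundTrip(const std::vector<std::string>& requests) {
        ++roundTrips;
        std::vector<std::string> replies;
        for (const auto& r : requests) replies.push_back("ok " + r);
        return replies;
    }
};

// Sketch of the client-side policy: buffer 'add-text-to-store' requests and
// flush them before any other RPC (which may depend on their effect) or on close.
class PipeliningClient {
public:
    explicit PipeliningClient(FakeTransport& t) : transport_(t) {}

    // Buffered: the caller is assumed to compute the resulting store file
    // name locally, as 'text-output-path' does in the patch.
    void addTextToStore(const std::string& name) {
        pending_.push_back("add-text-to-store " + name);
    }

    // Unbuffered RPC: flush pending requests first, then do one round trip.
    std::string call(const std::string& op) {
        flushPending();
        return transport_.roundTrip({op}).front();
    }

    void close() { flushPending(); }

    // Send every pending request in one batch; return how many were sent.
    std::size_t flushPending() {
        const std::size_t n = pending_.size();
        if (n == 0) return 0;
        const auto replies = transport_.roundTrip(pending_);
        assert(replies.size() == n);  // one reply per request, in order
        pending_.clear();
        return n;
    }

private:
    FakeTransport& transport_;
    std::vector<std::string> pending_;
};
```

With 869 buffered calls followed by one other RPC and a close, this sketch performs two round trips instead of 870.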
> To estimate the effectiveness of this approach, I introduced a delay on
> the loopback device with tc-netem(8) and measured execution time (the
> first run uses pipelining, the second doesn’t):
>
> --8<---------------cut here---------------start------------->8---
> $ sudo tc qdisc add dev lo root netem delay 150ms
> $ time GUIX_DAEMON_SOCKET=guix://localhost ./pre-inst-env guix build inkscape -d --no-grafts
> accepted connection from 127.0.0.1
> /gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv
>
> ;;; (flush-pending-rpcs 869)
>
> real  0m47.796s
> user  0m1.307s
> sys   0m0.056s
> $ time GUIX_DAEMON_SOCKET=guix://localhost guix build inkscape -d --no-grafts
> accepted connection from 127.0.0.1
> /gnu/store/iymxyy5sn0qrkivppl6fn0javnmr3nss-inkscape-0.92.1.drv
>
> real  5m7.226s
> user  0m1.392s
> sys   0m0.056s
> $ sudo tc qdisc del dev lo root
> --8<---------------cut here---------------end--------------->8---
>
> So the wall-clock time is divided by roughly 6 (from 5m7s to 48s) thanks
> to ‘add-text-to-store’ pipelining, but it’s still pretty high due to the
> 136 ‘add-to-store’ RPCs, which are still *not* pipelined.
>
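A back-of-the-envelope model reproduces these numbers fairly well. It assumes (an assumption, not a measurement) that netem's 150 ms delay applies once in each direction on `lo`, so every round trip costs about 300 ms and dominates all other costs; connection setup and TCP effects are ignored. `estimatedSeconds` and the constants are illustrative names.

```cpp
#include <cassert>

// Rough latency model (an assumption): wall-clock time is dominated by
// round trips, each costing 2 x the one-way netem delay.
double estimatedSeconds(int roundTrips, double oneWayDelaySeconds) {
    return roundTrips * 2.0 * oneWayDelaySeconds;
}

// RPC counts from the summary above.
constexpr int kBuiltinBuilders = 1;
constexpr int kAddToStore = 136;
constexpr int kAddTextToStore = 869;

// Without pipelining, every RPC is one round trip.
constexpr int kRoundTripsWithout = kBuiltinBuilders + kAddToStore + kAddTextToStore;

// With pipelining, the 869 'add-text-to-store' calls collapse into one batch.
constexpr int kRoundTripsWith = kBuiltinBuilders + kAddToStore + 1;
```

That gives about 302 s against the measured 307 s without pipelining (1006 round trips), and about 41 s against the measured 48 s with it (138 round trips). Under this model nearly all of the remaining time is the 136 'add-to-store' round trips.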
> It’s less clear what to do with these.  Buffering them would require
> clients to compute the store file name of the files that are passed to
> ‘add-to-store’, which involves hashing the files themselves.  That can be
> quite costly, and it duplicates work the daemon will do eventually
> anyway.  The CPU cost might be offset when latency is high, but not when
> latency is low.
>
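One way to frame that trade-off, as a simplified model rather than anything measured: buffering n 'add-to-store' calls saves about n − 1 round trips but adds the client-side time needed to hash the n files, so it only pays off when that hashing is cheaper than the latency it hides. `pipeliningPaysOff` and its parameters are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model (an assumption, not measured): pipelining N 'add-to-store'
// calls replaces N round trips with one, but the client must first hash every
// file itself to predict its store file name.
bool pipeliningPaysOff(const std::vector<double>& hashSecondsPerFile,
                       double roundTripSeconds) {
    const std::size_t n = hashSecondsPerFile.size();
    if (n < 2) return false;  // nothing to batch
    double hashing = 0.0;
    for (double h : hashSecondsPerFile) hashing += h;
    const double saved = static_cast<double>(n - 1) * roundTripSeconds;
    return hashing < saved;
}
```

With a 300 ms round trip even fairly slow hashing wins, whereas over a local socket with sub-millisecond latency it essentially never does, which is the asymmetry described above.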
> Anyway, food for thought!
>
> For now, if those using Guix on clusters are willing to test the patch
> below (notice that you need to run the patched guix-daemon as well), I’d
> be interested in seeing how representative the above test is!
>
> Ludo’.
>
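In the patch, the client can only buffer 'add-text-to-store' because `text-output-path` lets it predict the resulting store file name without asking the daemon. Its first step builds a type string from the sorted references; the C++ sketch below mirrors just that step (`textPathType` is a hypothetical name) and leaves the hashing and encoding done by `store-path` out of scope. It assumes byte-wise ordering, which matches `string<?` for ASCII store file names.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Mirrors (string-append "text" (string-join (sort references string<?) ":" 'prefix))
// from the patch's text-output-path: each sorted reference is prefixed with ':'.
std::string textPathType(std::vector<std::string> references) {
    std::sort(references.begin(), references.end());
    std::string type = "text";
    for (const auto& ref : references) {
        type += ':';
        type += ref;
    }
    return type;
}
```

For instance, an item with no references yields just "text", and references are always listed in sorted order regardless of the order the caller passed them in.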
> diff --git a/guix/store.scm b/guix/store.scm
> index b15da5485..1ba22cf2d 100644
> --- a/guix/store.scm
> +++ b/guix/store.scm
> @@ -40,6 +40,7 @@
>    #:use-module (ice-9 regex)
>    #:use-module (ice-9 vlist)
>    #:use-module (ice-9 popen)
> +  #:use-module (ice-9 format)
>    #:use-module (web uri)
>    #:export (%daemon-socket-uri
>              %gc-roots-directory
> @@ -322,7 +323,7 @@
>  
>  (define-record-type <nix-server>
>    (%make-nix-server socket major minor
> -                    buffer flush
> +                    buffer flush pending-rpcs
>                      ats-cache atts-cache)
>    nix-server?
>    (socket nix-server-socket)
> @@ -332,6 +333,10 @@
>    (buffer nix-server-output-port)                 ;output port
>    (flush  nix-server-flush-output)                ;thunk
>  
> +  ;; List of pending 'add-text-to-store' RPC arguments.
> +  (pending-rpcs nix-server-pending-rpcs
> +                set-nix-server-pending-rpcs!)
> +
>    ;; Caches.  We keep them per-connection, because store paths build
>    ;; during the session are temporary GC roots kept for the duration of
>    ;; the session.
> @@ -509,7 +514,7 @@ for this connection will be pinned.  Return a server object."
>                        (let ((conn (%make-nix-server port
>                                                      (protocol-major v)
>                                                      (protocol-minor v)
> -                                                    output flush
> +                                                    output flush '()
>                                                      (make-hash-table 100)
>                                                      (make-hash-table 100))))
>                          (let loop ((done? (process-stderr conn)))
> @@ -521,8 +526,17 @@ for this connection will be pinned.  Return a server object."
>    (force-output (nix-server-output-port server))
>    ((nix-server-flush-output server)))
>  
> +(define (flush-pending-rpcs server)
> +  (let ((len (length (nix-server-pending-rpcs server))))
> +    (when (> len 0)
> +      (pk 'flush-pending-rpcs len)
> +      (add-data-to-store/multiple server
> +                                  (reverse (nix-server-pending-rpcs server)))
> +      (set-nix-server-pending-rpcs! server '()))))
> +
>  (define (close-connection server)
>    "Close the connection to SERVER."
> +  (flush-pending-rpcs server)
>    (close (nix-server-socket server)))
>  
>  (define-syntax-rule (with-store store exp ...)
> @@ -811,6 +825,8 @@ bytevector) as its internal buffer, and a thunk to flush this output port."
>         docstring
>         (let* ((s (nix-server-socket server))
>                (buffered (nix-server-output-port server)))
> +         (unless (eq? 'name 'add-text-to-store)
> +           (flush-pending-rpcs server))
>           (record-operation 'name)
>           (write-int (operation-id name) buffered)
>           (write-arg type arg buffered)
> @@ -822,6 +838,32 @@ bytevector) as its internal buffer, and a thunk to flush this output port."
>             (or done? (loop (process-stderr server))))
>           (values (read-arg return s) ...))))))
>  
> +
> +(define-syntax operation-pipeline
> +  (syntax-rules ()
> +    "Define a client-side RPC stub for the given operation."
> +    ((_ (name (type arg) ...) docstring return ...)
> +     (lambda (server arg-list)
> +       docstring
> +       (let* ((s (nix-server-socket server))
> +              (buffered (nix-server-output-port server)))
> +         (record-operation 'name)
> +         (for-each (match-lambda
> +                     ((arg ...)
> +                      (write-int (operation-id name) buffered)
> +                      (write-arg type arg buffered)
> +                      ...))
> +                   arg-list)
> +         (write-buffered-output server)
> +
> +         (map (lambda (_)
> +                ;; Loop until the server is done sending error output.
> +                (let loop ((done? (process-stderr server)))
> +                  (or done? (loop (process-stderr server))))
> +
> +                (list (read-arg return s) ...))
> +              arg-list))))))
> +
>  (define-syntax-rule (define-operation (name args ...)
>                        docstring return ...)
>    (define name
> @@ -856,6 +898,20 @@ string).  Raise an error if no such path exists."
>    "Return the info (hash, references, etc.) for PATH."
>    path-info)
>  
> +(define add-data-to-store/multiple
> +  (operation-pipeline
> +   (add-text-to-store (string name) (bytevector text)
> +                      (string-list references))
> +   #f
> +   store-path))
> +
> +(define (add-data-to-store/buffer server name bytes references)
> +  (let ((pending (nix-server-pending-rpcs server)))
> +    (set-nix-server-pending-rpcs! server
> +                                  (cons (list name bytes references)
> +                                        pending))
> +    (text-output-path name bytes references)))
> +
>  (define add-data-to-store
>    ;; A memoizing version of `add-to-store', to avoid repeated RPCs with
>    ;; the very same arguments during a given session.
> @@ -871,7 +927,7 @@ path."
>        (let* ((args  `(,bytes ,name ,references))
>               (cache (nix-server-add-text-to-store-cache server)))
>          (or (hash-ref cache args)
> -            (let ((path (add-text-to-store server name bytes references)))
> +            (let ((path (add-data-to-store/buffer server name bytes references)))
>                (hash-set! cache args path)
>                path))))))
>  
> @@ -1485,6 +1541,16 @@ the derivation called NAME with hash HASH."
>                    name
>                    (string-append name "-" output))))
>  
> +(define (text-output-path name bv references)
> +  "Return an output path for NAME, with contents BV and the given REFERENCES.
> +The result is the same as that produced by 'add-data-to-store' with the same
> +arguments."
> +  (store-path (string-append "text"
> +                             (string-join (sort references string<?)
> +                                          ":" 'prefix))
> +              (sha256 bv)
> +              name))
> +
>  (define* (fixed-output-path name hash
>                              #:key
>                              (output "out")
> diff --git a/nix/nix-daemon/nix-daemon.cc b/nix/nix-daemon/nix-daemon.cc
> index 7d26b6135..72851e1cb 100644
> --- a/nix/nix-daemon/nix-daemon.cc
> +++ b/nix/nix-daemon/nix-daemon.cc
> @@ -9,6 +9,7 @@
>  #include "builtins.hh"
>  
>  #include <algorithm>
> +#include <iostream>
>  
>  #include <cstring>
>  #include <unistd.h>
> @@ -79,8 +80,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
>  
>  
>  /* Return true if the remote side has closed its end of the
> -   connection, false otherwise.  Should not be called on any socket on
> -   which we expect input! */
> +   connection, false otherwise.  */
>  static bool isFarSideClosed(int socket)
>  {
>      struct timeval timeout;
> @@ -95,17 +95,24 @@ static bool isFarSideClosed(int socket)
>  
>      if (!FD_ISSET(socket, &fds)) return false;
>  
> -    /* Destructive read to determine whether the select() marked the
> -       socket as readable because there is actual input or because
> -       we've reached EOF (i.e., a read of size 0 is available). */
> -    char c;
> -    int rd;
> -    if ((rd = read(socket, &c, 1)) > 0)
> -        throw Error("EOF expected (protocol error?)");
> -    else if (rd == -1 && errno != ECONNRESET)
> -        throw SysError("expected connection reset or EOF");
> +    /* Check whether 'select' marked the socket as readable because
> +       there is actual input or because we've reached EOF (i.e., a read of
> +       size 0 is available).  */
> +    char c; int rd;
> +    do {
> +     rd = recv(socket, &c, sizeof c, MSG_PEEK);
> +    }
> +    while (rd == -1 && errno == EINTR);
>  
> -    return true;
> +    if (rd == -1) {
> +     if (errno == ECONNRESET)
> +         /* Remote side is definitely closed.  */
> +         return true;
> +     else
> +         throw SysError("while peeking client input");
> +    }
> +
> +    return rd == 0;
>  }
>  
>  
> @@ -136,9 +143,6 @@ static void sigPollHandler(int sigNo)
>                  const char * s = "SIGPOLL\n";
>                  write(STDERR_FILENO, s, strlen(s));
>              }
> -        } else {
> -            const char * s = "spurious SIGPOLL\n";
> -            write(STDERR_FILENO, s, strlen(s));
>          }
>      }
>      catch (Error & e) {
> @@ -847,8 +851,8 @@ static void acceptConnection(int fdSocket)
>  
>         /* If we're on a TCP connection, disable Nagle's algorithm so that
>            data is sent as soon as possible.  */
> -       (void) setsockopt(remote, SOL_TCP, TCP_NODELAY,
> -                         &enabled, sizeof enabled);
> +       // (void) setsockopt(remote, SOL_TCP, TCP_NODELAY,
> +       //                &enabled, sizeof enabled);
>  
>  #if defined(TCP_QUICKACK)
>         /* Enable TCP quick-ack if applicable; this might help a little.  */
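
The daemon-side change replaces a destructive one-byte `read` with `recv(..., MSG_PEEK)`, so that checking whether the client has gone away no longer consumes (or rejects as a protocol error) pipelined request bytes that are already waiting on the socket. The standalone sketch below reproduces that check; `farSideClosed` is a hypothetical name, and it assumes a zero `select` timeout and reports errors with exceptions rather than the daemon's `SysError`.

```cpp
#include <cassert>
#include <cerrno>
#include <stdexcept>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

// Return true if the peer of FD has closed its end, false otherwise.
// Unlike a destructive read(), MSG_PEEK leaves any pending (e.g. pipelined)
// request bytes in the socket buffer for the normal request loop.
bool farSideClosed(int fd) {
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);
    struct timeval timeout = {0, 0};  // check without blocking
    if (select(fd + 1, &fds, nullptr, nullptr, &timeout) == -1)
        throw std::runtime_error("select failed");
    if (!FD_ISSET(fd, &fds)) return false;  // nothing readable yet

    char c;
    ssize_t rd;
    do {
        rd = recv(fd, &c, sizeof c, MSG_PEEK);
    } while (rd == -1 && errno == EINTR);

    if (rd == -1) {
        if (errno == ECONNRESET) return true;  // peer is definitely gone
        throw std::runtime_error("recv(MSG_PEEK) failed");
    }
    return rd == 0;  // 0 bytes means EOF; > 0 means real, unconsumed input
}
```

On a `socketpair`, for example, a byte written by the peer makes this return false and is still there for a subsequent `read`, while closing the peer makes it return true.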



