(use-modules (web uri)
             (web request)
             (web response)
             (web client)
             (web http)
             (srfi srfi-1)
             (ice-9 threads)
             (ice-9 match)
             (rnrs bytevectors)
             (srfi srfi-11)
             (srfi srfi-9)
             (srfi srfi-9 gnu)
             (srfi srfi-26)
             (gnutls)
             (ice-9 binary-ports)
             ((ice-9 ftw) #:select (scandir))
             ((rnrs io ports)
              #:prefix rnrs-ports:))

(define* (call-with-streaming-http-request uri callback
                                           #:key (headers '()))
  (let* ((port (open-socket-for-uri uri))
         (request
          (build-request
           uri
           #:method 'PUT
           #:version '(1 . 1)
           #:headers `((connection close)
                       (Transfer-Encoding . "chunked")
                       (Content-Type . "application/octet-stream")
                       ,@headers)
           #:port port)))

    (set-port-encoding! port "ISO-8859-1")
    (let ((request (write-request request port)))
      (let ((chunked-output-port
             (make-chunked-output-port
              port
              #:buffering 128
              #:keep-alive? #t)))

        ;; A SIGPIPE will kill Guile, so ignore it
        (sigaction SIGPIPE
          (lambda (arg)
            (simple-format (current-error-port) "warning: SIGPIPE\n")))

        (set-port-encoding! chunked-output-port "ISO-8859-1")
        (callback chunked-output-port)
        (retry-gnutls-resource-temporarily-unavailable
         (lambda ()
           (close-port chunked-output-port)))
        (display "\r\n" port)
        (force-output port))

      (let ((response (read-response port)))
        (let ((body (read-response-body response)))
          (close-port port)
          (values response
                  body))))))

(define (retry-gnutls-resource-temporarily-unavailable thunk)
  (catch 'gnutls-error
    thunk
    (lambda (key err proc . rest)
      (if (eq? error/again err)
          (begin
            (simple-format (current-error-port)
                           "error/again\n")
            (sleep 1)
            (thunk))
          (throw key (cons* err proc rest))))))

(define (start-thread thread-index)
  (call-with-new-thread
   (lambda ()
     (for-each
      (lambda (request-index)
        (with-throw-handler #t
          (lambda ()
            (call-with-streaming-http-request
             ;; The URL doesn't realy matter as the response to the
             ;; request doesn't matter.
             (peek (string->uri (if (= thread-index 1)
                              "https://guix.cbaines.net/test"
                              "https://www.cbaines.net/test")))
             (lambda (port)
               (simple-format (current-error-port)
                              "thread ~A making request\n"
                              thread-index)
               (let* ((buffer-size 128)
                      (buffer (make-bytevector buffer-size)))
                 (for-each (lambda (index)
                             ;; (usleep 10)
                             (retry-gnutls-resource-temporarily-unavailable
                              (lambda ()
                                (put-bytevector port buffer 0 buffer-size))))
                           (iota 20000))))))
          (lambda (key . args)
            (simple-format #t "thread ~A: exception: ~A ~A\n"
                           thread-index key args)
            (backtrace))))
      (iota 1 1)))))

(define threads
  (map start-thread
       (iota 6 1)))

(for-each join-thread threads)