[gnunet-scheme] 87/324: mq: define message queue module
From: gnunet
Subject: [gnunet-scheme] 87/324: mq: define message queue module
Date: Tue, 21 Sep 2021 13:22:07 +0200
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit a851800e1c471adc51b65f6298907fbc84e9073c
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Mar 4 18:56:33 2021 +0100
mq: define message queue module
* gnu/gnunet/mq/message-io.scm: new module.
* tests/message-io.scm: test it.
* README.org (Modules): document message queues.
* Makefile.am: compile new module and run its tests.
---
Makefile.am | 4 +-
README.org | 8 ++
gnu/gnunet/mq/message-io.scm | 238 +++++++++++++++++++++++++++++++++++++++++++
tests/message-io.scm | 196 +++++++++++++++++++++++++++++++++++
4 files changed, 445 insertions(+), 1 deletion(-)
diff --git a/Makefile.am b/Makefile.am
index 3e85312..bb6f9f3 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,6 +41,7 @@ modules = \
gnu/gnunet/util/mq-enum.scm \
gnu/gnunet/util/mq-handler.scm \
gnu/gnunet/util/mq.scm \
+ gnu/gnunet/mq/message-io.scm \
\
gnu/gnunet/utils/bv-slice.scm \
gnu/gnunet/utils/hat-let.scm \
@@ -88,7 +89,8 @@ SCM_LOG_DRIVER = \
SCM_TESTS = \
tests/envelope.scm \
tests/message-handler.scm \
- tests/update.scm
+ tests/update.scm \
+ tests/message-io.scm
SCM_TESTS_ENVIRONMENT = \
GUILE_AUTO_COMPILE=0 \
diff --git a/README.org b/README.org
index b6ec797..2b41933 100644
--- a/README.org
+++ b/README.org
@@ -52,6 +52,11 @@
can be waited upon.
** Message queues
+ Message queues have three parts: the input queue, the output queue
+ and the transport. Respectively, these are a read + close-request
+ capability, a write + close-request capability, and a capability for
+ all of the above plus reacting to close requests and injecting errors.
+
+ gnu/gnunet/util/mq.scm: message priorities & preferences
Preferences: is out-of-order allowed or not,
@@ -66,7 +71,10 @@
the ambient authority appropriately.
TODO rename to gnu/gnunet/mq/handler.scm
+ + gnu/gnunet/mq/message-io.scm: like soft ports, but using
+ fibers channels and for messages.
+ TODO actual queues? Maybe we don't need them?
+ + TODO filling the queues
** Network structures
Features:
diff --git a/gnu/gnunet/mq/message-io.scm b/gnu/gnunet/mq/message-io.scm
new file mode 100644
index 0000000..b19fb6b
--- /dev/null
+++ b/gnu/gnunet/mq/message-io.scm
@@ -0,0 +1,238 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; @author Maxime Devos (scheme-GNUnet)
+;;
+;; @brief Generic interface for sending / receiving messages
+;; TODO perhaps some kind of buffering would be useful.
+;;
+;; What's the impact on performance from having to wait on a fiber
+;; when sending / receiving a message? Maybe put the 'queue' in
+;; message queue when it makes sense.
+;;
+;; TODO integrate with message envelopes. Maybe change the
+;; definition of envelopes.
+(define-library (gnu gnunet mq message-io)
+ (export <message-input> message-transport->input message-input?
+ read-message-operation
+ read-input-error-operation
+ close-input!
+ wait-for-input-closed-operation
+
+ <message-output> message-transport->output message-output?
+ send-message-operation
+ read-output-error-operation
+ wait-for-output-closed-operation
+ close-output!
+
+ <message-transport> make-message-transport message-transport?
+ wait-for-transport-close-operation
+ notice-input-error-operation
+ notice-output-error-operation
+ close-transport!)
+ (import (only (rnrs base)
+ begin let define lambda assert)
+ (only (fibers operations)
+ wrap-operation)
+ (only (fibers conditions)
+ make-condition signal-condition! wait-operation)
+ (only (fibers channels)
+ make-channel get-operation put-operation)
+ (only (ice-9 atomic)
+ make-atomic-box atomic-box-ref atomic-box-set!)
+ (only (rnrs records syntactic)
+ define-record-type)
+ (only (rnrs conditions)
+ define-condition-type
+ &violation
+ make-message-condition
+ make-who-condition))
+ (begin
+
+ (define-record-type (<message-transport> make-message-transport message-transport?)
+ (fields (immutable close? transport-close-condition) ; condition
+ (immutable closed? transport-closed-condition) ; condition
+ (immutable messages transport-messages) ; fibers channel
+ (immutable input-errors transport-input-errors) ; fibers channel
+ (immutable output-errors transport-output-errors) ; fibers channel
+ ;; TODO I don't think atomic boxes are strictly required here.
+ ;; atomic box of (#f or the errors)
+ (immutable input-errors/close transport-input-errors/close)
+ ;; atomic box of (#f or the errors)
+ (immutable output-errors/close transport-output-errors/close))
+ (protocol
+ (lambda (%make)
+ (lambda ()
+ "Return a fresh message transport.
+
+Messages will be sent from the output half-pipe to the input half-pipe.
+By default, closing the half-pipes will do nothing, and the half-pipes
+will remain marked as open. Use @code{wait-for-transport-close-operation}
+and @code{close-transport!} to react to close requests.
+
+Errors can be sent with @code{notice-input-error-operation} and
+@code{notice-output-error-operation}. Note that input and output
+errors are separated.
+
+No restrictions are placed upon the types of messages sent."
+ (%make (make-condition)
+ (make-condition)
+ (make-channel)
+ (make-channel)
+ (make-channel)
+ (make-atomic-box #f)
+ (make-atomic-box #f)))))
+ (opaque #t)
+ (sealed #f))
+
+ (define-record-type (<message-input> message-transport->input message-input?)
+ (fields (immutable transport message-input-transport))
+ (protocol
+ (lambda (%make)
+ (lambda (transport)
+ "Return an input queue corresponding to the transport
+@var{transport}. Currently, this is a fresh object, but that might
+change in the future."
+ (assert (message-transport? transport))
+ (%make transport))))
+ (opaque #t)
+ (sealed #f))
+
+ (define-record-type (<message-output> message-transport->output message-output?)
+ (fields (immutable transport message-output-transport))
+ (protocol
+ (lambda (%make)
+ (lambda (transport)
+ "Return an output queue corresponding to the transport
+@var{transport}. Currently, this is a fresh object, but that might
+change in the future."
+ (assert (message-transport? transport))
+ (%make transport))))
+ (opaque #t)
+ (sealed #f))
+
+ (define (close-input! in)
+ "Close the input queue @var{in} (asynchronous).
+@code{wait-for-input-closed-operation} can be used to wait
+until the queue has been closed. This has the same effect
+as @code{close-output!} on the output queue."
+ (assert (message-input? in))
+ (signal-condition!
+ (transport-close-condition (message-input-transport in))))
+
+ (define (close-output! out)
+ "Close the output queue @var{out} (asynchronous).
+@code{wait-for-output-closed-operation} can be used to wait
+until the queue has been closed. This has the same effect
+as @code{close-input!} on the input queue."
+ (assert (message-output? out))
+ (signal-condition!
+ (transport-close-condition (message-output-transport out))))
+
+ (define (read-message-operation in)
+ "Return an operation for reading a message from the input queue @var{in}.
+
+The operation will block until a message has been read, so this should probably
+be combined with @code{wait-for-input-closed-operation} and
+@code{read-input-error-operation}."
+ (assert (message-input? in))
+ (get-operation (transport-messages (message-input-transport in))))
+
+ (define (send-message-operation out msg)
+ "Make an operation for sending a message @var{msg} into the output queue
+@var{out}.
+
+The operation will block until the message has been sent (though it may take
+some time before it ends up on the other side of the network, and some kind
+of output error could happen in-between), so this should probably be combined
+with @code{wait-for-output-closed-operation} and @code{read-output-error-operation}"
+ (assert (message-output? out))
+ (put-operation (transport-messages (message-output-transport out)) msg))
+
+ (define (read-input-error-operation in)
+ "Return an operation for reading the next input error from the
+input queue @var{in}."
+ (assert (message-input? in))
+ (get-operation (transport-input-errors (message-input-transport in))))
+
+ (define (read-output-error-operation out)
+ "Return an operation for reading the next output error from the
+output queue @var{out}."
+ (assert (message-output? out))
+ (get-operation (transport-output-errors (message-output-transport out))))
+
+ (define (wait-for-transport-close-operation transport)
+ "Return an operation for waiting upon a close request
+from the input or output queue."
+ (assert (message-transport? transport))
+ (wait-operation (transport-close-condition transport)))
+
+ (define (close-transport! transport input-errors output-errors)
+ "Close the transport @var{transport}, with some closing input errors
+and closing output errors @var{input-errors} and @var{output-errors}.
+This marks the input and output queues as closed.
+XXX double closes probably should be detected."
+ (assert (message-transport? transport))
+ (atomic-box-set! (transport-input-errors/close transport)
+ input-errors)
+ (atomic-box-set! (transport-output-errors/close transport)
+ output-errors)
+ (signal-condition!
+ (transport-closed-condition transport)))
+
+ (define (notice-input-error-operation transport error)
+ "Return an operation for indicating the transport @var{transport}
+noticed an input error @var{error}. This will block if no fiber is waiting
+for an input error, so this procedure should probably not be used after
+the transport has been closed."
+ (assert (message-transport? transport))
+ (put-operation (transport-input-errors transport) error))
+
+ (define (notice-output-error-operation transport error)
+ "Return an operation for indicating the transport @var{transport}
+noticed an output error @var{error}. This will block if no fiber is waiting
+for an output error, so this procedure should probably not be used after
+the transport has been closed."
+ (assert (message-transport? transport))
+ (put-operation (transport-output-errors transport) error))
+
+ (define (wait-for-output-closed-operation out)
+ "Return an operation for waiting until the output queue @var{out}
+has been closed. This has the same effect as waiting until the corresponding
+input queue has been closed, except the return values are presumably different.
+Any output errors happening during the closing are returned in a data structure
+according to the transport."
+ (assert (message-output? out))
+ (let ((transport (message-output-transport out)))
+ (wrap-operation
+ (wait-operation (transport-closed-condition transport))
+ (lambda ()
+ (atomic-box-ref (transport-output-errors/close transport))))))
+
+ (define (wait-for-input-closed-operation in)
+ "Return an operation for waiting until the input queue @var{in}
+has been closed. This has the same effect as waiting until the corresponding
+output queue has been closed, except the return values are presumably different.
+Any input errors happening during the closing are returned in a data structure
+according to the transport."
+ (assert (message-input? in))
+ (let ((transport (message-input-transport in)))
+ (wrap-operation
+ (wait-operation (transport-closed-condition transport))
+ (lambda ()
+ (atomic-box-ref (transport-input-errors/close transport))))))))
diff --git a/tests/message-io.scm b/tests/message-io.scm
new file mode 100644
index 0000000..f283855
--- /dev/null
+++ b/tests/message-io.scm
@@ -0,0 +1,196 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+(use-modules (gnu gnunet mq message-io)
+ (rnrs base)
+ (fibers)
+ ((fibers internal)
+ #:select (yield-current-fiber))
+ (fibers conditions)
+ (fibers operations))
+
+;; Test whether inputs, outputs and transports are disjoint.
+;;
+;; It would not be unreasonable if transport actually was a subtype
+;; of input and output --- transport implies input and output, after all.
+(define %transport (make-message-transport))
+
+(test-eq "message-transport? predicate"
+ #t
+ (message-transport? %transport))
+(test-eq "message-input? predicate"
+ #t
+ (message-input? (message-transport->input %transport)))
+(test-eq "message-output? predicate"
+ #t
+ (message-output? (message-transport->output %transport)))
+
+(test-equal "message-transport? -> not message-input? / message-output?"
+ '(#f . #f)
+ (cons (message-input? %transport)
+ (message-output? %transport)))
+
+(test-equal "message-input? -> not message-output? / message-transport?"
+ '(#f . #f)
+ (let ((i (message-transport->input %transport)))
+ (cons (message-output? i)
+ (message-transport? i))))
+
+(test-equal "message-output? -> not message-input? / message-transport?"
+ '(#f . #f)
+ (let ((o (message-transport->output %transport)))
+ (cons (message-input? o)
+ (message-transport? o))))
+
+
+
+;; Test whether inputs and outputs are connected
+
+;; Non-preempting, in order to be able to detect
+;; blocking later.
+(define (run-fibers* thunk)
+ (run-fibers thunk
+ #:install-suspendable-ports? #f
+ #:parallelism 1
+ #:hz 0))
+
+(define-syntax-rule (with-fibers exp exp* ...)
+ (run-fibers* (lambda () exp exp* ...)))
+
+(define-syntax-rule (async exp exp* ...)
+ (spawn-fiber (lambda () exp exp* ...)))
+
+(test-eq "send-message-operation & read-message-operation"
+ 'stuff
+ (let* ((transport (make-message-transport))
+ (input (message-transport->input transport))
+ (output (message-transport->output transport))
+ (send (send-message-operation output 'stuff))
+ (input (read-message-operation input)))
+ (with-fibers
+ (async (perform-operation send))
+ (perform-operation input))))
+
+(define-syntax-rule (test-notice/i/o-error str transport->i/o notice-error read-error)
+ (test-eq str
+ 'oops
+ (let* ((transport (make-message-transport))
+ (i/o (transport->i/o transport))
+ (notice-oops (notice-error transport 'oops))
+ (wait-for-oops (read-error i/o)))
+ (with-fibers
+ (async (perform-operation notice-oops))
+ (perform-operation wait-for-oops)))))
+
+(test-notice/i/o-error "notice-input-error-operation & read-input-error-operation"
+ message-transport->input
+ notice-input-error-operation
+ read-input-error-operation)
+
+(test-notice/i/o-error "notice-output-error-operation & read-output-error-operation"
+ message-transport->output
+ notice-output-error-operation
+ read-output-error-operation)
+
+
+;; Test closing
+
+(define (make-detect-blocking-operation loops)
+ "Return an operation that completes once a newly-spawned
+fiber has yielded at least LOOPS times"
+ (assert (and (exact-integer? loops)
+ (>= loops 0)))
+ (let ((op (make-condition)))
+ (async
+ (let loop ((n loops))
+ (if (>= n 0)
+ (begin (assert (yield-current-fiber))
+ (loop (- n 1)))
+ (signal-condition! op))))
+ (wait-operation op)))
+(define *loops* 16) ; should be enough
+
+(define-syntax-rule (test-i/o-open str transport->i/o wait-i/o-closed)
+ (test-eq str
+ 'blocking
+ (let* ((transport (make-message-transport))
+ (i/o (transport->i/o transport))
+ (wait-closed (wait-i/o-closed i/o)))
+ (with-fibers
+ (let ((is-blocking (make-detect-blocking-operation *loops*)))
+ (perform-operation
+ (choice-operation
+ (wrap-operation is-blocking (const 'blocking))
+ (wrap-operation wait-closed (const 'closed)))))))))
+
+(test-i/o-open "transports start open (input)"
+ message-transport->input wait-for-input-closed-operation)
+
+(test-i/o-open "transports start open (output)"
+ message-transport->output wait-for-output-closed-operation)
+
+(test-i/o-open "transports start non-closing"
+ identity wait-for-transport-close-operation)
+
+(define (test-input-closed x)
+ (perform-operation
+ (choice-operation
+ (wrap-operation (wait-for-input-closed-operation
+ (message-transport->input x))
+ (lambda _ _))
+ (wrap-operation (make-detect-blocking-operation *loops*)
+ (lambda () '())))))
+
+(define (test-output-closed x)
+ (perform-operation
+ (choice-operation
+ (wrap-operation (wait-for-output-closed-operation
+ (message-transport->output x))
+ (lambda _ _))
+ (wrap-operation (make-detect-blocking-operation *loops*)
+ (lambda () '())))))
+
+(test-equal "input & output are closed by close-transport!"
+ '((input-errors) . (output-errors))
+ (let* ((transport (make-message-transport)))
+ (with-fibers
+ (close-transport! transport 'input-errors 'output-errors)
+ (cons (test-input-closed transport)
+ (test-output-closed transport)))))
+
+(define-syntax-rule
+ (test-i/o-close-request str transport->i/o close-i/o! test-i/o-closed)
+ (test-equal str
+ '(close-me)
+ (let* ((transport (make-message-transport)))
+ (with-fibers
+ (close-i/o! (transport->i/o transport))
+ (cons (perform-operation
+ (choice-operation
+ (wrap-operation (wait-for-transport-close-operation transport)
+ (lambda () 'close-me))
+ (wrap-operation (make-detect-blocking-operation *loops*)
+ (lambda () 'not-yet))))
+ (test-i/o-closed transport))))))
+
+(test-i/o-close-request
+ "only wait-for-transport-close succeeds after a close request (input)"
+ message-transport->input close-input! test-input-closed)
+(test-i/o-close-request
+ "only wait-for-transport-close succeeds after a close request (output)"
+ message-transport->output close-output! test-output-closed)
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.