[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: stis engine
From: |
Stefan Israelsson Tampe |
Subject: |
Re: stis engine |
Date: |
Wed, 25 Aug 2021 00:31:25 +0200 |
;; Oups I managed to send the message by accident without finishing it.
;; The server part is similar and I will drop the actual pipeline and
highlight the differenece
;; which is that we will get a message stream it to an scheme object call
server-.lambda below
;; with it and finally from that lambdas return value reply to the client
we will create a loop where
;; the server waits for questions
(define* (make-server server-lambda address ip-server? #key (context #f))
(define schemer (fpipe-schemer ch1 ch2))
(spawn-fiber
(lambda ()
(let lp ()
(schemer %fpipe-eof%)
(lp)))))
Finally the idea is to use it as
(define context (make-zmq-context))
(define address "") ;; ZeroMQ address
(define ip-server? #t) ;; if we will use bind or connect
(make-server (lambda (x) (cons '() x)) context address ip-server?)
(define client (make-client context address ip-server?))
;; send a mesage that is not compressed
> (client "abc")
"abc"
> (length (client (iota 1000000) #:compress? #t))
1000000
in a run fibers context
On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe <
stefan.itampe@gmail.com> wrote:
> I have now made a fiber enables stream library called fpipe in my
> stis-engine repository see
>
> https://gitlab.com/tampe/stis-engine/-/tree/master/
>
> The idea is to focus on a high performance byte streaming library on top
> of wingo's fibers library and make heavy use of bytevector buffers. We will
> also gp between a stream of scheme
> values and these byte streams to seamlessly be able to integrate a good
> overview of the data pipeline.
>
> The following code uses a c-based serializer and deserializer of scheme
> data structures
> and allow for optional streamed compression and decompression and
> transport it over ZeroMQ
> networking which allow for thread/process/computer movement of data. The
> end result is a way to create servers and clients.
>
> It is instructive to show the code for the client and server pipelines is
> constructed tp show off the fpipes library. This is not the final design
> but moste components are done
>
> Here is the client
>
> (define* (make-client address ip-server? #key (context #f))
> ;; First we setup the zmq networking
> (define ctx (if context context (make-zmq-context)))
> (define socket (zmq-socket context ZMQ_REQ))
>
> (if ip-server?
> (zmq-bind socket address)
> (zmq-connect socket address))
>
> ;; we will define to fiber channels, channel in = ch1 and channel out =
> ch2
> (define-values (ch1 ch2)
>
> ;; fpipe-construct is the general pipelining macro
> (fpipe-construct
>
> ;; this is a scheme condition that will match check a message
> bounded to it
> (cond
> (#:scm it)
>
> ;; format of the matcher is (predicate . translatot) where if
> predicate is true we will
> ;; push the message to the branching pipline this assumes a message
> is the form
> ;; ((list-of-features) . message)
> (((memq 'compress (car it)) #:tr (cdr it))
>
> ;; the c-based stremed serializer that integrates nicely with
> fibers and streams
> ;; the message transport is the form scm->bytesteam
>
> (mk-c-atom->fpipe)
>
> ;; the zlib compressor node will tarnsport as bytestream->bytestream
> compress-from-fpipe-to-fpipe
>
> ;; a bytestream->bytestream that will prepend a message with 1 to
> indicate that the stream
> ;; has been compressed
> #:prepend #u8(1))
>
> ;; if we do not have the compress feature then we will simply
> generate the stream and
> ;; prepend a one e.g. not doing any compression
> (else
> (mk-c-atom->fpipe)
> #:prepend #u8(0)))
>
> ;; transport the message byetstream over the zmq socket this will
> retrun in a scheme
> ;; stream where eof will survive as all control messages are and
> will initiate the next
> ;; reading from the socket (when the request message has been fully
> sent.
> (fpipe->zmq socket)
>
> ;; so here we get the return message
> (zmq->fpipe socket)
>
> ;; This is a bytestream cond and has no it part,
> (cond
> ;; We try to match the beginning of the bytestream message and if
> it starts with 1
> ;; then we know that the reply message has been compressed
> ((#:match u8(1) #:skip 1)
> decompress-from-pipe-to-pipe)
>
> ;; else no compression.
> ((else #:skip 1)
> ))
>
> ;; the final step is to take the bytestream and make a scheme object
> and put that
> ;; to the scheme stream and the pipe is finished
> (mk-fpipe->c-atom)))
>
> ;; fpipe-scheme takes a piplend from scm to scm and creates a function
> of it.
> ;; each time the function is called with a scheme object we will send
> it ot the server
> ;; from the return message create a scheme object that is returned from
> the funciton
> (define action (fpipe-schemer ch1 ch2))
>
> ;; A little nicer interface and we are finished
> (lambda* (message #:key (compress? #f))
> (action (cons (if compress? '(compress) '()) message))))
>
>
> ;; SERVER
> (define* (make-server server-lambda address ip-server? #key (context #f))
>
> (define schemer (fpipe-schemer ch1 ch2))
>
> (spawn-fiber
> (lambda ()
> (let lp ()
> (schemer %fpipe-eof%)
> (lp)))))
>
>
- stis engine, Stefan Israelsson Tampe, 2021/08/24
- Re: stis engine,
Stefan Israelsson Tampe <=