;; Copyright (C) 2015 Amirouche Boubekki
;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA
(define-module (async))
(use-modules (srfi srfi-69)) ;; hash-table
(use-modules (ice-9 q))
(use-modules (ice-9 match))
(use-modules (ice-9 rdelim))
(use-modules (srfi srfi-9))
;; srfi-queue :o)
(define make-queue make-q)
(define push! enq!)
(define pop! deq!)
(define empty? q-empty?)
(define-record-type
(make-loop running tasks readers writers)
loop?
(running loop-running loop-running!)
(tasks loop-tasks loop-tasks!)
(readers loop-readers loop-readers!)
(writers loop-writers loop-writers!))
(define loop (make-loop #false (make-queue) (make-hash-table) (make-hash-table)))
(define-public *loop* (make-fluid loop))
(define-public (loop-call-later thunk) ;; support delay
(let ((loop (fluid-ref *loop*)))
(push! (loop-tasks loop) thunk)))
(define (loop-add-reader port callback)
(let* ((loop (fluid-ref *loop*))
(readers (loop-readers loop))
(queue (hash-table-ref readers port make-queue)))
(push! queue callback)
(hash-table-set! readers port queue)))
(define (loop-add-writer port callback)
(let* ((loop (fluid-ref *loop*))
(writers (loop-writers loop))
(queue (hash-table-ref writers port make-queue)))
(push! queue callback)
(hash-table-set! writers port queue)))
(define (call-read-callback port)
(let* ((loop (fluid-ref *loop*))
(queue (hash-table-ref (loop-readers loop) port)))
(when (and queue (not (empty? queue)))
(let ((callback (pop! queue)))
(when (empty? queue)
(hash-table-delete! (loop-readers loop) port))
(callback)))))
(define (call-write-callback port)
(let* ((loop (fluid-ref *loop*))
(queue (hash-table-ref (loop-writers loop) port)))
(when (and queue (not (empty? queue)))
(let ((callback (pop! queue)))
(when (empty? queue)
(hash-table-delete! (loop-writers loop) port))
(callback)))))
(define (loop-run-once)
(let ((loop (fluid-ref *loop*))
(readers (hash-table-keys (loop-readers loop)))
(writers (hash-table-keys (loop-writers loop))))
;; first poll ready ports
(match (select readers writers '() 0) ;; FIXME: replace 0 with time for next task
((to-read to-write _)
(for-each call-read-callback to-read)
(for-each call-write-callback to-write)))
;; execute tasks
(while (not (empty? (loop-tasks loop)))
((pop! (loop-tasks loop))))))
(define-public (loop-run-forever)
(let* ((loop (fluid-ref *loop*)))
(loop-running! loop #true)
(while (loop-running loop)
(call-with-prompt 'loop
loop-run-once
(lambda (cc callback) (callback cc))))))
(define-public (loop-running?)
(let ((loop (fluid-ref *loop*)))
(loop-running loop)))
(define-public (loop-stop!)
(let ((loop (fluid-ref *loop*)))
(loop-running! loop #false)))
;; non-blocking replacement for read/write procedures
(use-modules (ice-9 binary-ports))
(define (recv-some port)
(let next ((out '()))
(if (char-ready? port)
(let ((byte (get-u8 port)))
(if (eof-object? byte)
(pk 'eof (u8-list->bytevector (reverse out)))
(next (cons byte out))))
(pk '!ready (u8-list->bytevector (reverse out))))))
(define-public (recv-some/ socket)
(abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (recv-some socket)))))))
(define-public (get-bytevector-some/ socket)
(abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (get-bytevector-some socket)))))))
(define-public (accept/ socket)
(abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (accept socket)))))))
(use-modules (rnrs bytevectors))
(define (bytevector-drop bv count)
(u8-list->bytevector (list-tail (bytevector->u8-list bv) count)))
(define (send-all socket message cc)
(let loop ((message message))
(let* ((count (send socket message))
(message (bytevector-drop message count)))
(if (eq? (bytevector-length message) 0)
(cc)
(loop-add-writer socket (lambda () (loop message)))))))
(define-public (send-all/ socket message)
(abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (send-all socket message cc))))))
;; XXX: those are blocking anyway
;; (define-public (read/ socket)
;; (abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (read socket)))))))
;; (define-public (write/ message socket)
;; (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (write message socket)))))))
;; (define-public (display/ message socket)
;; (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (display message socket)))))))
;; (define-public (read-line/ socket)
;; (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (read-line socket)))))))
;;; basic TCP sockets
(define make-socket socket)
(define-public (make-client-socket port)
(let ((socket (make-socket PF_INET SOCK_STREAM 0)))
(connect socket AF_INET INADDR_LOOPBACK port)
(fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
socket))
(define-public (make-server-socket port)
(let ((socket (make-socket PF_INET SOCK_STREAM 0)))
(bind socket (make-socket-address AF_INET INADDR_ANY port))
(listen socket 128)
(fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
socket))