guile-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

ice-9 thread-pool


From: Andy Wingo
Subject: ice-9 thread-pool
Date: Mon, 06 Feb 2012 20:14:45 +0100
User-agent: Gnus/5.13 (Gnus v5.13) Emacs/23.3 (gnu/linux)

Hi,

Related to my previous mail, here is a thread pool implementation.
Comments welcome.

Andy

;;; Thread pools

;; Copyright (C)  2010, 2011, 2012 Free Software Foundation, Inc.

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA

;;; Commentary:
;;;
;;; A simple thread pool implementation.
;;;
;;; Code:

(define-module (ice-9 thread-pool)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (ice-9 threads)
  #:export (make-thread-pool
            start-thread-pool!
            pause-thread-pool!
            stop-thread-pool!))


(define-record-type <worker>
  (%make-worker mutex condvar thread handler state)
  worker?
  (mutex worker-mutex)
  (condvar worker-condvar)
  (thread worker-thread set-worker-thread!)
  (handler worker-handler)
  (state worker-state set-worker-state!))

(define (make-worker handler)
  (%make-worker (make-mutex) (make-condition-variable) #f handler 'stopped))

(define* (worker-wait worker #:optional time)
  (if time
      (wait-condition-variable (worker-condvar worker)
                               (worker-mutex worker)
                               time)
      (wait-condition-variable (worker-condvar worker)
                               (worker-mutex worker))))

(define (worker-stopping? worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         #f)
        ((pausing)
         (set-worker-state! worker 'paused)
         (signal-condition-variable (worker-condvar worker))
         (lp))
        ((paused)
         (worker-wait worker)
         (lp))
        ((stopping)
         #t)
        (else
         (error "bad state" worker))))))

(define (worker-loop worker)
  (let lp ()
    (or (worker-stopping? worker)
        (begin
          ((worker-handler worker))
          (lp)))))

(define (pause-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         (set-worker-state! worker 'pausing)
         (lp))
        ((pausing)
         #f)
        ((paused)
         #t)
        ((stopping)
         (error "attempt to go stopping -> pausing" worker))
        ((stopped)
         (set-worker-state! worker 'pausing)
         (set-worker-thread! worker
                             (make-thread worker-loop worker))
         (lp))
        (else
         (error "bad state" worker))))))

(define* (wait-for-paused worker #:optional time)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((paused)
         #t)
        (else
         (and (worker-wait worker time)
              (lp)))))))

(define (start-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         #t)
        ((stopping)
         (error "attempt to go stopping -> running" worker))
        (else
         (set-worker-state! worker 'running)
         (if (worker-thread worker)
             (signal-condition-variable (worker-condvar worker))
             (set-worker-thread! worker (make-thread worker-loop worker)))
         (lp))))))

(define (stop-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((stopped)
         #t)
        ((stopping)
         #f)
        (else
         (set-worker-state! worker 'stopping)
         (lp))))))

(define* (wait-for-stopped worker #:optional time #:key cancel?)
  (let ((thread
         (with-mutex (worker-mutex worker)
           (cond
            ((eq? (worker-state worker) 'stopped)
             #f)
            ((thread-exited? (worker-thread worker))
             (set-worker-thread! worker #f)
             (set-worker-state! worker 'stopped)
             #f)
            (else
             (worker-thread worker))))))
    (or (not thread)
        (begin
          (if time
              (join-thread (worker-thread worker) time)
              (join-thread (worker-thread worker)))
          (cond
           ((thread-exited? thread)
            (wait-for-stopped worker))
           (cancel?
            (cancel-thread (worker-thread worker))
            ;; Assume it works.
            (join-thread (worker-thread worker))
            (wait-for-stopped worker))
           (else
            #f))))))

(define-record-type <thread-pool>
  (%make-thread-pool size workers)
  thread-pool?
  (size thread-pool-size)
  (workers thread-pool-workers))

;; Create a thread pool, and bring it to the "paused" state.
;;
(define (make-thread-pool size handler)
  (let ((pool (%make-thread-pool size
                                 (map (lambda (i) (make-worker handler))
                                      (iota size)))))
    (pause-thread-pool! pool)
    pool))

(define (pause-thread-pool! pool)
  (for-each pause-worker (thread-pool-workers pool))
  (for-each wait-for-paused (thread-pool-workers pool)))

(define (start-thread-pool! pool)
  (for-each start-worker (thread-pool-workers pool)))

(define (stop-thread-pool! pool)
  (for-each stop-worker (thread-pool-workers pool))
  (for-each wait-for-stopped (thread-pool-workers pool)))
-- 
http://wingolog.org/

reply via email to

[Prev in Thread] Current Thread [Next in Thread]