Jade Dungeon

ch03 模块化、对象和状态 part04

并发:时间是一个本质问题

并发系统中的时间性质

练习 3.38

(define balance 0)

(define (init) (set! balance 100))
(define (Peter) (set! balance (+ balance 10)))
(define (Paul) (set! balance (- balance 20)))
(define (Mary) (set! balance (- balance (/ balance 2))))
(define (result) (display balance) (newline))

; a) 35, 40, 45, 50
; 1
(init)
(Peter) (Paul) (Mary)
(result) ;45

; 2
(init)
(Peter) (Mary) (Paul)
(result) ;35

; 3
(init)
(Paul) (Peter) (Mary)
(result) ;45

; 4
(init)
(Paul) (Mary) (Peter)
(result) ;50

; 5
(init)
(Mary) (Peter) (Paul)
(result) ;40

; 6
(init)
(Mary) (Paul) (Peter)
(result) ;40

; b) example
;     +-------- [$100] -------+
;     |                       |
;     v                       v
;   Peter                    Paul
; Access: $100             
; New: 100+10=110          Access: $100
; Set: balance=110         New: 100-20=80 
;                          Set: balance=80

控制并发的机制

有很多种控制并发的机制,这里只讨论其中的一种:「串行化组」(serializer)。

对共享变量的串行访问

串行化组的基本思想是:进程可以进发地执行,但是其中有一部分过程是不能并发执行的。

串行化组就是不能并发执行的过程的集合,这个集合里的任何一个过程都不能和 这个集合里的所有过程同时执行。

Scheme里的串行化

parallel-execute过程可以实现多个无参数过程的并行执行,它的具体实现再不讨论。 语法为:

(parallel-execute <p1> <p2> ... <pk>)

例:

(define x 10)

(parallel-execute (lambda () (set! x (* x x)))
                  (lambda () (set! x (+ x 1))))

make-serializer过程定义过程为串行化组的成员,它不可与其他串行并行执行。 以后再来实现它。使用的例子如下:

(define x 10)

(define s (make-serializer))

(parallel-execute (s (lambda () (set! x (* x x))))
                  (s (lambda () (set! x (+ x 1)))))

例子,可以有多个用户同时存取款的程序:

(define (make-account balance)
  (define (withdraw amount)
    (if (>= balance amount)
        (begin (set! balance (- balance amount))
               balance)
        "Insufficient funds"))
  (define (deposit amount)
    (set! balance (+ balance amount))
    balance)
  (let ((protected (make-serializer)))
    (define (dispatch m)
      (cond ((eq? m 'withdraw) (protected withdraw))  ;; 串行组中
            ((eq? m 'deposit) (protected deposit))    ;; 串行组中
            ((eq? m 'balance) balance)
            (else (error "Unknown request -- MAKE-ACCOUNT"
                         m))))
    dispatch))

练习 3.39

(load "3.4-parallel.scm")

(define x 10)

;(define s (make-serializer)) ;; 101 or 121
(define s (make-serializer-slow)) ;; this helps to produce the answer x=100

(parallel-execute
    (lambda () (set! x ((s (lambda () (* x x))))))
    (s (lambda () (set! x (+ x 1)))))

(display x)

; the last form equals to
#|
(define (square x) (* x x))
(parallel-execute
    (lambda () (set! x (square x)))
    (s (lambda () (set! x (+ x 1)))))
|#
; A: [get x] [set! x]
; B: [get x, set! x]
;
; 1) 101
;   A.get x -> 10
;   A.set 100 -> x
;   B.get x -> 100, B.set 101 -> x
;
; 2) 121
;   B.get x -> 10, B.set 11 -> x
;   A.get x -> 11
;   A.set 121 -> x
;
; 3) 100
;   A.get x -> 10
;   B.get x -> 10, B.set 11 -> x
;   A.set x -> 100
;

练习 3.40


(load "3.4-parallel.scm")

(define x 10)

(define s (make-serializer))
(define ss (make-serializer-slow))

(parallel-execute
    (lambda () (set! x (* x x)))
    (lambda () (set! x (* x x x))))

;(display x)

; equals to
#|
(set! x 10)

(parallel-execute
    (lambda ()
        (let ((a x))
            ;possible intersection point
            (let ((b x))
                ;possible intersection point
                (set! x (* a b)))))
    (lambda ()
        (let ((a x))
            ;possible intersection point
            (let ((b x))
                ;possible intersection point
                (let ((c x))
                    ;possible intersection point
                    (set! x (* a b c)))))))
(display x)
; 100
; 1,000
; 10,000
; 100,000
; 1,000,000
;|#

(set! x 10)

(parallel-execute
    (s (lambda () (set! x (* x x x))))
    (s (lambda () (set! x (* x x)))))

(display x)

; only 1,000,000

练习 3.41

(load "3.4-parallel.scm")

(define (make-account balance)
    (define (withdraw amount)
        (if (>= balance amount)
            (begin (set! balance (- balance amount))
                balance)
            "Insufficient funds"))
    (define (deposit amount)
        (set! balance (+ balance amount))
        balance)
    (let ((protected (make-serializer)))
        (define (dispatch m)
            (cond
                ((eq? m 'withdraw) (protected withdraw))
                ((eq? m 'deposit) (protected deposit))
                ((eq? m 'balance)
                    ((protected (lambda () balance)))) ;serialized
                (else (error "unknown request -- MAKE-ACCOUNT" m))))
        dispatch))

; 担心是多余的,只是读取余额并不会导致并发问题,
; 只是在"读取->写入"之间出现交叉的时候才可能导致不一致性。

以下的代码在读余额的操作balance上也加了串行化:

(define (make-account balance)
  (define (withdraw amount)
    (if (>= balance amount)
        (begin (set! balance (- balance amount))
               balance)
        "Insufficient funds"))
  (define (deposit amount)
    (set! balance (+ balance amount))
    balance)
  ;; continued on next page

  (let ((protected (make-serializer)))
    (define (dispatch m)
      (cond ((eq? m 'withdraw) (protected withdraw))
            ((eq? m 'deposit) (protected deposit))
            ((eq? m 'balance)
             ((protected (lambda () balance))))       ;; serialized
            (else (error "Unknown request -- MAKE-ACCOUNT"
                         m))))
    dispatch))

这样是不是有必要? 没有必要,读取变量这个动作在Scheme里是原子操作。

练习 3.42



下面的代码把串行化withdrawdeposit的声明外移到dispatch过程的外面:

(define (make-account balance)
  (define (withdraw amount)
    (if (>= balance amount)
        (begin (set! balance (- balance amount))
               balance)
        "Insufficient funds"))
  (define (deposit amount)
    (set! balance (+ balance amount))
    balance)
  (let ((protected (make-serializer)))
    (let ((protected-withdraw (protected withdraw))  ;; move out
          (protected-deposit (protected deposit)))
      (define (dispatch m)
        (cond ((eq? m 'withdraw) protected-withdraw)
              ((eq? m 'deposit) protected-deposit)
              ((eq? m 'balance) balance)
              (else (error "Unknown request -- MAKE-ACCOUNT"
                           m))))
      dispatch)))

这样做可以么?

从正确性上来说是没有差别的,区别只在于什么时候创建串行化执行函数:

  • 原来版本是在调用withdraw/deposit的时候创建,
  • 而本题则是在调用make-account的时候就先创建好。

使用多重资源的复杂性

当有多个线程访问多个资源的时候,就很难把握并发程序的设计。对于一个资源,需要 锁住所有对它的访问。

练习 3.43

#lang racket

(require "felix.scm")

(define empty-board '())

(define (safe? solution)

    (let ((p (car solution)))
        (define (conflict? q i)
            (or
                (= p q)
                (= p (+ q i))
                (= p (- q i))))
        (define (check-all rest i)
            (cond 
                ((null? rest) #t)
                ((conflict? (car rest) i) #f)
                (else (check-all (cdr rest) (inc i)))))
        (check-all (cdr solution) 1)))

(define (n-queen n)
    (define n-cols (range 1 n 1))
    (define (queen-cols k)
        (if (= k 0)
            (list empty-board)
            (filter
                (lambda (solution) (safe? solution))
                (let ((sub-solutions (queen-cols (dec k))))
                    (flatmap
                        (lambda (col)
                            (map
                                (lambda (sub-solution)
                                    (cons col sub-solution))
                                sub-solutions)) 
                        n-cols)))))
    (queen-cols n))

(length (n-queen 11))

练习 3.44

(define (transfer from-acc to-acc amount)
    ((from-acc 'withdraw) amount)
    ((to-acc 'deposit) amount))

在假定from-acc的余额不小于amount的前提下,这个transfer方法总可以正常地 完成转账操作。

exchange的问题在于,由于需要事先锁定两个账户,有可能会出现死锁:

  • 假定A在执行(exchange x y), B在执行(exchange y x)
  • 如果A获取了x的锁、同时B获取了y的锁,那么两个exchange操作都 无法再继续下去了。

练习 3.45

(serialized-exchange acc1 acc2) 执行的时候,会先获取acc1acc2的锁, 然后去执行 ((acc1 'withdraw) diff),而按照Louis的实现方法, 在withdraw的时候要再次获取acc1的锁,从而导致程序进入死锁状态.

串行化的实现

互拆元(mutex),可以进行「获取」(acquired)和「释放」(release)操作。

这里实现串行化的方式是每个串化组关联着一个互拆元,同一组里只有一个进程可以执行:

(define (make-serializer)
  (let ((mutex (make-mutex)))
    (lambda (p)
      (define (serialized-p . args)
        (mutex 'acquire)
        (let ((val (apply p args)))
          (mutex 'release)
          val))
      serialized-p)))

make-mutex的实现如下。

(define (make-mutex)
  (let ((cell (list false)))   ;; 初始化为false
    (define (the-mutex m)
      (cond ((eq? m 'acquire)  ;; 如果没有被其他占用
             (if (test-and-set! cell)   ;; 可用就设置为真
                 (the-mutex 'acquire))) ;; 不可用就重试
            ((eq? m 'release) (clear! cell))))
    the-mutex))

(define (clear! cell)
  (set-car! cell false))

在分时操作系统里,「不可用则等待一段时间后再重试」并不是真的「忙等待」。 CPU会调度其他的任务。

test-and-set!检查单元并返回结果,如果为假,还会在返回之前把内容设置为真:

(define (test-and-set! cell)
  (if (car cell)
      true
      (begin (set-car! cell true)
             false)))

但是test-and-set!本身的原子性实现依赖于底层硬件。比如在某单处理器的MIT Scheme 里,提供了without-ineterrupts过程来声明原子化操作:

(define (test-and-set! cell)
  (without-interrupts
   (lambda ()
     (if (car cell)
         true
         (begin (set-car! cell true)
                false)))))

练习 3.46

(define (test-and-set! cell)
    (if (car cell)
        true
        (begin
            (set-car! cell true)
            false)))

;   A                       B
;   (car cell)->false   
;                           (car cell) -> false
;   (set-car! cell true)
;       [cell: '(true)]     
;                           (set-car! cell true)
;                                   [cell: '(true)]
;   false                   
;                           false

练习 3.47

; not locked version(wrong)
(define (make-semaphore-mutex n)
    (define (request m)
        (cond
            ((eq? m 'acquire)
                (if (= n 0)
                    (request m)
                    (set! n (- n 1))))
            ((eq? m 'release)
                (set! n (+ n 1)))
            (else (error "unknown request" m))))
    request)

; a) mutex based semaphore

(define (make-semaphore-mutex n)
    (let ((mtx (make-mutex)))
        (define (request m)
            (cond
                ((eq? m 'acquire)
                    (lock-mutex mtx)
                    (cond
                        ((= n 0)
                            (unlock-mutex mtx) ; release first
                            (request m))
                        (else
                            (set! n (- n 1))
                            (unlock-mutex mtx)))) ;enable other threads
                ((eq? m 'release)
                    (lock-mutex mtx)
                    (set! n (+ n 1))
                    (unlock-mutex mtx))
                (else (error "unknown request" m))))
        request))

; b)

; use test-and-set! to replace the mutex in (a)
;  inspired by the solution of [Barry Allison](http://wizardbook.wordpress.com/2010/12/19/exercise-3-47/)
;  WARNING: Barry's solution is not right due to some ill-considered situations.

(load "3.4-mutex.scm")

(define (make-semaphore n)
    (let ((cell (list #t)))
        (define (request m)
            (cond
                ((eq? m 'acquire)
                    (if (test-and-set! cell)
                        (request m)
                        (cond
                            ((= n 0)
                                (clear! cell) ; release first
                                (request m))
                            (else
                                (set! n (- n 1))
                                (clear! cell))))) ;enable other threads
                ((eq? m 'release)
                    (if (test-and-set! cell)
                        (request m)
                        (begin
                            (set! n (+ n 1))
                            (clear! cell))))
                (else (error "unknown request" m))))
        request))

; or use a modified version of test-and-set! (my implementation)
;   much simpler, huh :)

(define (test-and-add! lock d)
    (if (= (+ (car lock) d) 0)
        #t
        (begin
            (set-car! lock (+ (car lock) d))
            #f)))

(define (make-semaphore-add n)
    (let ((lock (list n)))
        (define (request m)
            (cond
                ((eq? m 'acquire)
                    (if (test-and-add! lock -1)
                        (request m)))
                ((eq? m 'release)
                    (if (test-and-add! lock 1)
                        (request m)))
                (else (error "bad request" m))))
        request))

死锁

有一些情况下死锁是无法避免的,这就要求有一种死锁恢复方法。比如很多数据库系统里 都有实现。

练习 3.48

(load "3.4-parallel.scm")

(define (make-account id balance)
    (let ((s (make-serializer)))
        (define (deposit amount)
            (set! balance (+ balance amount)))
        (define (withdraw amount)
            (set! balance (- balance amount)))
        (define (dispatch request)
            (cond
                ((eq? request 'withdraw) withdraw)
                ((eq? request 'deposit) deposit)
                ((eq? request 'balance) balance)
                ((eq? request 'id) id)
                ((eq? request 'serializer) s)
                (else (error "unknown request" request))))
        dispatch))

(define (exchange acc1 acc2)
    (let ((diff (- (acc1 'balance) (acc2 'balance))))
        ((acc1 'withdraw) diff)
        ((acc2 'deposit) diff)))

(define (serialized--exchange acc1 acc2)
    (let
        ((s1 (acc1 'serializer))
         (s2 (acc2 'serializer)))
        (if (< (acc1 'id) (acc2 'id))
            ((s1 (s2 exchange)) acc1 acc2)
            ((s2 (s1 exchange)) acc1 acc2))))

(define acc1 (make-account 1 100))
(define acc2 (make-account 2 150))

(display (acc1 'balance)) (newline)
(display (acc2 'balance)) (newline)

(serialized--exchange acc1 acc2)

(display (acc1 'balance)) (newline)
(display (acc2 'balance)) (newline)
         

练习 3.49

;例如双链表

;建立节点
(define (make-node value prev next)
    (cons value (cons prev next)))

;取消node与下一个节点的链接
(define (remove-next node)
    (let ((next (cddr node)))           ;a1
        (set-cdr! (cdr node) '())       ;a2
        (set-car! (cdr next) '())))     ;a3

;取消node与上一个节点的链接
(define (remove-prev node)
    (let ((prev (cadr node)))           ;b1
        (set-car! (cdr node) '())       ;b2
        (set-cdr! (cdr prev) '())))     ;b3
  • 为了避免在a1a3之间nodenext被修改,必须先锁住node再获取next
  • 同样的,为了避免在b1b3之间prev被修改,必须先锁住node再获取prev

对于链表:() <--> (x) <--> (y) <--> ()

如果(remove-next x)(remove-prev y)同时执行,就可能会出现死锁。

huangz同学的 "联合账户" 例子更赞:

关联帐号的其中一种状况产生的死锁无法使用 练习 3.48 的机制来防止。

假设 peter 和 mary 是两夫妇,他们各自拥有自己的帐号peter-accmary-acc, 并且这两个帐号都将对方的帐号设置成了关联帐号, 也即是:

  • peter-acc的余额不足以支付的时候,它会去提取mary-acc的余额;
  • 而当mary-acc的余额不足以支付的时候,它也回去提取peter-acc的余额。

现在,考虑这样一种情况, peter 和 mary 分别在不同的地方消费, 然后各自账户的余额都不足以支付订单,于是peter-acc尝试访问关联帐号mary-acc, 而mary-acc也在同一时间访问peter-acc,因为两个帐号都已经被打开, 而且两个帐号都试图访问关联帐号,这样就造成了一个死锁:

除非 peter 或 mary 的其中一个主动退出账户,否则支付永远都无法完成。

并发性、时间和通信

有些场景下并不一定要在实时地同步多个资源。比如跨国银行的「周期性平账」, 同一个账户在不同国家里的余额并不是所有的时间都一样的。