Jade Dungeon

Actor和并发

多线程

synchronized块同步

虽然推荐使用Actor模式,但还是讲一下传统的多线程控制方式吧。

AnyRef类追加了来自Object类的waitnotifynotifyAll方法。并且提供了带 参数的synchronized方法,等同于Java的synchronized块:

account.synchronized {
	account.balance += amount
}

Future

  • scala.concurrent.FutureApply工厂方法创建一个Future的实例,
  • onSuccess注册相成功后执行的回调方法。
  • onFailure注册的回调方法是偏函数。
scala> import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global

scala> (1 to 5) foreach { index =>
     |   val future = Future {
     |      Thread.sleep((math.random * 1000).toLong)
     |      index
     |   }
     |
     |   future onSuccess {
     |     case answer: Int => println(s"Result: $answer") 
     |   }
     |
     |   future onFailure { 
     |     case th: Throwable => println(s"Failure: $th") 
     |   }
     | }

scala> Result: 4
Result: 2
Result: 5
Result: 3
Result: 1

使用集合类Future实现多线程

在使用Future前要先通过指定的上下文环境来定义并发方式。 scala.concurrent.ExecutionContext.Implicts.global用Java线程库实现多线程, 默认的ExecutionContext使用Java的ForkJoinPool来管理Java线程池。

scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global

scala.concurrent.Future生成异步(或同步)任务:

scala> concurrent.Future { Thread.sleep(5000); println("hi") }
res0: scala.concurrent.Future[Unit] = 
				scala.concurrent.impl.Promise$DefaultPromise@185d8b6

scala> hi

参考FutureapplyonSuccessonFailure的方法声明, 可以看到有一个隐式参数:

def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]

def onSuccess[U](func: (Try[T]) => U)(
    implicit executor: ExecutionContext): Unit

def onFailure[U](callback: PartialFunction[Throwable, U])(
    implicit executor: ExecutionContext): Unit

而在scala.concurrent.ExecutionContextImplicits.global的声明中, 声明了隐式的值:

object Implicits {
	implict val global: ExecutionContextExecutor = 
		impl.ExecutionContextImpl.fromExecutor(null: Executor)
}

异步方式处理Future

Future可以被视为一个一元集合。当前的Future执行完成之后,返回的一个util.Try

  • 成功就是包含结果的Success实例,并把结果传递给另一外Future或是函数。
  • 失败就是Failure,包含异常信息,不会调用下一步。

例子,函数nextFtr在参数大于0时会抛异常:

scala> def nextFtr(i: Int = 0) = concurrent.Future {
     |   Thread.sleep(5000)
     |   if (i > 0) (i + 1) else throw new Exception("Err for Example")
     | }
nextFtr: (i: Int)scala.concurrent.Future[Int]

使用方法onComplete可能在任务完成以后,调用指定函数。 定义在成功或失败的情况下,分别应该进行什么后续操作:

scala> nextFtr(1) onComplete {
     |    case scala.util.Success(value) => println(s"value is: $value")
     |    case scala.util.Failure(e) => e.printStackTrace
     |  }

scala> value is: 2

scala> nextFtr(-1) onComplete {
     |    case scala.util.Success(value) => println(s"value is: $value")
     |    case scala.util.Failure(e) => e.printStackTrace
     |  }

scala> java.lang.Exception
	at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply$mcI$sp(<console>:11)
	at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply(<console>:9)
	at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply(<console>:9)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

还有onSuccessonFailure也用来解决类似的问题:

scala> nextFtr(1) onSuccess { case x =>  println(s"value is: $x") }

value is: 2


scala> nextFtr(-1) onFailure { case e =>  println(e.getMessage) }

Err for Example

使用方法fallbackTo可以在第一个Future失败时调用第二个Future:

scala> nextFtr(-1) fallbackTo nextFtr(665) onSuccess {
     |   case x => println(s"value is: $x")
     | }

value is: 666

flatMap方法在第一个Future成功的情况下,以结果为参数调用下一个Future:

scala> nextFtr(1) flatMap nextFtr onSuccess {
     |   case x => println(s"value is: $x")
     | }

value is: 3

map方法在成功后调用指定函数:

scala> nextFtr(3) map (_ * 2) onSuccess {
     |   case x => println(s"value is: $x")
     | }

value is: 8

Future.sequence会「并行」地调用指定序列中所有Future, 全部成功就返回所有结果的列表,如果有失败的话,返回第一个出现的异常:

scala> concurrent.Future sequence (nextFtr(1) :: nextFtr(2) :: nextFtr(3) :: Nil) onSuccess {
     |   case x => println(s"value is: $x")
     | }

scala> value is: List(2, 3, 4)

scala> concurrent.Future sequence (nextFtr(1) :: nextFtr(-2) :: nextFtr(3) :: Nil) onFailure {
     |   case e => println(e.getMessage)
     | }

Err for Example

同步处理Future

concurrent.Await.result()限制在指定时间内完成任务,如果超时抛 java.util.concurrent.TimeoutException异常, 需要用util.Try来安全地管理超时条件。

例,限时内完成:

scala> concurrent.Await.result(nextFtr(1), Duration(10, SECONDS))
res36: Int = 2

例,超时:

scala> concurrent.Await.result(nextFtr(1), Duration(1, SECONDS))
java.util.concurrent.TimeoutException: Futures timed out after [1 second]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:190)
  ... 33 elided

例,限时内异常:

scala> concurrent.Await.result(nextFtr(-1), Duration(10, SECONDS))
java.lang.Exception: Err for Example
  at $anonfun$nextFtr$1.apply$mcI$sp(<console>:11)
  at $anonfun$nextFtr$1.apply(<console>:9)
  at $anonfun$nextFtr$1.apply(<console>:9)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

如果不想让异常抛出,则要另外用util.Try来包装。

actor和消息传递

actor是一个类似线程的实体,它有一个接收消息的邮箱。

实现Actor特质

Scala里实现actor的方法是混入scala.actors.Actor特质并实现其act()方法。 下面一个简单的例子都不检查它的邮箱,只是把一个消息打印五次然后退出:

  import scala.actors._

  object SillyActor extends Actor {
    def act() { 
      for (i <- 1 to 5) {
        println("I'm acting!")
        Thread.sleep(1000)
      }
    }
  }

注意:如果没有继承任何类,可以用关键字extends来混入特质。

start()方法启动

启动方法和Java线程类似,也是调用start()方法:

  scala> SillyActor.start()
  I'm acting!
  res4: scala.actors.Actor = SillyActor$@1945696

  scala> I'm acting!
  I'm acting!
  I'm acting!
  I'm acting!

注意输出的文本与Scala交互程序的回显交错在一起了,因为这是在另一个独立的线程里。 所以运行是独立的。再来看第二版的actor:

  import scala.actors._

  object SeriousActor extends Actor {
    def act() { 
      for (i <- 1 to 5) {
        println("To be or not to be.")
        Thread.sleep(1000)
      }
    }
  }

这里同时运行两个actor:

  scala> SillyActor.start(); SeriousActor.start()
  res3: scala.actors.Actor = seriousActor$@1689405

  scala> To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!

Actor.actor()方法

还可以用伴生对象scala.actor.Actoractor()方法来创建actor,它接收一个函数值/ 闭包做参数:

  scala> import scala.actors.Actor._

  scala> val seriousActor2 = actor {
       |    for (i <- 1 to 5)
       |      println("That is the question.")
       |    Thread.sleep(1000)
       | }

  scala> That is the question.
  That is the question.
  That is the question.
  That is the question.
  That is the question.

注意上面用actor()方法创建的actor在创建后是立即启动的。

!()方法发送消息

actor之间的协作方式是通过!()方法来发送消息,发送操作是并发非阻塞的:

  scala> SillyActor ! "hi there"

虽然消息"hi there"被发给了SillyActor,但消息只是放在邮箱里保持未读状态。

receive()方法接收消息

receive()方法从消息队列中取出一条消息来处理。它可以接收一个闭包作为参数。如 下面这样只是把消息打印出来:

  val echoActor = actor {
    while (true) {
      receive {
        case msg =>
          println("received message: "+ msg)
      }
    }
  }

注意这里的使用的偏函数字面量,除了没有match关键字外,和match表达式样子很像。

当actor发消息时并不会阻塞,收到消息时处理的过程是阻塞的,不会被打断。发送的消息 在接收的actor的邮箱里等待被处理,直到actor调用receive方法:

  scala> echoActor ! "hi there"
  received message: hi there

  scala> echoActor ! 15

  scala> received message: 15

这里要解释一下receive以偏函数字面量作为参数。除了没有match关键字外,和match 表达式样子很像,而且不像match表达式一样要匹配所有的可能性,的会直接忽略掉没有 匹配的情况(即偏函数没有定义)。

正如「模式匹配」一章中「模式无处不在」这一节中所说的,偏函数(可以表示为特质 PartialFunction[Any, T]的实例。其中Tcase =>语句右边表达式计算的结果类型 )并不是完整的函数。所以它不对所有输入值有都定义。除了接受单个apply方法 外,偏函数还提供一个isDefineAt方法,同样只接收单个参数。如果偏函数能处理传给 isDefineAt函数的参数,那么isDefineAt返回true。把这个消息传递给偏函数的 apply方法,apply方法就会处理这个消息。

例如echoActorapply方法会打印"received message: ",然后再输出消息对象的 toString结果。如果邮箱里没有让isDefineAt返回true的消息,则被调用receive 的actor将会被阻塞,直到有匹配的消息。

举例来说,以下是只处理Int类型消息的actor:

  scala> val intActor = actor {
       |   receive {
       |     case x: Int => // I only want Ints
       |       println("Got an Int: "+ x)
       |   }
       | }
  intActor: scala.actors.Actor = 
    scala.actors.Actor$$anon$1@34ba6b

如果是StringDouble类型的消息会被忽略:

  scala> intActor ! "hello"
  scala> intActor ! Math.Pi

给个Int就会有响应:

  scala> intActor ! 12
  Got an Int: 12
处理掉不匹配的消息

注意:如果邮箱可能会被不能匹配case条件的消息挤满,可以用一个case _匹配所有 情况来处理掉那些消息。

处理消息是串行化的

因为消息处理是串行化的,所以不会有并发的竞争状况。比如下面的balance是安全的, 不会出现同时加减:

class AccountActor extends Actor {
	private var balance = 0.0
	
	def act() {
		while (true) {
			receive {
				case Deposit(account)  => balance += amount
				case Withdraw(account) => balance -= amount
			}
		}
	}

}

但是这只能保证同一个Actor对象中的成员安全。如果多个Actor共用的话,必然会有竞争 状况。

所以Actor 绝对不应该 访问自身外部的状态对象,虽然Scala并没有强制。

actor未启动也会收到消息

如果actor还没有启动,收到的消息也会放在队列里,不是不会马上处理。在start()方法 被调用后也会开始处理:

myAct ! "before start 1" 
myAct ! "before start 2" 
myAct ! "before start 3" 
myAct ! "before start 4" 

myAct.start()

myAct ! "after start 1" 
myAct ! "after start 2" 
myAct ! "after start 3" 
myAct ! "after start 4" 

把原生线程当作actor

Actor子系统会管理线程,所以定义了actor以后开发人员不用操心actor与线程的对应关系 是怎么样的。

Java里可以用Thread.current得到当前线程的引用,类似地,Scala里Actor对象的 Actor.self()方法可以取得用当前线程生成的Actor的引用。这样在交互环境可以少写 很多代码,如:

scala> import scala.actors.Actor
import scala.actors.Actor

scala> Actor.self ! "hello"

scala> Actor.self.receive { case x => x }
res2: Any = hello

也可以直接引入Actor._,这样可以把Actor.self简写为self

  scala> import scala.actors.Actor._
  import scala.actors.Actor._

  scala> self ! "hello"

  scala> self.receive { case x => x }
  res6: Any = hello

上面的receive()方法返回由传递给它的偏函数计算出的值。在本例中,偏函数返回消息 本身,所以消息被交互环境打印了出来。

receiveWithin()

因为receive()是阻塞的,所以不知道要等到啥时候。所以如果使用了这项技巧,最好是 用receive()的变种receiveWithin()。因为它可以指定一个以毫秒计的超时时限:

  scala> self.receiveWithin(1000) { case x => x } // wait a sec!
  res7: Any = TIMEOUT

可以用case TIMEOUT来匹配超时的情况,这里指的是Actor.TIMEOUT

future

通过!!()方法表示接收一个Future对象,它表示一个未来会得到的结果,但现在先不 计算它。类似于惰性求值:

val replyFuture = account !! Deposit(1000)

上面的代码因为不求出结果,所以可以看作是不阻塞的。在取出值时阻塞:

val reply = replyFuture()

简单的并行计算例子

要计算从1到n的所有整数累加,可以分成多段并行执行。下面是处理每一段的方法:

import scala.actors._
import scala.actors.Actor._

def sumPart(floor:Int, top:Int) = {
	(0 /: (floor to top)) {
		(sum, i) => { sum + i }
	}
}

把整个区间分成多个段并行执行:

def sumAll(num:Int) = {
	val RANGE = 10
	val partCount = (num.toDouble / RANGE).ceil.toInt
	val caller = self  // this is Actor.self()

	for (i <- 0 until partCount) {
		val floor = i * RANGE + 1;
		val top = num min (i + 1) * RANGE
		actor { caller ! sumPart(floor, top) }  // Actor.actor() create new Actor
	}

	(0 /: (0 until partCount)) {
		(allSum, i) => receive {    // this is Actor.receive()
			case sumInRange:Int => { allSum + sumInRange }
		}
	}
}

能够实现并行操作的关键在于:actor发送消息是不阻塞的,打个比方就是每个发送操作都 在新建的线程中执行。结合上面代码的第9行是在发送的操作中调用的计算局部的方法, 所以每个局部计算的工作都是在新的线程中进行的。

调用:

println( sumAll(100) )

小结

消息传递

每个actor都有自己的消息队列:

  • InputChannel[Any]接收队列。
  • OutputChannel[Any]发送队列。

注意actor传递消息有两个重要的特性:

发送不阻塞

对于以下的代码来说:

op1();
myActor ! "msg"
op2();

即使中间一步发消息的动作如果要花很长时间,感觉也像是op1()执行以后接着就是 op2()的执行,好像发消息的动作不占用时间一样。

接收消息不中断

如果不用receive方法去检查接收队列,actor根本不知道有没有消息进来,所以对于以下 的代码:

op1();
op2();
receive { case msg => println(msg) }

op1()op2()执行过程中如果外部有发消息给当前actor,由于还没有调用receive 方法,actor根本就不知道有消息进来,所以op1()op2()的执行不会被打断。

接收消息阻塞

如果在调用receive方法时消息队列里没有消息,那么receive方法会阻塞。

使用同步的方式传递消息

虽然异步方式能利用并发,但也有需要用同步的情况存在。这时可以用!?()方法,注意 这样会导致阻塞,所以推荐用带超时参数的版本:!?(timeout, msg)。它在正常时返回 Option[Any];超时时返回None,但actor还是会收到这条消息,并把它发给自己。可以 通过一个特殊的case类![a](val ch:Channel[a], val msg:a)表示actor发给自己的消息 ,这样就能得到超时丢失的消息。

sender引用的总是最近一个发送消息的actor,用reply方法也可以说明把消息发送给 sender

如果发送消息给一个actor以后不希望它把响应发回给自己,而是希望它发给另外一个actor ,就可以用send()方法。

import scala.actors._
import Actor._

val fortuneTeller = actor {
    for (i <- 1 to 4) {
        Thread.sleep(1000)
        receive {
            case _ => sender ! "your day will rock! " + i  // reply to sender
            // case _ => reply("your day will rock! " + i) // same as above
        }
    }
}

println( fortuneTeller !? (2000, "what's ahead") ) // async send with timeout
println( fortuneTeller !? (200,  "what's ahead") )

val aPrinter = actor {
    receive { case msg => println("Ah, Fortune message for you: " + msg) }
}

fortuneTeller.send("what's up", aPrinter)  // use send() to send another actor
fortuneTeller ! "How's my future?"

Thread.sleep(3000)
receive { case msg : String => println("received " + msg) }

println("let's get that message")
receive { case !(channel, msg) => println("received belated message " + msg) }

上面的代码最后一行拿到的是超时丢弃的消息。运行时输出如下:

└─(1:pts/4:%)=> scala tmp.scala
Some(your day will rock! 1)
None
Ah, Fortune message for you: your day will rock! 3
received your day will rock! 4
let's get that message
received belated message your day will rock! 2

通过重用线程取得更好的性能

Acter建立在普通的Java线程上,用之前的方法每actor的receive方法个都占一个线程。 就是说,在没有消息的时候,阻塞着的receive方法也会一直占一个线程。为了重用线程, Scala提供了react方法从线程池里取已经有的线程(还有一个带超时的reactWithin) 。它处理消息以后不会返回,返回类型是Nothing。其实在后台,react在完成后会抛出 一个异常。

react方法执行完毕以后就会放弃对线程的占用,所以在react方法后面的代码是执行 不到的。如果还有其他代码要执行,就一定要放在react方法内。

下面同时用了receivereact两个方法比较:

import scala.actors.Actor._

def info(msg: String) = println(msg + " received by " + Thread.currentThread())

def receiveMessage(id : Int) {
  for(i <- 1 to 2) {
      receiveWithin(20000) { 
        case msg : String => info("receive: " + id + msg) }
  }
}

def reactMessage(id : Int) {              
  react {
    case msg : String => info("react:   " + id + msg)
    reactMessage(id)
  }
}

val actors = Array(
  actor { info("react:   1 actor created"); reactMessage(1) },
  actor { info("react:   2 actor created"); reactMessage(2) },
  actor { info("receive: 3 actor created"); receiveMessage(3) },
  actor { info("receive: 4 actor created"); receiveMessage(4) }
  ) 
  
Thread.sleep(1000)               
for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
Thread.sleep(2000)
for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }

调用:

create actor by Thread[ForkJoinPool-1-worker-3,5,main]
create actor by Thread[ForkJoinPool-1-worker-3,5,main]
create actor by Thread[ForkJoinPool-1-worker-1,5,main]
create actor by Thread[ForkJoinPool-1-worker-5,5,main]
  react: 1hello by Thread[ForkJoinPool-1-worker-7,5,main]
  react: 2hello by Thread[ForkJoinPool-1-worker-5,5,main]
receive: 3hello by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4hello by Thread[ForkJoinPool-1-worker-3,5,main]
  react: 1hello by Thread[ForkJoinPool-1-worker-5,5,main]
  react: 2hello by Thread[ForkJoinPool-1-worker-5,5,main]
receive: 3hello by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4hello by Thread[ForkJoinPool-1-worker-3,5,main]

从输出可以看到,同一个actor的receiveWithin方法总是在同一个线程。如:

  • actor3总是在线程Thread[ForkJoinPool-1-worker-1,5,main]中;
  • actor4总是在同一个线程Thread[ForkJoinPool-1-worker-3,5,main]
  • react()方法就不固定了,可能会多个actor共用一个线程,也可能会一个actor用多个 线程。

因为react不返回所以接收消息处理器必须同时处理消息并执行actor所有剩下的工作。 通常做法是用一个顶级的工作方法,比如act自身,以供处理器在完成时调用。下面的 例子显示了这种方式。等待内容为主机名的字符串,如果有的话,返回那个主机名对应的 IP地址:

  object NameResolver extends Actor {
    import java.net.{InetAddress, UnknownHostException}

    def act() { 
      react {
        case (name: String, actor: Actor) =>
          actor ! getIp(name)
          act()                         // 尾递归再次调用
        case "EXIT" =>
          println("Name resolver exiting.")
          // quit
        case msg =>
          println("Unhandled message: "+ msg)
          act()                         // 尾递归再次调用
      }
    }

    def getIp(name: String): Option[InetAddress] = {
      try {
        Some(InetAddress.getByName(name))
      } catch {
        case _:UnknownHostException => None
      }
    }
  }

调用方式:

  scala> NameResolver.start()
  res0: scala.actors.Actor = NameResolver$@90d6c5

  scala> NameResolver ! ("www.scala-lang.org", self)

  scala> self.receiveWithin(0) { case x => x }
  res2: Any = Some(www.scala-lang.org/128.178.154.102)

  scala> NameResolver ! ("wwwwww.scala-lang.org", self)

  scala> self.receiveWithin(0) { case x => x }
  res4: Any = None

因为不返回所以当前线程的调用栈可以被再次使用,所以如果全用react的话只要一个 线程就可以运行所有的actor。实例应用中应该用少用几个receive,尽量多用react

react工作原理

返回类型Nothing表示永远不会正常返回的函数,相反,总是以一个异常方式完成。

因为react会因为异常退出,所以不能把它放在while循环中。如:

def act() {
	while (true) {
		react {
			case Withdraw(amount) => println("Withdrawing " + amount)
		}
	}
}

因为react()方法的参数是一个没有返回对象的偏函数:

{ case Withdraw(amount) => println("Withdrawing " + amount) }

所以它不能返回到循环当中去,所以要实现循环的方法之一是在消息处理器中再次调用 act()方法:

def act() {
	react {
		case Withdraw(amount) => {
			println("Withdrawing " + amount)
			act()
		}
	}
}

这样的作法相当于用一个无穷的递归替换掉无穷的循环。但由于每次react()调用都会 抛出异常,所以栈被清空,不会占用太大的栈空间。

虽然不同版本中实现方式还在不断调整,但可以认为react是这样工作的:

actor的start方法被调用时,start方法会以某种方式确保最终会有某个线程来调用 actor的act方法。如果act调用了react,则react方法会在actor邮箱里查找偏函数 能够处理的消息。如果找到react会安排在未来某个时间处理该消息并抛出异常。如果 没有act还是处于「冷存储」状态等待邮箱里有新消息时重新激活并抛出异常。无论哪种情况 react都会以这个异常的方式完成其执行,act方法也随之结束。调用act的线程会 捕获这个异常,并忘记这个actor,转而处理其他任务。

这就是为什么想要react在处理第一个消息之外做更多的事,就需要在偏函数中再次调用 act方法,或使用其他手段让react再次被调用。

reactWithin

reactWithin在超时没有收到消息时会匹配case TIMEOUT。下面有一个累加器的实现:

import scala.actors._
import Actor._ 

val caller = self

def accumulate() {
  var sum = 0
  reactWithin(500) {
    case number: Int => sum += number
    accumulate()
    case TIMEOUT => 
      println("Timed out! Will send result now")
      caller ! sum
  }
  println("This will not be called...")
}

val accumulator = actor { accumulate() }
accumulator ! 1
accumulator ! 7
accumulator ! 8

receiveWithin(10000) { case result => println("Total is " + result) }

结果是错误的,没有累加输出:

Timed out! Will send result now
Total is 0

问题在于再次调用accumulate方法时对sum产生了新的闭包,可以把sum放到外面作为 全局变量,但这样不够函数式。更加函数式的修改方案是把每次的结果作为参数:

import scala.actors._
import Actor._ 

val caller = self

def accumulate(sum : Int) {
  reactWithin(500) {
    case number: Int => accumulate(sum + number)
    case TIMEOUT => 
      println("Timed out! Will send result now")
      caller ! sum
  }
  println("This will not be called...")
}

val accumulator = actor { accumulate(0) }
accumulator ! 1
accumulator ! 7
accumulator ! 8

receiveWithin(10000) { case result => println("Total is " + result) }

这次得到了正确的结果:

Timed out! Will send result now
Total is 16

loop与loopWhile

由于通过react重用线程的方案非常普遍,所以actor库对此有特别的支持。actor.loop 函数重复执行一个代码块,哪怕代码调用的是react。下面用loop来重写 NameResolveract方法。下面的例子与前一个例子不同的地方是它并不退出来响应 "EXIT"消息,而是一直循环响应消息:

  def act() { 
    loop {
      react {
        case (name: String, actor: Actor) =>
          actor ! getIp(name)
        case msg =>
          println("Unhandled message: " + msg)
      }
    }
  }

还有带循环条件的版本loopWhile(...)

loopWhile(count < max) {
	react {
		...
	}
}

还有eventloop()方法可以制作聘个无穷循套环react()的简化版本,不过前提是偏函数 不会再次调用react

def act() {
	eventloop {
		case Withdraw(amount) => println("Withdrawing " + amount)
	}
}




控制线程执行

Scala会让单例对象Scheduler去运行actor,通过设置Scheduler.impl的各种实现, 可以整个应用的actor的调度策略。

通过下面的例子SingleThreadedScheduler可以控制Scala在主线程里运行actor:

import scala.Actor._
import scala.actors._
import scheduler._

if (args.length > 0 && args(0) == "Single") {
  println("Command-line argument Single found")
  Scheduler.impl = new SingleThreadedScheduler
}
println("Main running in " + Thread.currentThread)

actor { println("Actor1 running in " + Thread.currentThread) }
actor { println("Actor2 running in " + Thread.currentThread) }

receiveWithin(3000) { case _ => }

如果没有参数,两个actor和主线程会分别执行:

$ scala InMainThread.scala
Main running in Thread[main,5,main]
Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main]
Actor2 running in Thread[ForkJoinPool-1-worker-5,5,main]

如果加上参数,会在同一线程中执行:

$ scala InMainThread.scala Single
Command-line argument Single found
Main running in Thread[main,5,main]
Actor1 running in Thread[main,5,main]
Actor2 running in Thread[main,5,main]

上面是例子配置的两个应用的线程调度。其实通过继承Actor特质时改写scheduler() 方法还可以更加细粒度地控制具体actor是在主线程,一部分在新线程中。

import scala.Actor._
import scala.actors._
import scheduler._

trait SingleThreadedActor extends Actor {
  override def scheduler() = new SingleThreadedScheduler
}

class MyActor1 extends Actor {
  def act() = println("Actor1 running in " + Thread.currentThread)
}
                      
class MyActor2 extends SingleThreadedActor {                     
  def act() = println("Actor2 running in " + Thread.currentThread)
}                                                      

println("Main running in " + Thread.currentThread)
new MyActor1().start()
new MyActor2().start()
actor { println("Actor3 running in " + Thread.currentThread) }

receiveWithin(5000) { case _ => }

可以看到主线程与fork出的线程:

$ scala InMainThreadSelective.scala
Main running in Thread[main,5,main]
Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main]
Actor2 running in Thread[main,5,main]
Actor 3 running in Thread[ForkJoinPool-2-worker-7,5,main]

消息通道

除了共享Actor的引用,还可以通过共享消息通道的方式。消息通道有以下优点:

  • 消息通道是类型安全的,只能发送或接受某个特定类型的消息。
  • 防止调用到某个Actor的方法。

消息通道可以是一个OutputChannel特质(带有!方法)或是一个InputChannel特技(带有 receivereact方法)。也可以有Channel特质同时扩展这两个特质。

消息通道的构造函数需要一个Actor作为参数:

val channel - new Channel[Int](someActor)

如果不提供参数,那就是绑定到当前执行的Actor上。

通常,会指定Actor发送消息到一个输出消息通道:

import scala.actors._
import scala.actors.Actor._

case class Compute(data: Seq[Int], result: Channel[Int])

class PrimeCounter extends Actor {
  def isPrime(n: Int) = new java.math.BigInteger("" + n).isProbablePrime(20)

  def act() {
    while (true) {
      receive {
        case Compute(data, continuation) => {
          val answer = data.count(isPrime)
          continuation ! answer // Send the answer to the channel
        }
      }
    }
  }

}

object Main extends App {
  val max = 100000 
  // try with 1000000 
  val nactors = 10
  val groupSize = max / nactors / 10

  val primeCounters = for (i <- 0 until nactors) yield new PrimeCounter
  for (a <- primeCounters) a.start()

  actor {
    val c = new Channel[Int] // A channel to this actor
    
    val groups = (2 to max).grouped(groupSize).zipWithIndex.toSeq
    var count = groups.length
    var total = 0

    for ((g, i) <- groups)  {
      val a = primeCounters(i % primeCounters.length)
      a ! Compute(g, c) // a is an actor, not a channel
    }

    while (count > 0) {
      c.receive { 
        case t => { // The channel receives an Int
          total += t
          count -= 1
        }
      }
    }
    println(total + " primes")
    System.exit(0)
  }
}

注意这里调用的是消息通道的receive()方法而不是Actor自己的。如果要通过Actor来 接收,可以匹配一个!样本类:

receive {
	case !(channel, x) => ...
}

Actor的生命周期

Actor的act()方法在start()方法被调用时开始执行。然后通常情况下都是进入某个 循环:

def act() {
	while (...) {
		receive {
			...
		}
	}
}

会让Actor终止的情况:

  1. act()方法返回。
  2. act()方法因异常终止。
  3. actor调用exit()方法。

exit()方法停止actor

exit()方法是一个受保护的方法,只能被Actor的子类调用。exit()方法会抛出异常 ,以试图停止当前线程的方式停止actor。它还有一个版本接收一个参数作为退出原因:

import scala.actors._
import Actor._

class MyAct extends Actor {
    def act() {
        while (true) {
            receive {
                case "quit" => exit
                case msg    => println(msg)
            }
        }
    }
}

val myAct = new MyAct

myAct.start()

myAct ! "Hello World!" 
myAct ! "nice Day!" 
myAct ! "quit" 

如果不带原因,那就相当于exit('normal)

因为异常停止

如果因为异常终止,退出原因就是UncaughtException样本类的一个实例。它有以下属性 :

  • actor:抛出异常的Actor。
  • message:Some(msg),其中的msg就是该Actor处理的最后一条消息;或为None, 表示Actor在没有来得及处理任何消息时就挂掉了。
  • sender:Some(channel),最后一条消息发送方的输出通道;为None表示Actor在没有 来得及处理任何消息时就挂掉了。
  • thread:退出时所在的线程。
  • cause:相应的异常。

默认情况下所有末处理的异常都会造成一个UncaughtException原因的退出。必要时可以 重写exceptionHandler()方法来改变这个行为,它应该产出一个偏函数,类型为:

PartialFunction[Exception, Unit]

这样如果这个偏函数有效就被调用,Actor以'normal原因退出。

例如在需要不把非受检异常当作非正常的话,可以提供以下处理器:

override def exceptionHandler = {
	case e: RuntimeException => log(e)
}

链接多个Actor

通过link()方法链接多个Actor可以在一个Actor终止得通知另一个:

def act() {
	link(master)
	...

链接是双向的,一个终止会通知另外一个。但是并不是对称的,比如上面的:

link(master)

并不能用:

master.link(self)

来代替。link()方法必须由请求链接的Actor调用。

默认情况下,只要链接到的Actor中有一个以非'normal原因退出,当前Actor就会终止。 退出的原因和链接到的那个Actor一样。

要改变这一行为的方法是设置trapExittrue。这样Acotr会收到一个类型为Exit的 消息,包含了终止的Actor和退出的原因:

override def act() {
	trapExit = true
	link(worker)
	while(...) {
		recieve {
			...
			case Exit(linked, UncaughtException(_, _, _, _, cause)) => ...
			case Exit(linked, reason) => ...
		}
	}
}

让Actor挂掉是很正常的,但要连接到一个监管它的Actor来处理失败的处理方案,如:重新 分配工作、或是重启。

在大型的系统中,可以给Actor分成多个区域,各区域安排一个监管的actor。

对于已经终止的Actor,它们依然保留内部状态与邮箱。可以用它的restart()方法重新 启动。在这样做之前,可能会想要修复内部状态或设置一个标志来表示这个Actor正在以 修复模式运行。另外还需要重新建立链接,因为终止时连接也被移除了。

良好的actor风格

actor不应该阻塞

当一个阻塞时,另一个actor可以会发出别的请求。多个actor等一个阻塞的actor会带来 死锁。actor应该允许某种表示动作可以执行的消息发送给它。通常这样要安排其他的actor 帮助。例如通过调用Thread.sleep代替并阻塞当前的actor,可以创建一个助手actor, 这个actor睡眠并在一定时间后发加一个消息:

  actor {
    Thread.sleep(time)
    mainActor ! "WAKEUP"
  } 

这个助手actor虽然阻塞了,但是由于它不会收到消息,所以这是OK的。主actor可以继续 响应新的请求。

下面的emoteLater方法展示了这种处理方式的用法。它创建了一个新的actor来执行 sleep以便主actor不阻塞。以确保它向正确的actor发送"Emote"消息,我们必须小心地 在主actor中对self求值而不是在助手actor中:

  val sillyActor2 = actor {
    def emoteLater() {
      val mainActor = self
      actor {
        Thread.sleep(1000)
        mainActor ! "Emote"
      }
    }
 
    var emoted = 0
    emoteLater()

    loop {
      react {
        case "Emote" =>
          println("I'm acting!")
          emoted += 1
          if (emoted < 5)
            emoteLater()
        case msg =>
          println("Received: "+ msg)
      }
    }
  }

由于这个actor并不在sleep方法中阻塞,而是助手actor阻塞,所以主actor可以在等待 下次表演前继续做其他的事情。与早先的SillyActor不同,这个actor会在等待下一个 输出的同时继续打印消息:

  scala> sillyActor2 ! "hi there"
  scala> Received: hi there
  I'm acting!
  I'm acting!
  I'm acting!

只通过消息与actor通信

Scala可以混用actor模式与共享数据/锁模式两种方式。但第二种方式有锁死线程和线程 不安全变量的风险。

不要调用Actor的方法

如果调用了Actor的方法那就等于引入所有传统并发的同步问题。

优选不可变消息

虽然actor模式对于线程来说数据是不共享的,但还是有共享数据的情况:就是消息会在 多个actor中传递,如果传递过程中被改变的话是被谁改的都不知道。

而且下面会说到,有些消息会有一个指向自己的引用。所以尽量使用不可变的消息。

让消息自包含

因为actor是非阻塞的,发出一个请求不等响应就做别的事情的。等响应回来都不知道是 响应哪个请求的。所以一个简单的方案是以发出的消息里包含与请求相关的冗余信息。

如果请求是一个不可变的对象,花费代价很少的一个方案是在返回值中包含一个指向请求 自己的引用。这样就知道响应是对应哪个请求的了。

举例,IP查询功能的actor在返回IP时带一个当时请求的主机名该有多好:

 def act() { 
   loop {
     react {
       case (name: String, actor: Actor) =>
         actor ! (name, getIp(name))
     }
   }
 }

另一个增加冗余的方案是为每类消息制作样本类。比如当字符串看起来意义不是很明确:

  lookerUpper ! ("www.scala-lang.org", self)

但有了类型名字看起来就好理解多了:

  case class LookupIP(hostname: String, requester: Actor)

  lookerUpper ! LookupIP("www.scala-lang.org", self)

下面是使用样本类而不是简单的元组来查询IP的程序:

  import scala.actors.Actor._
  import java.net.{InetAddress, UnknownHostException}

  case class LookupIP(name: String, respondTo: Actor)
  case class LookupResult(
    name: String, 
    address: Option[InetAddress]
  )

  object NameResolver2 extends Actor {

    def act() { 
      loop {
        react {
          case LookupIP(name, actor) =>
            actor ! LookupResult(name, getIp(name))
        }
      }
    }

    def getIp(name: String): Option[InetAddress] = {
      // As before (in Listing 30.3)
    }
  }

最小化回复

Actor的目的不是远程调用,任务应该被分发到一个Actor组成的网网络。每个Actor算出 部分答案,发给一个知道如何合并出全部答案的Actor。

监管Actor的终止

Actor会终止是正常的。在大型的系统中,可以给Actor分成多个区域,各区域安排一个监管 的actor。

更加长的例子:并行离散事件模拟

把「有状态对象」一章的电路模拟强化成并行化的。

总体设计

核心思路是把每个模拟的对象都用actor,状态位于actor内部。这样实现并行性。

在类层级上,因为不同的模拟对象可能会有一些共通行为,所以抽象一个Simulant特质 出来。线路、门等都可以加上这个特质:

  trait Simulant extends Actor
  class Wire extends Simulant

时间同步问题:每个参与者都自己的任务列表顺序而不看任务的时间处理任务的话,可能 参与者A正在处理时间点90秒的任务;参与者B已经在处理时间点100的任务。A的任务要与 B的任务协作时就有问题。所以要同步,任何参与者都不应该当其他参与者完成时间位于 n-1的任务前,处理时间位于n的任务。

专门有一个actor来控制时钟,给模拟对象发出ping消息,模拟对象在准备好让钟走到下 一个时间单位时发出pong消息给时钟:

  case class Ping(time: Int)
  case class Pong(time: Int, from: Actor)

这两个消息里可以没有字段,但这里的冗余是有好处的。ping里的time是为了回写到 pongtime里,知道对应成功的是哪个时间。

现在的原则是每个模拟对象在处理完了要完成的任务以后才会响应ping消息。但问题是 任务也可能是别的模拟对象要发给它的,所以当前任务做完了并不一定是真的做完了, 可能是其他模拟对象还没有把任务发过来。

为了简化这个问题,要再增加两个限制。首先,要假设模拟对象之间不直接发送消息,而 只是相互安排事件日程。其次,它们不向当前的时间点n提交事件,而是给n+1以上的 时间提交事件。虽然有点限制,但对于模拟的事件来说,有点时延还是可以接受的。

还有一个方案:模拟对象可以相互发消息,但是要精心设计一个机制来决定一个actor何时 可以安全地送回一个Pong。每个模拟对象都应该延迟响应Ping消息直到它发出的所有 请求都已经完成了处理。所以传递的消息要加一些额外的内容。

现在假定模拟对象之间不发直接消息,只发送事件日程表。这样还要有一个工作项的日程表 这个日程表也可能由时钟actor所持有。这样时钟actor就可以等到当前时间点所有工作项的 请求都发送完成后再发出Ping消息。Actor们知道收到Ping时就表示当前时间点所有的 工作项都已经拿到了,可以马上送回Pong,因为不会有更多的工作发过来了。时钟由类 Clock实现:

  class Clock extends Actor {
    private var running = false
    private var currentTime = 0
    private var agenda: List[WorkItem] = List()
  }

最后要设计如何设置好一个初始模拟场景。一种自然的方式是在时钟停止的状态下创建模拟 ,添加所有的模拟对象再连接在一起,然后启动时钟。注意一定要绝对确认所有的东西都 连接好了再启动时钟。

所以关键在于如何确定都已经连接好了,方案有很多。这里用最简单的方法是在设置过程中 不向actor发消息,这样当最后一个消息调用返回就知道模拟被完整地构建好了。最终的 编码模式是用常规的方法调用将模拟设置好,然后可以在模拟运行时使用actor发消息。

基本设计完成其他的就比较直接。WorkItem还是像「有状态对象」里实现的那样有一个时间 和一个动作成员,区别在动作模拟,老版本是用无参函数,在这里的并行版本中使用目标 actor和发往该actor的消息:

  case class WorkItem(time: Int, msg: Any, target: Actor)

afterDelay方法变成了发往时钟的AfterDelay消息。无参函数被替换成了一个消息和 一个目标actor:

  case class AfterDelay(delay: Int, msg: Any, target: Actor)

定义要求模拟启动和停止的消息:

  case object Start
  case object Stop

总体设计就是这样,时钟Clock有当前时间和日程表,它只会在Ping了所有模拟对象并 确保它们准备好了以后才往前进。Simulant特质来模拟参与者,它们所代表的参与者们 通过向时钟发送工作项增加到日程表的方式来和其他参与者们通信。

实现模拟架构

Clock类的running初始为false。模拟初始化全都完成以后,会发Start消息给它, 才会变成true。保证初始化完成以后才会运行:

  class Clock extends Actor {
    private var running = false
    private var currentTime = 0
    private var agenda: List[WorkItem] = List()
    private var allSimulants: List[Actor] = List()
    private var busySimulants: Set[Actor] = Set.empty

时钟可以在创建后后马上调用start()方法,因为running还是false。所以不会真的 运行:

  start()

allSimulants是所有模拟对象,不会变所以用列表;busySimulants是正有任务的,会 不断添加删除所以用集。一旦模拟器开始运行,它只有当busySimulants为空时才会进入 下一个时间单位,并把allSimulants的成员全放到busySimulants里:

  def add(sim: Simulant) {
    allSimulants = sim :: allSimulants
  }

以上是时钟状态的描述,再来看时钟活动。它的主循环由两个职责交替:让时钟前进、响应 消息。一旦时间 前进,它只能在至少收到一个消息时才会再次前进,所以把主循环定义为 这两个活动交替是安全的:

  def act() {
    loop {
      if (running && busySimulants.isEmpty)
        advance()

      reactToOneMessage()
    }
  }

时间的推进除了简单地currentTime步进之外还有些额外的工作:

首先,如果日程表是空的而且模拟不是刚刚开始,那么模拟需要退出。

其次,如果日程表非空,则当前时间点所有的工作需要现在进行。

再次,所有的模拟对象要放到busySimulant中,并发Ping消息给它们。等它们全响应 之后才能再次推进时间。

  def advance() {
    if (agenda.isEmpty && currentTime > 0) {
      println("** Agenda empty.  Clock exiting at time "+
              currentTime+".")
      self ! Stop
      return
    }
      
    currentTime += 1
    println("Advancing to time "+currentTime)

    processCurrentEvents()
    for (sim <- allSimulants)
      sim ! Ping(currentTime)
      
    busySimulants = Set.empty ++ allSimulants
  }

处理当前事件只示过是简单地处理所有在日程表最上方时间为currentTime的事件:

  private def processCurrentEvents() {
    val todoNow = agenda.takeWhile(_.time <= currentTime)

    agenda = agenda.drop(todoNow.length)

    for (WorkItem(time, msg, target) <- todoNow) {
      assert(time == currentTime)
      target ! msg
    }
  }

上面的方法有三步:首先,takeWile取出所有当前时间的条目存入todoNow。然后,用 drop从日程表中去掉这些。最后遍历这些条目并向它们的发送目标消息。assert是为了 确保排程器的逻辑没有问题。

有了这些基础工作,处理时间可能收到的消息就会很简单。AfterDelay消息把新的条目 添加到工作队列;Pong消息从忙碌列表中去掉一个;Start让模拟开始;Stop让时钟 停止:

  def reactToOneMessage() {
    react {
      case AfterDelay(delay, msg, target) =>
        val item = WorkItem(currentTime + delay, msg, target)
        agenda = insert(agenda, item)

      case Pong(time, sim) =>
        assert(time == currentTime)
        assert(busySimulants contains sim)
        busySimulants -= sim
          
      case Start => running = true

      case Stop =>
        for (sim <- allSimulants)
          sim ! Stop
        exit()
    }
  }

insert方法和老版本的一样,就不列出来了。它把条目加入日程表并保证顺序正确。

时钟类到这里实现完毕,现在来看模拟特质。本质上说一个Simulant可以是理解模拟消息 StopPing并与它们合作的任何actor。其act方法简单这样实现:

  def act() {
    loop {
      react {
        case Stop => exit()
        case Ping(time) =>
          if (time == 1) simStarting()
          clock ! Pong(time, self)
        case msg => handleSimMessage(msg)
      }
    }
  }

每当收到Stop,它就退出;收到Ping就响应一个Pong;如果Ping的时间为1,则 在返回Pong前调用simStarting让子类可以定义当模拟开始运行时应该发生的行为。 其他的所有消息都由子类来翻译,给抽象方法handleSimMessage方法。

还有一个抽象成员clock记录时钟以回复Ping消息并安排新的工作项。由于在时钟发 消息来以前不会做任何事情,所以可以在对象创建以后马上运行:

  trait Simulant extends Actor {
    val clock: Clock
    def handleSimMessage(msg: Any)
    def simStarting() { }
    def act() {
      loop {
        react {
          case Stop => exit()
          case Ping(time) =>
            if (time == 1) simStarting()
            clock ! Pong(time, self)
          case msg => handleSimMessage(msg)
        }
      }
    }
    start()
  }

实现一个电路模拟

电路类Circuit

  class Circuit {
    val clock = new Clock
    // 模拟消息
    // 延时常量
    // Wire 类和 Gate 类的方法
    // 其他工具方法
  }

模拟消息这块:一旦模拟开始运行,线路和门就只能通过发消息通信,所以要对每一种信息 定义一个消息类型。这样消息只有两种:门要告诉线路改输出状态,线路要告诉门输入状态 改了:

  case class SetSignal(sig: Boolean)
  case class SignalChanged(wire: Wire, sig: Boolean)

电路是有时间延迟的,不过这个值不确定,就先用个val放着吧:

  val WireDelay = 1
  val InverterDelay = 2
  val OrGateDelay = 3
  val AndGateDelay = 3

WireGate类。线路记录当前的信号状态并观察门的列表的模拟对象。因为混入了 Simulant特质所以也需要指定一个时间:

  class Wire(name: String, init: Boolean) extends Simulant {
    def this(name: String) { this(name, false) }
    def this() { this("unnamed") }

    val clock = Circuit.this.clock
    clock.add(this)

    private var sigVal = init
    private var observers: List[Actor] = List()

handleSimMessage方法来处理收到的消息。线路只会收到SetSignal消息设定信号, 注意只有新的信号与当前的信号不同时会改变状态并发出消息通知别的模拟对象:

  def handleSimMessage(msg: Any) {
    msg match {
      case SetSignal(s) =>
        if (s != sigVal) {
          sigVal = s
          signalObservers()
        }
    }
  }
    
  def signalObservers() {
    for (obs <- observers)
      clock ! AfterDelay(
        WireDelay,
        SignalChanged(this, sigVal),
        obs)
  }

上面的代码中将线路初始状态传给任何作为观察者的门也很重要。这个动作只在初始化时做 一次,以后只收改变信号的消息。模拟开始的发送初始信号也只要简单地提供一个 simStarting()方法:

  override def simStarting() { signalObservers() }

还有怎么连接门的问题。用一个新的方法来连接门,并加上一个toString方法:

  def addObserver(obs: Actor) {
    observers = obs :: observers
  }

  override def toString = "Wire("+ name +")"

线路的实现完成,现在再来看门。与门和或门有两个输入而非门只有一个输入,为了代码 简单那就设定所有的门都有两个入口而非门会忽视第二个入口。被忽视的可以设为某个永远 为false的线路:

  private object DummyWire extends Wire("dummy")

通用的门超类:

  abstract class Gate(in1: Wire, in2: Wire, out: Wire)
      extends Simulant {

抽象方法算出输出信号:

  def computeOutput(s1: Boolean, s2: Boolean): Boolean

不同的门有不同的延迟:

  val delay: Int

时钟可以在创建好后马上加上:

  val clock = Circuit.this.clock
  clock.add(this)

连接两个线路作为输入连上门:

  in1.addObserver(this)
  in2.addObserver(this)

门唯一的本地状态就是它每一输入线路最近一次信号。这个状态要保存,因为线路只会在 状态变化时才发信号,而输出信号要两个线路的信号一起才能算出来:

  var s1, s2 = false

响应消息是在收到输入线路信号变化时重新算输出信号,并用SetSignal发消息给输出 线路:

  def handleSimMessage(msg: Any) {
    msg match {
      case SignalChanged(w, sig) =>
        if (w == in1)
          s1 = sig
        if (w == in2)
          s2 = sig
        clock ! AfterDelay(delay,
            SetSignal(computeOutput(s1, s2)),
            out)
    }
  }

具体的门实现就简单了:

  def orGate(in1: Wire, in2: Wire, output: Wire) = 
    new Gate(in1, in2, output) {
      val delay = OrGateDelay
      def computeOutput(s1: Boolean, s2: Boolean) = s1 || s2 
    }
  
  def andGate(in1: Wire, in2: Wire, output: Wire) = 
    new Gate(in1, in2, output) {
      val delay = AndGateDelay
      def computeOutput(s1: Boolean, s2: Boolean) = s1 && s2 
    }

非门有一个假的输入:

  def inverter(input: Wire, output: Wire) = 
    new Gate(input, DummyWire, output) {
      val delay = InverterDelay
      def computeOutput(s1: Boolean, ignored: Boolean) = !s1
    }

模拟电路的功能已经完成了。但增加一个查看器观察线路的变化还是有必要的。定义一个 接收Wire类作为参数的并输出文本的probe方法。这个方法可以简单地制作一个新把 自己连接到指定线路和模拟对象来实现。这个模拟对象可以通过打印出新的信号来响应 SignalChanged消息:

  def probe(wire: Wire) = new Simulant {
    val clock = Circuit.this.clock
    clock.add(this)
    wire.addObserver(this)
    def handleSimMessage(msg: Any) {
      msg match {
        case SignalChanged(w, s) =>
           println("signal "+ w +" changed to "+ s)
      }
    }
  }

这里基本上Circuit类就完成了。调用时创建一个Circuit类实例,和一组门电路,在 需要观察的线路上加上prob,开始模拟。启动模拟的方法只要向时钟发Start消息 就行:

  def start() { clock ! Start }

和前面非并行的版本不同我们把半加器和全加器包装成了Adders特质。由于继承自 Circuit所以它可以访问Circuit的成员如WireorGate

  trait Adders extends Circuit {
    def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire) {
      val d, e = new Wire
      orGate(a, b, d)
      andGate(a, b, c)
      inverter(c, e)
      andGate(d, e, s)
    }

    def fullAdder(a: Wire, b: Wire, cin: Wire,
        sum: Wire, cout: Wire) {
      val s, c1, c2 = new Wire
      halfAdder(a, cin, s, c1)
      halfAdder(b, s, sum, c2)
      orGate(c1, c2, cout)
    }
  }


  val circuit = new Circuit with Adders

上面最后一行的circuit变量保存了拥有所有Circuit方法和所有Adders方法的电路。 这就是通过特质而不是用类来组合逻辑的强大之处。可以混入任何想要用的组件:

  val circuit =
    new Circuit 
      with Adders 
      with Multiplexers
      with FlipFlops
      with MultiCoreProcessors

把所有内容结合在一起

一切大功告成。简单地演示如下:

  object Demo {
    def main(args: Array[String]) {
      val circuit = new Circuit with Adders
      import circuit._

      val ain = new Wire("ain", true)
      val bin = new Wire("bin", false)
      val cin = new Wire("cin", true)
      val sout = new Wire("sout")
      val cout = new Wire("cout")

      probe(ain)
      probe(bin)
      probe(cin)
      probe(sout)
      probe(cout)
    
      fullAdder(ain, bin, cin, sout, cout)

      circuit.start()
    }
  }

这里import了所有成员,所以可以把circuit.probe(ain)简写成probe(ain)

创建了两条输出soutcout。还创建了三条输入线路:ain初始化为truebin 初始化为falsecin初始化为true。意思是给数值01,并带入一个进位1

五条线路上都有probe观测。把线路加到全加器中,模拟开始,输出如下:

  Advancing to time 1
  Advancing to time 2
  signal Wire(cout) changed to false
  signal Wire(cin) changed to true
  signal Wire(ain) changed to true
  signal Wire(sout) changed to false
  signal Wire(bin) changed to false
  Advancing to time 3
  Advancing to time 4
  Advancing to time 5
  Advancing to time 6
  Advancing to time 7
  Advancing to time 8
  Advancing to time 9
  Advancing to time 10
  signal Wire(cout) changed to true
  Advancing to time 11
  Advancing to time 12
  Advancing to time 13
  Advancing to time 14
  Advancing to time 15
  Advancing to time 16
  Advancing to time 17
  Advancing to time 18
  signal Wire(sout) changed to true
  Advancing to time 19
  Advancing to time 20
  Advancing to time 21
  signal Wire(sout) changed to false
  ** Agenda empty.  Clock exiting at time 21.

对于输入1,0,1输出进位1和为0