Actor和并发
多线程
synchronized块同步
虽然推荐使用Actor模式,但还是讲一下传统的多线程控制方式吧。
AnyRef
类追加了来自Object
类的wait
与notify
、notifyAll
方法。并且提供了带
参数的synchronized
方法,等同于Java的synchronized
块:
account.synchronized { account.balance += amount }
Future
-
scala.concurrent.Future
的Apply
工厂方法创建一个Future
的实例, -
onSuccess
注册相成功后执行的回调方法。 -
onFailure
注册的回调方法是偏函数。
scala> import scala.concurrent.Future scala> import scala.concurrent.ExecutionContext.Implicits.global scala> (1 to 5) foreach { index => | val future = Future { | Thread.sleep((math.random * 1000).toLong) | index | } | | future onSuccess { | case answer: Int => println(s"Result: $answer") | } | | future onFailure { | case th: Throwable => println(s"Failure: $th") | } | } scala> Result: 4 Result: 2 Result: 5 Result: 3 Result: 1
使用集合类Future实现多线程
在使用Future前要先通过指定的上下文环境来定义并发方式。
scala.concurrent.ExecutionContext.Implicts.global
用Java线程库实现多线程,
默认的ExecutionContext
使用Java的ForkJoinPool
来管理Java线程池。
scala> import concurrent.ExecutionContext.Implicits.global import concurrent.ExecutionContext.Implicits.global
scala.concurrent.Future
生成异步(或同步)任务:
scala> concurrent.Future { Thread.sleep(5000); println("hi") } res0: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@185d8b6 scala> hi
参考Future
的apply
、onSuccess
、onFailure
的方法声明,
可以看到有一个隐式参数:
def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] def onSuccess[U](func: (Try[T]) => U)( implicit executor: ExecutionContext): Unit def onFailure[U](callback: PartialFunction[Throwable, U])( implicit executor: ExecutionContext): Unit
而在scala.concurrent.ExecutionContext
对Implicits.global
的声明中,
声明了隐式的值:
object Implicits { implict val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) }
异步方式处理Future
Future可以被视为一个一元集合。当前的Future执行完成之后,返回的一个util.Try
。
-
成功就是包含结果的
Success
实例,并把结果传递给另一外Future或是函数。 -
失败就是
Failure
,包含异常信息,不会调用下一步。
例子,函数nextFtr
在参数大于0时会抛异常:
scala> def nextFtr(i: Int = 0) = concurrent.Future { | Thread.sleep(5000) | if (i > 0) (i + 1) else throw new Exception("Err for Example") | } nextFtr: (i: Int)scala.concurrent.Future[Int]
使用方法onComplete
可能在任务完成以后,调用指定函数。
定义在成功或失败的情况下,分别应该进行什么后续操作:
scala> nextFtr(1) onComplete { | case scala.util.Success(value) => println(s"value is: $value") | case scala.util.Failure(e) => e.printStackTrace | } scala> value is: 2 scala> nextFtr(-1) onComplete { | case scala.util.Success(value) => println(s"value is: $value") | case scala.util.Failure(e) => e.printStackTrace | } scala> java.lang.Exception at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply$mcI$sp(<console>:11) at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply(<console>:9) at $line29.$read$$iw$$iw$$anonfun$nextFtr$1.apply(<console>:9) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
还有onSuccess
和onFailure
也用来解决类似的问题:
scala> nextFtr(1) onSuccess { case x => println(s"value is: $x") } value is: 2 scala> nextFtr(-1) onFailure { case e => println(e.getMessage) } Err for Example
使用方法fallbackTo
可以在第一个Future失败时调用第二个Future:
scala> nextFtr(-1) fallbackTo nextFtr(665) onSuccess { | case x => println(s"value is: $x") | } value is: 666
flatMap
方法在第一个Future成功的情况下,以结果为参数调用下一个Future:
scala> nextFtr(1) flatMap nextFtr onSuccess { | case x => println(s"value is: $x") | } value is: 3
map
方法在成功后调用指定函数:
scala> nextFtr(3) map (_ * 2) onSuccess { | case x => println(s"value is: $x") | } value is: 8
Future.sequence
会「并行」地调用指定序列中所有Future,
全部成功就返回所有结果的列表,如果有失败的话,返回第一个出现的异常:
scala> concurrent.Future sequence (nextFtr(1) :: nextFtr(2) :: nextFtr(3) :: Nil) onSuccess { | case x => println(s"value is: $x") | } scala> value is: List(2, 3, 4) scala> concurrent.Future sequence (nextFtr(1) :: nextFtr(-2) :: nextFtr(3) :: Nil) onFailure { | case e => println(e.getMessage) | } Err for Example
同步处理Future
concurrent.Await.result()
限制在指定时间内完成任务,如果超时抛
java.util.concurrent.TimeoutException
异常,
需要用util.Try
来安全地管理超时条件。
例,限时内完成:
scala> concurrent.Await.result(nextFtr(1), Duration(10, SECONDS)) res36: Int = 2
例,超时:
scala> concurrent.Await.result(nextFtr(1), Duration(1, SECONDS)) java.util.concurrent.TimeoutException: Futures timed out after [1 second] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) ... 33 elided
例,限时内异常:
scala> concurrent.Await.result(nextFtr(-1), Duration(10, SECONDS)) java.lang.Exception: Err for Example at $anonfun$nextFtr$1.apply$mcI$sp(<console>:11) at $anonfun$nextFtr$1.apply(<console>:9) at $anonfun$nextFtr$1.apply(<console>:9) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
如果不想让异常抛出,则要另外用util.Try
来包装。
actor和消息传递
actor是一个类似线程的实体,它有一个接收消息的邮箱。
实现Actor特质
Scala里实现actor的方法是混入scala.actors.Actor
特质并实现其act()
方法。
下面一个简单的例子都不检查它的邮箱,只是把一个消息打印五次然后退出:
import scala.actors._ object SillyActor extends Actor { def act() { for (i <- 1 to 5) { println("I'm acting!") Thread.sleep(1000) } } }
注意:如果没有继承任何类,可以用关键字extends
来混入特质。
start()方法启动
启动方法和Java线程类似,也是调用start()
方法:
scala> SillyActor.start() I'm acting! res4: scala.actors.Actor = SillyActor$@1945696 scala> I'm acting! I'm acting! I'm acting! I'm acting!
注意输出的文本与Scala交互程序的回显交错在一起了,因为这是在另一个独立的线程里。 所以运行是独立的。再来看第二版的actor:
import scala.actors._ object SeriousActor extends Actor { def act() { for (i <- 1 to 5) { println("To be or not to be.") Thread.sleep(1000) } } }
这里同时运行两个actor:
scala> SillyActor.start(); SeriousActor.start() res3: scala.actors.Actor = seriousActor$@1689405 scala> To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting!
Actor.actor()方法
还可以用伴生对象scala.actor.Actor
的actor()
方法来创建actor,它接收一个函数值/
闭包做参数:
scala> import scala.actors.Actor._ scala> val seriousActor2 = actor { | for (i <- 1 to 5) | println("That is the question.") | Thread.sleep(1000) | } scala> That is the question. That is the question. That is the question. That is the question. That is the question.
注意上面用actor()
方法创建的actor在创建后是立即启动的。
!()方法发送消息
actor之间的协作方式是通过!()
方法来发送消息,发送操作是并发非阻塞的:
scala> SillyActor ! "hi there"
虽然消息"hi there"
被发给了SillyActor
,但消息只是放在邮箱里保持未读状态。
receive()方法接收消息
receive()
方法从消息队列中取出一条消息来处理。它可以接收一个闭包作为参数。如
下面这样只是把消息打印出来:
val echoActor = actor { while (true) { receive { case msg => println("received message: "+ msg) } } }
注意这里的使用的偏函数字面量,除了没有match
关键字外,和match表达式样子很像。
当actor发消息时并不会阻塞,收到消息时处理的过程是阻塞的,不会被打断。发送的消息
在接收的actor的邮箱里等待被处理,直到actor调用receive
方法:
scala> echoActor ! "hi there" received message: hi there scala> echoActor ! 15 scala> received message: 15
这里要解释一下receive
以偏函数字面量作为参数。除了没有match
关键字外,和match
表达式样子很像,而且不像match表达式一样要匹配所有的可能性,的会直接忽略掉没有
匹配的情况(即偏函数没有定义)。
正如「模式匹配」一章中「模式无处不在」这一节中所说的,偏函数(可以表示为特质
PartialFunction[Any, T]
的实例。其中T
是case =>
语句右边表达式计算的结果类型
)并不是完整的函数。所以它不对所有输入值有都定义。除了接受单个apply
方法
外,偏函数还提供一个isDefineAt
方法,同样只接收单个参数。如果偏函数能处理传给
isDefineAt
函数的参数,那么isDefineAt
返回true
。把这个消息传递给偏函数的
apply
方法,apply
方法就会处理这个消息。
例如echoActor
的apply
方法会打印"received message: "
,然后再输出消息对象的
toString
结果。如果邮箱里没有让isDefineAt
返回true
的消息,则被调用receive
的actor将会被阻塞,直到有匹配的消息。
举例来说,以下是只处理Int
类型消息的actor:
scala> val intActor = actor { | receive { | case x: Int => // I only want Ints | println("Got an Int: "+ x) | } | } intActor: scala.actors.Actor = scala.actors.Actor$$anon$1@34ba6b
如果是String
或Double
类型的消息会被忽略:
scala> intActor ! "hello" scala> intActor ! Math.Pi
给个Int
就会有响应:
scala> intActor ! 12 Got an Int: 12
处理掉不匹配的消息
注意:如果邮箱可能会被不能匹配case
条件的消息挤满,可以用一个case _
匹配所有
情况来处理掉那些消息。
处理消息是串行化的
因为消息处理是串行化的,所以不会有并发的竞争状况。比如下面的balance
是安全的,
不会出现同时加减:
class AccountActor extends Actor { private var balance = 0.0 def act() { while (true) { receive { case Deposit(account) => balance += amount case Withdraw(account) => balance -= amount } } } }
但是这只能保证同一个Actor对象中的成员安全。如果多个Actor共用的话,必然会有竞争 状况。
所以Actor 绝对不应该 访问自身外部的状态对象,虽然Scala并没有强制。
actor未启动也会收到消息
如果actor还没有启动,收到的消息也会放在队列里,不是不会马上处理。在start()
方法
被调用后也会开始处理:
myAct ! "before start 1" myAct ! "before start 2" myAct ! "before start 3" myAct ! "before start 4" myAct.start() myAct ! "after start 1" myAct ! "after start 2" myAct ! "after start 3" myAct ! "after start 4"
把原生线程当作actor
Actor子系统会管理线程,所以定义了actor以后开发人员不用操心actor与线程的对应关系 是怎么样的。
Java里可以用Thread.current
得到当前线程的引用,类似地,Scala里Actor
对象的
Actor.self()
方法可以取得用当前线程生成的Actor的引用。这样在交互环境可以少写
很多代码,如:
scala> import scala.actors.Actor import scala.actors.Actor scala> Actor.self ! "hello" scala> Actor.self.receive { case x => x } res2: Any = hello
也可以直接引入Actor._
,这样可以把Actor.self
简写为self
:
scala> import scala.actors.Actor._ import scala.actors.Actor._ scala> self ! "hello" scala> self.receive { case x => x } res6: Any = hello
上面的receive()
方法返回由传递给它的偏函数计算出的值。在本例中,偏函数返回消息
本身,所以消息被交互环境打印了出来。
receiveWithin()
因为receive()
是阻塞的,所以不知道要等到啥时候。所以如果使用了这项技巧,最好是
用receive()
的变种receiveWithin()
。因为它可以指定一个以毫秒计的超时时限:
scala> self.receiveWithin(1000) { case x => x } // wait a sec! res7: Any = TIMEOUT
可以用case TIMEOUT
来匹配超时的情况,这里指的是Actor.TIMEOUT
。
future
通过!!()
方法表示接收一个Future对象,它表示一个未来会得到的结果,但现在先不
计算它。类似于惰性求值:
val replyFuture = account !! Deposit(1000)
上面的代码因为不求出结果,所以可以看作是不阻塞的。在取出值时阻塞:
val reply = replyFuture()
简单的并行计算例子
要计算从1到n的所有整数累加,可以分成多段并行执行。下面是处理每一段的方法:
import scala.actors._ import scala.actors.Actor._ def sumPart(floor:Int, top:Int) = { (0 /: (floor to top)) { (sum, i) => { sum + i } } }
把整个区间分成多个段并行执行:
def sumAll(num:Int) = { val RANGE = 10 val partCount = (num.toDouble / RANGE).ceil.toInt val caller = self // this is Actor.self() for (i <- 0 until partCount) { val floor = i * RANGE + 1; val top = num min (i + 1) * RANGE actor { caller ! sumPart(floor, top) } // Actor.actor() create new Actor } (0 /: (0 until partCount)) { (allSum, i) => receive { // this is Actor.receive() case sumInRange:Int => { allSum + sumInRange } } } }
能够实现并行操作的关键在于:actor发送消息是不阻塞的,打个比方就是每个发送操作都 在新建的线程中执行。结合上面代码的第9行是在发送的操作中调用的计算局部的方法, 所以每个局部计算的工作都是在新的线程中进行的。
调用:
println( sumAll(100) )
小结
消息传递
每个actor都有自己的消息队列:
-
InputChannel[Any]
接收队列。 -
OutputChannel[Any]
发送队列。
注意actor传递消息有两个重要的特性:
发送不阻塞
对于以下的代码来说:
op1(); myActor ! "msg" op2();
即使中间一步发消息的动作如果要花很长时间,感觉也像是op1()
执行以后接着就是
op2()
的执行,好像发消息的动作不占用时间一样。
接收消息不中断
如果不用receive
方法去检查接收队列,actor根本不知道有没有消息进来,所以对于以下
的代码:
op1(); op2(); receive { case msg => println(msg) }
在op1()
和op2()
执行过程中如果外部有发消息给当前actor,由于还没有调用receive
方法,actor根本就不知道有消息进来,所以op1()
和op2()
的执行不会被打断。
接收消息阻塞
如果在调用receive
方法时消息队列里没有消息,那么receive
方法会阻塞。
使用同步的方式传递消息
虽然异步方式能利用并发,但也有需要用同步的情况存在。这时可以用!?()
方法,注意
这样会导致阻塞,所以推荐用带超时参数的版本:!?(timeout, msg)
。它在正常时返回
Option[Any]
;超时时返回None
,但actor还是会收到这条消息,并把它发给自己。可以
通过一个特殊的case类![a](val ch:Channel[a], val msg:a)
表示actor发给自己的消息
,这样就能得到超时丢失的消息。
sender
引用的总是最近一个发送消息的actor,用reply
方法也可以说明把消息发送给
sender
。
如果发送消息给一个actor以后不希望它把响应发回给自己,而是希望它发给另外一个actor
,就可以用send()
方法。
import scala.actors._ import Actor._ val fortuneTeller = actor { for (i <- 1 to 4) { Thread.sleep(1000) receive { case _ => sender ! "your day will rock! " + i // reply to sender // case _ => reply("your day will rock! " + i) // same as above } } } println( fortuneTeller !? (2000, "what's ahead") ) // async send with timeout println( fortuneTeller !? (200, "what's ahead") ) val aPrinter = actor { receive { case msg => println("Ah, Fortune message for you: " + msg) } } fortuneTeller.send("what's up", aPrinter) // use send() to send another actor fortuneTeller ! "How's my future?" Thread.sleep(3000) receive { case msg : String => println("received " + msg) } println("let's get that message") receive { case !(channel, msg) => println("received belated message " + msg) }
上面的代码最后一行拿到的是超时丢弃的消息。运行时输出如下:
└─(1:pts/4:%)=> scala tmp.scala Some(your day will rock! 1) None Ah, Fortune message for you: your day will rock! 3 received your day will rock! 4 let's get that message received belated message your day will rock! 2
通过重用线程取得更好的性能
Acter建立在普通的Java线程上,用之前的方法每actor的receive
方法个都占一个线程。
就是说,在没有消息的时候,阻塞着的receive
方法也会一直占一个线程。为了重用线程,
Scala提供了react
方法从线程池里取已经有的线程(还有一个带超时的reactWithin
)
。它处理消息以后不会返回,返回类型是Nothing
。其实在后台,react
在完成后会抛出
一个异常。
react
方法执行完毕以后就会放弃对线程的占用,所以在react
方法后面的代码是执行
不到的。如果还有其他代码要执行,就一定要放在react
方法内。
下面同时用了receive
和react
两个方法比较:
import scala.actors.Actor._ def info(msg: String) = println(msg + " received by " + Thread.currentThread()) def receiveMessage(id : Int) { for(i <- 1 to 2) { receiveWithin(20000) { case msg : String => info("receive: " + id + msg) } } } def reactMessage(id : Int) { react { case msg : String => info("react: " + id + msg) reactMessage(id) } } val actors = Array( actor { info("react: 1 actor created"); reactMessage(1) }, actor { info("react: 2 actor created"); reactMessage(2) }, actor { info("receive: 3 actor created"); receiveMessage(3) }, actor { info("receive: 4 actor created"); receiveMessage(4) } ) Thread.sleep(1000) for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) } Thread.sleep(2000) for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
调用:
create actor by Thread[ForkJoinPool-1-worker-3,5,main] create actor by Thread[ForkJoinPool-1-worker-3,5,main] create actor by Thread[ForkJoinPool-1-worker-1,5,main] create actor by Thread[ForkJoinPool-1-worker-5,5,main] react: 1hello by Thread[ForkJoinPool-1-worker-7,5,main] react: 2hello by Thread[ForkJoinPool-1-worker-5,5,main] receive: 3hello by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4hello by Thread[ForkJoinPool-1-worker-3,5,main] react: 1hello by Thread[ForkJoinPool-1-worker-5,5,main] react: 2hello by Thread[ForkJoinPool-1-worker-5,5,main] receive: 3hello by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4hello by Thread[ForkJoinPool-1-worker-3,5,main]
从输出可以看到,同一个actor的receiveWithin
方法总是在同一个线程。如:
-
actor3总是在线程
Thread[ForkJoinPool-1-worker-1,5,main]
中; -
actor4总是在同一个线程
Thread[ForkJoinPool-1-worker-3,5,main]
; -
react()
方法就不固定了,可能会多个actor共用一个线程,也可能会一个actor用多个 线程。
因为react
不返回所以接收消息处理器必须同时处理消息并执行actor所有剩下的工作。
通常做法是用一个顶级的工作方法,比如act
自身,以供处理器在完成时调用。下面的
例子显示了这种方式。等待内容为主机名的字符串,如果有的话,返回那个主机名对应的
IP地址:
object NameResolver extends Actor { import java.net.{InetAddress, UnknownHostException} def act() { react { case (name: String, actor: Actor) => actor ! getIp(name) act() // 尾递归再次调用 case "EXIT" => println("Name resolver exiting.") // quit case msg => println("Unhandled message: "+ msg) act() // 尾递归再次调用 } } def getIp(name: String): Option[InetAddress] = { try { Some(InetAddress.getByName(name)) } catch { case _:UnknownHostException => None } } }
调用方式:
scala> NameResolver.start() res0: scala.actors.Actor = NameResolver$@90d6c5 scala> NameResolver ! ("www.scala-lang.org", self) scala> self.receiveWithin(0) { case x => x } res2: Any = Some(www.scala-lang.org/128.178.154.102) scala> NameResolver ! ("wwwwww.scala-lang.org", self) scala> self.receiveWithin(0) { case x => x } res4: Any = None
因为不返回所以当前线程的调用栈可以被再次使用,所以如果全用react
的话只要一个
线程就可以运行所有的actor。实例应用中应该用少用几个receive
,尽量多用react
。
react工作原理
返回类型Nothing
表示永远不会正常返回的函数,相反,总是以一个异常方式完成。
因为react
会因为异常退出,所以不能把它放在while
循环中。如:
def act() { while (true) { react { case Withdraw(amount) => println("Withdrawing " + amount) } } }
因为react()
方法的参数是一个没有返回对象的偏函数:
{ case Withdraw(amount) => println("Withdrawing " + amount) }
所以它不能返回到循环当中去,所以要实现循环的方法之一是在消息处理器中再次调用
act()
方法:
def act() { react { case Withdraw(amount) => { println("Withdrawing " + amount) act() } } }
这样的作法相当于用一个无穷的递归替换掉无穷的循环。但由于每次react()
调用都会
抛出异常,所以栈被清空,不会占用太大的栈空间。
虽然不同版本中实现方式还在不断调整,但可以认为react
是这样工作的:
actor的start
方法被调用时,start
方法会以某种方式确保最终会有某个线程来调用
actor的act
方法。如果act
调用了react
,则react
方法会在actor邮箱里查找偏函数
能够处理的消息。如果找到react
会安排在未来某个时间处理该消息并抛出异常。如果
没有act还是处于「冷存储」状态等待邮箱里有新消息时重新激活并抛出异常。无论哪种情况
react
都会以这个异常的方式完成其执行,act
方法也随之结束。调用act
的线程会
捕获这个异常,并忘记这个actor,转而处理其他任务。
这就是为什么想要react
在处理第一个消息之外做更多的事,就需要在偏函数中再次调用
act
方法,或使用其他手段让react
再次被调用。
reactWithin
reactWithin
在超时没有收到消息时会匹配case TIMEOUT
。下面有一个累加器的实现:
import scala.actors._ import Actor._ val caller = self def accumulate() { var sum = 0 reactWithin(500) { case number: Int => sum += number accumulate() case TIMEOUT => println("Timed out! Will send result now") caller ! sum } println("This will not be called...") } val accumulator = actor { accumulate() } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(10000) { case result => println("Total is " + result) }
结果是错误的,没有累加输出:
Timed out! Will send result now Total is 0
问题在于再次调用accumulate
方法时对sum
产生了新的闭包,可以把sum
放到外面作为
全局变量,但这样不够函数式。更加函数式的修改方案是把每次的结果作为参数:
import scala.actors._ import Actor._ val caller = self def accumulate(sum : Int) { reactWithin(500) { case number: Int => accumulate(sum + number) case TIMEOUT => println("Timed out! Will send result now") caller ! sum } println("This will not be called...") } val accumulator = actor { accumulate(0) } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(10000) { case result => println("Total is " + result) }
这次得到了正确的结果:
Timed out! Will send result now Total is 16
loop与loopWhile
由于通过react
重用线程的方案非常普遍,所以actor库对此有特别的支持。actor.loop
函数重复执行一个代码块,哪怕代码调用的是react
。下面用loop
来重写
NameResolver
的act
方法。下面的例子与前一个例子不同的地方是它并不退出来响应
"EXIT"
消息,而是一直循环响应消息:
def act() { loop { react { case (name: String, actor: Actor) => actor ! getIp(name) case msg => println("Unhandled message: " + msg) } } }
还有带循环条件的版本loopWhile(...)
。
loopWhile(count < max) { react { ... } }
还有eventloop()
方法可以制作聘个无穷循套环react()
的简化版本,不过前提是偏函数
不会再次调用react
:
def act() { eventloop { case Withdraw(amount) => println("Withdrawing " + amount) } }
控制线程执行
Scala会让单例对象Scheduler
去运行actor,通过设置Scheduler.impl
的各种实现,
可以整个应用的actor的调度策略。
通过下面的例子SingleThreadedScheduler
可以控制Scala在主线程里运行actor:
import scala.Actor._ import scala.actors._ import scheduler._ if (args.length > 0 && args(0) == "Single") { println("Command-line argument Single found") Scheduler.impl = new SingleThreadedScheduler } println("Main running in " + Thread.currentThread) actor { println("Actor1 running in " + Thread.currentThread) } actor { println("Actor2 running in " + Thread.currentThread) } receiveWithin(3000) { case _ => }
如果没有参数,两个actor和主线程会分别执行:
$ scala InMainThread.scala Main running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor2 running in Thread[ForkJoinPool-1-worker-5,5,main]
如果加上参数,会在同一线程中执行:
$ scala InMainThread.scala Single Command-line argument Single found Main running in Thread[main,5,main] Actor1 running in Thread[main,5,main] Actor2 running in Thread[main,5,main]
上面是例子配置的两个应用的线程调度。其实通过继承Actor
特质时改写scheduler()
方法还可以更加细粒度地控制具体actor是在主线程,一部分在新线程中。
import scala.Actor._ import scala.actors._ import scheduler._ trait SingleThreadedActor extends Actor { override def scheduler() = new SingleThreadedScheduler } class MyActor1 extends Actor { def act() = println("Actor1 running in " + Thread.currentThread) } class MyActor2 extends SingleThreadedActor { def act() = println("Actor2 running in " + Thread.currentThread) } println("Main running in " + Thread.currentThread) new MyActor1().start() new MyActor2().start() actor { println("Actor3 running in " + Thread.currentThread) } receiveWithin(5000) { case _ => }
可以看到主线程与fork出的线程:
$ scala InMainThreadSelective.scala Main running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor2 running in Thread[main,5,main] Actor 3 running in Thread[ForkJoinPool-2-worker-7,5,main]
消息通道
除了共享Actor的引用,还可以通过共享消息通道的方式。消息通道有以下优点:
- 消息通道是类型安全的,只能发送或接受某个特定类型的消息。
- 防止调用到某个Actor的方法。
消息通道可以是一个OutputChannel特质(带有!
方法)或是一个InputChannel特技(带有
receive
或react
方法)。也可以有Channel
特质同时扩展这两个特质。
消息通道的构造函数需要一个Actor作为参数:
val channel - new Channel[Int](someActor)
如果不提供参数,那就是绑定到当前执行的Actor上。
通常,会指定Actor发送消息到一个输出消息通道:
import scala.actors._ import scala.actors.Actor._ case class Compute(data: Seq[Int], result: Channel[Int]) class PrimeCounter extends Actor { def isPrime(n: Int) = new java.math.BigInteger("" + n).isProbablePrime(20) def act() { while (true) { receive { case Compute(data, continuation) => { val answer = data.count(isPrime) continuation ! answer // Send the answer to the channel } } } } } object Main extends App { val max = 100000 // try with 1000000 val nactors = 10 val groupSize = max / nactors / 10 val primeCounters = for (i <- 0 until nactors) yield new PrimeCounter for (a <- primeCounters) a.start() actor { val c = new Channel[Int] // A channel to this actor val groups = (2 to max).grouped(groupSize).zipWithIndex.toSeq var count = groups.length var total = 0 for ((g, i) <- groups) { val a = primeCounters(i % primeCounters.length) a ! Compute(g, c) // a is an actor, not a channel } while (count > 0) { c.receive { case t => { // The channel receives an Int total += t count -= 1 } } } println(total + " primes") System.exit(0) } }
注意这里调用的是消息通道的receive()
方法而不是Actor自己的。如果要通过Actor来
接收,可以匹配一个!
样本类:
receive { case !(channel, x) => ... }
Actor的生命周期
Actor的act()
方法在start()
方法被调用时开始执行。然后通常情况下都是进入某个
循环:
def act() { while (...) { receive { ... } } }
会让Actor终止的情况:
-
act()
方法返回。 -
act()
方法因异常终止。 -
actor
调用exit()
方法。
exit()方法停止actor
exit()
方法是一个受保护的方法,只能被Actor
的子类调用。exit()
方法会抛出异常
,以试图停止当前线程的方式停止actor。它还有一个版本接收一个参数作为退出原因:
import scala.actors._ import Actor._ class MyAct extends Actor { def act() { while (true) { receive { case "quit" => exit case msg => println(msg) } } } } val myAct = new MyAct myAct.start() myAct ! "Hello World!" myAct ! "nice Day!" myAct ! "quit"
如果不带原因,那就相当于exit('normal)
。
因为异常停止
如果因为异常终止,退出原因就是UncaughtException
样本类的一个实例。它有以下属性
:
- actor:抛出异常的Actor。
-
message:
Some(msg)
,其中的msg
就是该Actor处理的最后一条消息;或为None
, 表示Actor在没有来得及处理任何消息时就挂掉了。 -
sender:
Some(channel)
,最后一条消息发送方的输出通道;为None
表示Actor在没有 来得及处理任何消息时就挂掉了。 - thread:退出时所在的线程。
- cause:相应的异常。
默认情况下所有末处理的异常都会造成一个UncaughtException
原因的退出。必要时可以
重写exceptionHandler()
方法来改变这个行为,它应该产出一个偏函数,类型为:
PartialFunction[Exception, Unit]
这样如果这个偏函数有效就被调用,Actor以'normal
原因退出。
例如在需要不把非受检异常当作非正常的话,可以提供以下处理器:
override def exceptionHandler = { case e: RuntimeException => log(e) }
链接多个Actor
通过link()
方法链接多个Actor可以在一个Actor终止得通知另一个:
def act() { link(master) ...
链接是双向的,一个终止会通知另外一个。但是并不是对称的,比如上面的:
link(master)
并不能用:
master.link(self)
来代替。link()
方法必须由请求链接的Actor调用。
默认情况下,只要链接到的Actor中有一个以非'normal
原因退出,当前Actor就会终止。
退出的原因和链接到的那个Actor一样。
要改变这一行为的方法是设置trapExit
为true
。这样Acotr会收到一个类型为Exit
的
消息,包含了终止的Actor和退出的原因:
override def act() { trapExit = true link(worker) while(...) { recieve { ... case Exit(linked, UncaughtException(_, _, _, _, cause)) => ... case Exit(linked, reason) => ... } } }
让Actor挂掉是很正常的,但要连接到一个监管它的Actor来处理失败的处理方案,如:重新 分配工作、或是重启。
在大型的系统中,可以给Actor分成多个区域,各区域安排一个监管的actor。
对于已经终止的Actor,它们依然保留内部状态与邮箱。可以用它的restart()
方法重新
启动。在这样做之前,可能会想要修复内部状态或设置一个标志来表示这个Actor正在以
修复模式运行。另外还需要重新建立链接,因为终止时连接也被移除了。
良好的actor风格
actor不应该阻塞
当一个阻塞时,另一个actor可以会发出别的请求。多个actor等一个阻塞的actor会带来
死锁。actor应该允许某种表示动作可以执行的消息发送给它。通常这样要安排其他的actor
帮助。例如通过调用Thread.sleep
代替并阻塞当前的actor,可以创建一个助手actor,
这个actor睡眠并在一定时间后发加一个消息:
actor { Thread.sleep(time) mainActor ! "WAKEUP" }
这个助手actor虽然阻塞了,但是由于它不会收到消息,所以这是OK的。主actor可以继续 响应新的请求。
下面的emoteLater
方法展示了这种处理方式的用法。它创建了一个新的actor来执行
sleep
以便主actor不阻塞。以确保它向正确的actor发送"Emote"
消息,我们必须小心地
在主actor中对self
求值而不是在助手actor中:
val sillyActor2 = actor { def emoteLater() { val mainActor = self actor { Thread.sleep(1000) mainActor ! "Emote" } } var emoted = 0 emoteLater() loop { react { case "Emote" => println("I'm acting!") emoted += 1 if (emoted < 5) emoteLater() case msg => println("Received: "+ msg) } } }
由于这个actor并不在sleep
方法中阻塞,而是助手actor阻塞,所以主actor可以在等待
下次表演前继续做其他的事情。与早先的SillyActor
不同,这个actor会在等待下一个
输出的同时继续打印消息:
scala> sillyActor2 ! "hi there" scala> Received: hi there I'm acting! I'm acting! I'm acting!
只通过消息与actor通信
Scala可以混用actor模式与共享数据/锁模式两种方式。但第二种方式有锁死线程和线程 不安全变量的风险。
不要调用Actor的方法
如果调用了Actor的方法那就等于引入所有传统并发的同步问题。
优选不可变消息
虽然actor模式对于线程来说数据是不共享的,但还是有共享数据的情况:就是消息会在 多个actor中传递,如果传递过程中被改变的话是被谁改的都不知道。
而且下面会说到,有些消息会有一个指向自己的引用。所以尽量使用不可变的消息。
让消息自包含
因为actor是非阻塞的,发出一个请求不等响应就做别的事情的。等响应回来都不知道是 响应哪个请求的。所以一个简单的方案是以发出的消息里包含与请求相关的冗余信息。
如果请求是一个不可变的对象,花费代价很少的一个方案是在返回值中包含一个指向请求 自己的引用。这样就知道响应是对应哪个请求的了。
举例,IP查询功能的actor在返回IP时带一个当时请求的主机名该有多好:
def act() { loop { react { case (name: String, actor: Actor) => actor ! (name, getIp(name)) } } }
另一个增加冗余的方案是为每类消息制作样本类。比如当字符串看起来意义不是很明确:
lookerUpper ! ("www.scala-lang.org", self)
但有了类型名字看起来就好理解多了:
case class LookupIP(hostname: String, requester: Actor) lookerUpper ! LookupIP("www.scala-lang.org", self)
下面是使用样本类而不是简单的元组来查询IP的程序:
import scala.actors.Actor._ import java.net.{InetAddress, UnknownHostException} case class LookupIP(name: String, respondTo: Actor) case class LookupResult( name: String, address: Option[InetAddress] ) object NameResolver2 extends Actor { def act() { loop { react { case LookupIP(name, actor) => actor ! LookupResult(name, getIp(name)) } } } def getIp(name: String): Option[InetAddress] = { // As before (in Listing 30.3) } }
最小化回复
Actor的目的不是远程调用,任务应该被分发到一个Actor组成的网网络。每个Actor算出 部分答案,发给一个知道如何合并出全部答案的Actor。
监管Actor的终止
Actor会终止是正常的。在大型的系统中,可以给Actor分成多个区域,各区域安排一个监管 的actor。
更加长的例子:并行离散事件模拟
把「有状态对象」一章的电路模拟强化成并行化的。
总体设计
核心思路是把每个模拟的对象都用actor,状态位于actor内部。这样实现并行性。
在类层级上,因为不同的模拟对象可能会有一些共通行为,所以抽象一个Simulant
特质
出来。线路、门等都可以加上这个特质:
trait Simulant extends Actor class Wire extends Simulant
时间同步问题:每个参与者都自己的任务列表顺序而不看任务的时间处理任务的话,可能
参与者A正在处理时间点90秒的任务;参与者B已经在处理时间点100的任务。A的任务要与
B的任务协作时就有问题。所以要同步,任何参与者都不应该当其他参与者完成时间位于
n-1
的任务前,处理时间位于n
的任务。
专门有一个actor来控制时钟,给模拟对象发出ping
消息,模拟对象在准备好让钟走到下
一个时间单位时发出pong
消息给时钟:
case class Ping(time: Int) case class Pong(time: Int, from: Actor)
这两个消息里可以没有字段,但这里的冗余是有好处的。ping
里的time
是为了回写到
pong
的time
里,知道对应成功的是哪个时间。
现在的原则是每个模拟对象在处理完了要完成的任务以后才会响应ping
消息。但问题是
任务也可能是别的模拟对象要发给它的,所以当前任务做完了并不一定是真的做完了,
可能是其他模拟对象还没有把任务发过来。
为了简化这个问题,要再增加两个限制。首先,要假设模拟对象之间不直接发送消息,而
只是相互安排事件日程。其次,它们不向当前的时间点n
提交事件,而是给n+1
以上的
时间提交事件。虽然有点限制,但对于模拟的事件来说,有点时延还是可以接受的。
还有一个方案:模拟对象可以相互发消息,但是要精心设计一个机制来决定一个actor何时
可以安全地送回一个Pong
。每个模拟对象都应该延迟响应Ping
消息直到它发出的所有
请求都已经完成了处理。所以传递的消息要加一些额外的内容。
现在假定模拟对象之间不发直接消息,只发送事件日程表。这样还要有一个工作项的日程表
这个日程表也可能由时钟actor所持有。这样时钟actor就可以等到当前时间点所有工作项的
请求都发送完成后再发出Ping
消息。Actor们知道收到Ping
时就表示当前时间点所有的
工作项都已经拿到了,可以马上送回Pong
,因为不会有更多的工作发过来了。时钟由类
Clock
实现:
class Clock extends Actor { private var running = false private var currentTime = 0 private var agenda: List[WorkItem] = List() }
最后要设计如何设置好一个初始模拟场景。一种自然的方式是在时钟停止的状态下创建模拟 ,添加所有的模拟对象再连接在一起,然后启动时钟。注意一定要绝对确认所有的东西都 连接好了再启动时钟。
所以关键在于如何确定都已经连接好了,方案有很多。这里用最简单的方法是在设置过程中 不向actor发消息,这样当最后一个消息调用返回就知道模拟被完整地构建好了。最终的 编码模式是用常规的方法调用将模拟设置好,然后可以在模拟运行时使用actor发消息。
基本设计完成其他的就比较直接。WorkItem
还是像「有状态对象」里实现的那样有一个时间
和一个动作成员,区别在动作模拟,老版本是用无参函数,在这里的并行版本中使用目标
actor和发往该actor的消息:
case class WorkItem(time: Int, msg: Any, target: Actor)
afterDelay
方法变成了发往时钟的AfterDelay
消息。无参函数被替换成了一个消息和
一个目标actor:
case class AfterDelay(delay: Int, msg: Any, target: Actor)
定义要求模拟启动和停止的消息:
case object Start case object Stop
总体设计就是这样,时钟Clock
有当前时间和日程表,它只会在Ping
了所有模拟对象并
确保它们准备好了以后才往前进。Simulant
特质来模拟参与者,它们所代表的参与者们
通过向时钟发送工作项增加到日程表的方式来和其他参与者们通信。
实现模拟架构
Clock
类的running
初始为false
。模拟初始化全都完成以后,会发Start
消息给它,
才会变成true
。保证初始化完成以后才会运行:
class Clock extends Actor { private var running = false private var currentTime = 0 private var agenda: List[WorkItem] = List() private var allSimulants: List[Actor] = List() private var busySimulants: Set[Actor] = Set.empty
时钟可以在创建后后马上调用start()
方法,因为running
还是false
。所以不会真的
运行:
start()
allSimulants
是所有模拟对象,不会变所以用列表;busySimulants
是正有任务的,会
不断添加删除所以用集。一旦模拟器开始运行,它只有当busySimulants
为空时才会进入
下一个时间单位,并把allSimulants
的成员全放到busySimulants
里:
def add(sim: Simulant) { allSimulants = sim :: allSimulants }
以上是时钟状态的描述,再来看时钟活动。它的主循环由两个职责交替:让时钟前进、响应 消息。一旦时间 前进,它只能在至少收到一个消息时才会再次前进,所以把主循环定义为 这两个活动交替是安全的:
def act() { loop { if (running && busySimulants.isEmpty) advance() reactToOneMessage() } }
时间的推进除了简单地currentTime
步进之外还有些额外的工作:
首先,如果日程表是空的而且模拟不是刚刚开始,那么模拟需要退出。
其次,如果日程表非空,则当前时间点所有的工作需要现在进行。
再次,所有的模拟对象要放到busySimulant
中,并发Ping
消息给它们。等它们全响应
之后才能再次推进时间。
def advance() { if (agenda.isEmpty && currentTime > 0) { println("** Agenda empty. Clock exiting at time "+ currentTime+".") self ! Stop return } currentTime += 1 println("Advancing to time "+currentTime) processCurrentEvents() for (sim <- allSimulants) sim ! Ping(currentTime) busySimulants = Set.empty ++ allSimulants }
处理当前事件只示过是简单地处理所有在日程表最上方时间为currentTime
的事件:
private def processCurrentEvents() { val todoNow = agenda.takeWhile(_.time <= currentTime) agenda = agenda.drop(todoNow.length) for (WorkItem(time, msg, target) <- todoNow) { assert(time == currentTime) target ! msg } }
上面的方法有三步:首先,takeWile
取出所有当前时间的条目存入todoNow
。然后,用
drop
从日程表中去掉这些。最后遍历这些条目并向它们的发送目标消息。assert
是为了
确保排程器的逻辑没有问题。
有了这些基础工作,处理时间可能收到的消息就会很简单。AfterDelay
消息把新的条目
添加到工作队列;Pong
消息从忙碌列表中去掉一个;Start
让模拟开始;Stop
让时钟
停止:
def reactToOneMessage() { react { case AfterDelay(delay, msg, target) => val item = WorkItem(currentTime + delay, msg, target) agenda = insert(agenda, item) case Pong(time, sim) => assert(time == currentTime) assert(busySimulants contains sim) busySimulants -= sim case Start => running = true case Stop => for (sim <- allSimulants) sim ! Stop exit() } }
insert
方法和老版本的一样,就不列出来了。它把条目加入日程表并保证顺序正确。
时钟类到这里实现完毕,现在来看模拟特质。本质上说一个Simulant
可以是理解模拟消息
Stop
和Ping
并与它们合作的任何actor。其act
方法简单这样实现:
def act() { loop { react { case Stop => exit() case Ping(time) => if (time == 1) simStarting() clock ! Pong(time, self) case msg => handleSimMessage(msg) } } }
每当收到Stop
,它就退出;收到Ping
就响应一个Pong
;如果Ping
的时间为1
,则
在返回Pong
前调用simStarting
让子类可以定义当模拟开始运行时应该发生的行为。
其他的所有消息都由子类来翻译,给抽象方法handleSimMessage
方法。
还有一个抽象成员clock
记录时钟以回复Ping
消息并安排新的工作项。由于在时钟发
消息来以前不会做任何事情,所以可以在对象创建以后马上运行:
trait Simulant extends Actor { val clock: Clock def handleSimMessage(msg: Any) def simStarting() { } def act() { loop { react { case Stop => exit() case Ping(time) => if (time == 1) simStarting() clock ! Pong(time, self) case msg => handleSimMessage(msg) } } } start() }
实现一个电路模拟
电路类Circuit
:
class Circuit { val clock = new Clock // 模拟消息 // 延时常量 // Wire 类和 Gate 类的方法 // 其他工具方法 }
模拟消息这块:一旦模拟开始运行,线路和门就只能通过发消息通信,所以要对每一种信息 定义一个消息类型。这样消息只有两种:门要告诉线路改输出状态,线路要告诉门输入状态 改了:
case class SetSignal(sig: Boolean) case class SignalChanged(wire: Wire, sig: Boolean)
电路是有时间延迟的,不过这个值不确定,就先用个val
放着吧:
val WireDelay = 1 val InverterDelay = 2 val OrGateDelay = 3 val AndGateDelay = 3
Wire
和Gate
类。线路记录当前的信号状态并观察门的列表的模拟对象。因为混入了
Simulant
特质所以也需要指定一个时间:
class Wire(name: String, init: Boolean) extends Simulant { def this(name: String) { this(name, false) } def this() { this("unnamed") } val clock = Circuit.this.clock clock.add(this) private var sigVal = init private var observers: List[Actor] = List()
handleSimMessage
方法来处理收到的消息。线路只会收到SetSignal
消息设定信号,
注意只有新的信号与当前的信号不同时会改变状态并发出消息通知别的模拟对象:
def handleSimMessage(msg: Any) { msg match { case SetSignal(s) => if (s != sigVal) { sigVal = s signalObservers() } } } def signalObservers() { for (obs <- observers) clock ! AfterDelay( WireDelay, SignalChanged(this, sigVal), obs) }
上面的代码中将线路初始状态传给任何作为观察者的门也很重要。这个动作只在初始化时做
一次,以后只收改变信号的消息。模拟开始的发送初始信号也只要简单地提供一个
simStarting()
方法:
override def simStarting() { signalObservers() }
还有怎么连接门的问题。用一个新的方法来连接门,并加上一个toString
方法:
def addObserver(obs: Actor) { observers = obs :: observers } override def toString = "Wire("+ name +")"
线路的实现完成,现在再来看门。与门和或门有两个输入而非门只有一个输入,为了代码
简单那就设定所有的门都有两个入口而非门会忽视第二个入口。被忽视的可以设为某个永远
为false
的线路:
private object DummyWire extends Wire("dummy")
通用的门超类:
abstract class Gate(in1: Wire, in2: Wire, out: Wire) extends Simulant {
抽象方法算出输出信号:
def computeOutput(s1: Boolean, s2: Boolean): Boolean
不同的门有不同的延迟:
val delay: Int
时钟可以在创建好后马上加上:
val clock = Circuit.this.clock clock.add(this)
连接两个线路作为输入连上门:
in1.addObserver(this) in2.addObserver(this)
门唯一的本地状态就是它每一输入线路最近一次信号。这个状态要保存,因为线路只会在 状态变化时才发信号,而输出信号要两个线路的信号一起才能算出来:
var s1, s2 = false
响应消息是在收到输入线路信号变化时重新算输出信号,并用SetSignal
发消息给输出
线路:
def handleSimMessage(msg: Any) { msg match { case SignalChanged(w, sig) => if (w == in1) s1 = sig if (w == in2) s2 = sig clock ! AfterDelay(delay, SetSignal(computeOutput(s1, s2)), out) } }
具体的门实现就简单了:
def orGate(in1: Wire, in2: Wire, output: Wire) = new Gate(in1, in2, output) { val delay = OrGateDelay def computeOutput(s1: Boolean, s2: Boolean) = s1 || s2 } def andGate(in1: Wire, in2: Wire, output: Wire) = new Gate(in1, in2, output) { val delay = AndGateDelay def computeOutput(s1: Boolean, s2: Boolean) = s1 && s2 }
非门有一个假的输入:
def inverter(input: Wire, output: Wire) = new Gate(input, DummyWire, output) { val delay = InverterDelay def computeOutput(s1: Boolean, ignored: Boolean) = !s1 }
模拟电路的功能已经完成了。但增加一个查看器观察线路的变化还是有必要的。定义一个
接收Wire
类作为参数的并输出文本的probe
方法。这个方法可以简单地制作一个新把
自己连接到指定线路和模拟对象来实现。这个模拟对象可以通过打印出新的信号来响应
SignalChanged
消息:
def probe(wire: Wire) = new Simulant { val clock = Circuit.this.clock clock.add(this) wire.addObserver(this) def handleSimMessage(msg: Any) { msg match { case SignalChanged(w, s) => println("signal "+ w +" changed to "+ s) } } }
这里基本上Circuit
类就完成了。调用时创建一个Circuit
类实例,和一组门电路,在
需要观察的线路上加上prob
,开始模拟。启动模拟的方法只要向时钟发Start
消息
就行:
def start() { clock ! Start }
和前面非并行的版本不同我们把半加器和全加器包装成了Adders
特质。由于继承自
Circuit
所以它可以访问Circuit
的成员如Wire
和orGate
:
trait Adders extends Circuit { def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire) { val d, e = new Wire orGate(a, b, d) andGate(a, b, c) inverter(c, e) andGate(d, e, s) } def fullAdder(a: Wire, b: Wire, cin: Wire, sum: Wire, cout: Wire) { val s, c1, c2 = new Wire halfAdder(a, cin, s, c1) halfAdder(b, s, sum, c2) orGate(c1, c2, cout) } } val circuit = new Circuit with Adders
上面最后一行的circuit
变量保存了拥有所有Circuit
方法和所有Adders
方法的电路。
这就是通过特质而不是用类来组合逻辑的强大之处。可以混入任何想要用的组件:
val circuit = new Circuit with Adders with Multiplexers with FlipFlops with MultiCoreProcessors
把所有内容结合在一起
一切大功告成。简单地演示如下:
object Demo { def main(args: Array[String]) { val circuit = new Circuit with Adders import circuit._ val ain = new Wire("ain", true) val bin = new Wire("bin", false) val cin = new Wire("cin", true) val sout = new Wire("sout") val cout = new Wire("cout") probe(ain) probe(bin) probe(cin) probe(sout) probe(cout) fullAdder(ain, bin, cin, sout, cout) circuit.start() } }
这里import
了所有成员,所以可以把circuit.probe(ain)
简写成probe(ain)
。
创建了两条输出sout
、cout
。还创建了三条输入线路:ain
初始化为true
、bin
初始化为false
、cin
初始化为true
。意思是给数值0
加1
,并带入一个进位1
。
五条线路上都有probe
观测。把线路加到全加器中,模拟开始,输出如下:
Advancing to time 1 Advancing to time 2 signal Wire(cout) changed to false signal Wire(cin) changed to true signal Wire(ain) changed to true signal Wire(sout) changed to false signal Wire(bin) changed to false Advancing to time 3 Advancing to time 4 Advancing to time 5 Advancing to time 6 Advancing to time 7 Advancing to time 8 Advancing to time 9 Advancing to time 10 signal Wire(cout) changed to true Advancing to time 11 Advancing to time 12 Advancing to time 13 Advancing to time 14 Advancing to time 15 Advancing to time 16 Advancing to time 17 Advancing to time 18 signal Wire(sout) changed to true Advancing to time 19 Advancing to time 20 Advancing to time 21 signal Wire(sout) changed to false ** Agenda empty. Clock exiting at time 21.
对于输入1,0,1
输出进位1
和为0
。