Akka邮箱问题
邮箱配置
指定邮箱的类型
详细的内容在Akka Mailboxes文档中。
除了Balancing Dispatcher的PalancingPool是由Pool中所有的Actor共享一个邮箱, 其他的模式中每个Actor都有自己的邮箱。所以有两个地方可以对邮箱进行配置:
- Actor内部声明对邮箱的配置。
- Dispather中声明对邮箱的配置。
在部署配置中选择邮箱
部署配置中声明的配置是所有邮箱配置中优先级最高的。
例如,对于Actor:
system.actorOf(Props[MyActor], "myactor")
在application.conf
中,通过ActorPath指定Actor的邮箱配置:
akka.actor.deployment { /myactor { // 针对`/user/myactor` mailbox = default-mailbox } }
在代码中选择邮箱
通过Props.withMailbox()
方法可以在创建Actor时分配邮箱:
system.actorOf(Props[MyActor].withMailbox("default-mailbox"))
不同类型的邮箱
默认邮箱类型
default-mailbox
可以用于所有的场合,包括多个Actor共享邮箱的BalancingPool/
BalancingDispatcher。
除非使用BalancingPool,否则默认邮箱的每个实例中的消息只会有一个消费者。
单消费者邮箱
如果不是BalancingPool/BalancingDispatcher模式,那么使用队列实现单消费者类型 SingleConsumerOnlyUnbondedMailbox在很多情况下性能比默认邮箱更好 (具体是不是更好要实际测试性能以后才能肯定)。
在application.conf
中,通过定义新的邮箱类型:
akka.actor.mymailbox { // 新类型的名字 mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" }
Actor就可以使用新类型的邮箱了:
system.actorOf(Props[MyActor].withMailbox("akka.actor.mymailbox"))
阻塞式有限容量邮箱
容量满时继续发消息会导致线程等待直到邮箱腾出空间,这样负载突然变大时, 系统的正常运行可能会受到影响。
非阻塞式有限容量邮箱
容量满时继续发消息会导致消息被丢弃,这样负载突然变大时,系统还可以正常运行。 这种情况发送者要记录请求状态,以便实现超时重试机制,
akka.actor.boundedmailbox { // 新类型的名字 mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox" mailbox-capacity = 1000000 }
system.actorOf(Props[MyActor].withMailbox("akka.actor.boundedmailbox"))
优先级邮箱
缺点是收到新消息后的重新排序会带来性能开销。
控制感知邮箱
ControlMessageAware
邮箱在收到扩展了akka.dispatch.Controlmessage
类型的消息时
,会把这种消息提前到队列头部。可用这种机制来快速响应一些重要的控制信息,
而且性能开销比带优先级的邮箱要少。
自定义邮箱类型
有需要可以自定义新的邮箱类型,本质上来说邮箱就是一个消息的队列。
高负载与响应速度
当面对高负载:
- 较小的邮箱容量丢失消息的风险较大,但是因为队列较短,所以消息的响应速度相对较快。
- 较大的邮箱容量丢失消息的风险较小,但是因为队列较长,所以消息的响应超时的风险较大。
可以看到响应超时的风险一直存在,有时候等待了很长的时间后只收到错误消息作为响应。
熔断
消息先发给熔断器,由熔断器监控应用的响应延迟或错误。当延迟到达了指定的上限, 熔断器的状态就从「关闭」变为「打开」,切断消息的传递。
过一段时间后,熔断器回到「半开」状态,并尝试发一条消息。根据响应时间决定 接下来的状态是「打开」还是「关闭」。
Akka的熔断器不会区别错误、超时与其他类型的失败状态。有必要的话要开发者自己 加入逻辑区分。
监听熔断状态
在生产中知道熔断的时间与发生的方式是很有价值的信息。在akka的熔断器中有
onOpen()
、onClose()
和onHalfOpen()
方法可以重写,添加日志等行为。
熔断的例子
熔断是基于Future而不是Actor,所以不仅Actor可以加入熔断机制,在非akka系统中也可以 加入熔断机制。
case GetRequest(key) => Thread.sleep(70) // 消息处理速度慢于生产速度 val response: Option[Object] = map.get(key) response match { case Some(x) => sender() ! x case None => sender() ! Status.Failure(new KeyNotFoundException(key)) }
定义熔断器,延时超过1秒就熔断:
import akka.pattern.CircuitBreaker val system = ActorSystem("MyActorSystem") implicit val ec = system.dispatcher //used by circuit breaker val breaker = new CircuitBreaker(system.scheduler, maxFailures = 1, // 最大错误数(包括错误和超时) callTimeout = 1 seconds, // 超时时间 resetTimeout = 1 seconds // 熔断后尝试半开时间 ).onOpen( println("circuit breaker opened!") ).onClose( println("circuit breaker closed!") ).onHalfOpen( println("circuit breaker half-open") )
让发消息的速度大于处理消息的速度,并用CiruitBreaker.withCircuitBreaker()
方法
指定熔断器:
val db = system.actorOf(Props[TestDB]) Await.result(db ? SetRequest("userId","007"), 2 seconds) (1 to 1000000).map(x => { Thread.sleep(50) val f = breaker.withCircuitBreaker(db ? GetRequest("userId")) f.map(x => "got it: " + x).recover({ case t => "err: " + t.toString }).foreach(x => println(x)) })
反向压力与Akka Streams
另一种解决压力的方案是反过来,由消费者去拿数据。 这样消费者这一方的系统就不会被压垮。比如典型的Reactive Streams模式。
Akka Streams提供的Reactive Streams模式的一个实现,但还在早期阶段。