Jade Dungeon

Akka邮箱问题

邮箱配置

指定邮箱的类型

详细的内容在Akka Mailboxes文档中。

除了Balancing Dispatcher的PalancingPool是由Pool中所有的Actor共享一个邮箱, 其他的模式中每个Actor都有自己的邮箱。所以有两个地方可以对邮箱进行配置:

  • Actor内部声明对邮箱的配置。
  • Dispather中声明对邮箱的配置。

在部署配置中选择邮箱

部署配置中声明的配置是所有邮箱配置中优先级最高的。

例如,对于Actor:

system.actorOf(Props[MyActor], "myactor")

application.conf中,通过ActorPath指定Actor的邮箱配置:

akka.actor.deployment {
	/myactor { // 针对`/user/myactor`
		mailbox = default-mailbox
	}
}

在代码中选择邮箱

通过Props.withMailbox()方法可以在创建Actor时分配邮箱:

system.actorOf(Props[MyActor].withMailbox("default-mailbox"))

不同类型的邮箱

默认邮箱类型

default-mailbox可以用于所有的场合,包括多个Actor共享邮箱的BalancingPool/ BalancingDispatcher。

除非使用BalancingPool,否则默认邮箱的每个实例中的消息只会有一个消费者。

单消费者邮箱

如果不是BalancingPool/BalancingDispatcher模式,那么使用队列实现单消费者类型 SingleConsumerOnlyUnbondedMailbox在很多情况下性能比默认邮箱更好 (具体是不是更好要实际测试性能以后才能肯定)。

application.conf中,通过定义新的邮箱类型:

akka.actor.mymailbox {  // 新类型的名字
	mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

Actor就可以使用新类型的邮箱了:

system.actorOf(Props[MyActor].withMailbox("akka.actor.mymailbox"))

阻塞式有限容量邮箱

容量满时继续发消息会导致线程等待直到邮箱腾出空间,这样负载突然变大时, 系统的正常运行可能会受到影响。

非阻塞式有限容量邮箱

容量满时继续发消息会导致消息被丢弃,这样负载突然变大时,系统还可以正常运行。 这种情况发送者要记录请求状态,以便实现超时重试机制,

akka.actor.boundedmailbox {  // 新类型的名字
	mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
	mailbox-capacity = 1000000
}
system.actorOf(Props[MyActor].withMailbox("akka.actor.boundedmailbox"))
优先级邮箱

缺点是收到新消息后的重新排序会带来性能开销。

控制感知邮箱

ControlMessageAware邮箱在收到扩展了akka.dispatch.Controlmessage类型的消息时 ,会把这种消息提前到队列头部。可用这种机制来快速响应一些重要的控制信息, 而且性能开销比带优先级的邮箱要少。

自定义邮箱类型

有需要可以自定义新的邮箱类型,本质上来说邮箱就是一个消息的队列。

高负载与响应速度

当面对高负载:

  • 较小的邮箱容量丢失消息的风险较大,但是因为队列较短,所以消息的响应速度相对较快。
  • 较大的邮箱容量丢失消息的风险较小,但是因为队列较长,所以消息的响应超时的风险较大。

可以看到响应超时的风险一直存在,有时候等待了很长的时间后只收到错误消息作为响应。

熔断

消息先发给熔断器,由熔断器监控应用的响应延迟或错误。当延迟到达了指定的上限, 熔断器的状态就从「关闭」变为「打开」,切断消息的传递。

过一段时间后,熔断器回到「半开」状态,并尝试发一条消息。根据响应时间决定 接下来的状态是「打开」还是「关闭」。

Akka的熔断器不会区别错误、超时与其他类型的失败状态。有必要的话要开发者自己 加入逻辑区分。

监听熔断状态

在生产中知道熔断的时间与发生的方式是很有价值的信息。在akka的熔断器中有 onOpen()onClose()onHalfOpen()方法可以重写,添加日志等行为。

熔断的例子

熔断是基于Future而不是Actor,所以不仅Actor可以加入熔断机制,在非akka系统中也可以 加入熔断机制。

case GetRequest(key) =>
	Thread.sleep(70)       // 消息处理速度慢于生产速度
	val response: Option[Object] = map.get(key)
	response match {
		case Some(x) => sender() ! x
		case None    => 
			sender() ! Status.Failure(new KeyNotFoundException(key))
	}

定义熔断器,延时超过1秒就熔断:

import akka.pattern.CircuitBreaker

val system = ActorSystem("MyActorSystem")
implicit val ec = system.dispatcher  //used by circuit breaker

val breaker = new CircuitBreaker(system.scheduler,
	maxFailures = 1,            // 最大错误数(包括错误和超时)
	callTimeout = 1 seconds,    // 超时时间
	resetTimeout = 1 seconds    // 熔断后尝试半开时间
).onOpen(
	println("circuit breaker opened!")
).onClose(
	println("circuit breaker closed!")
).onHalfOpen(
	println("circuit breaker half-open")
)

让发消息的速度大于处理消息的速度,并用CiruitBreaker.withCircuitBreaker()方法 指定熔断器:

val db = system.actorOf(Props[TestDB])
Await.result(db ? SetRequest("userId","007"), 2 seconds)

(1 to 1000000).map(x => {
	Thread.sleep(50)
	val f = breaker.withCircuitBreaker(db ? GetRequest("userId"))
	f.map(x => "got it: " + x).recover({
		case t => "err: " + t.toString
	}).foreach(x => println(x))
})

反向压力与Akka Streams

另一种解决压力的方案是反过来,由消费者去拿数据。 这样消费者这一方的系统就不会被压垮。比如典型的Reactive Streams模式。

Akka Streams提供的Reactive Streams模式的一个实现,但还在早期阶段。