Akka纵向扩展:最大利用单机性能
主要思路
纵向扩展是在单台机器上尽可多地利用硬件资源。当有更大的内存或是更多CPU核心后, 能得到有效的利用。
Akka系统中本地的Actor与远程的Actor在使用上区别不大,所以横向扩与纵向扩展的 区别也不是非常明显。
主要的思路还是把如何把任务分解,并能让多个子任务并行地执行。
线程数量太少会在CPU空等时浪费太多时间;线程数量太多, 会在上下文切换中浪费太多的资源。在并行方案的选择上唯一有效的方法就只有测试。 按经验猜测时间花在什么地方,应该用多少线程的往往都是错的。
Future或Actor
有一个不太严肃的说法:「并发用Future,状态用Actor」, 虽然有点道理但还是要具体问题具体分析。
例如有一个现在的工具ArticleExtractor
可以从网页上提取文本:
import de.13s.boilerpipe.extractors.ArticleExtractor object ArticleParser { def apply(html: String): String = ArticleExtractor.INSTANCE.getText(html) }
用Future并行编程
Future.sequence()
方法可以把List[Future]
转为Future[List]
:
import scala.concurrent.ExecutionContext.Implicits.global val futures = articleList.map(article => { Future(ArticleParser.apply(article)) }) val articlesFuture: Future[List[String]] = Future.sequence(futures)
使用Actor并行编程
case class ParseArticle(html: String) class ArticleParseActor extends Actor { override def receive: Receive = { case ParseArticle(html) => val body: String = ArticleParser(html) sender() ! body } }
Ruter
Router是一个用于负载均衡和路由的抽象。实现的方案有两个:
- 从一个已经有的Router创建出Actor的Pool。
val workerRouter: ActorRef = system.actorOf( Props.create(classOf[ArticleParseActor] ).withRouter(new RoundRobinPool(8)))
- 把已经有的Actor列表(Group),传递给Router
val router = system.actorOf(new RoundRobinGroup( actors.map(actor => actor.path).props()))
路由策略
- Round Robin:依次发到Pool/Group中每个节点,循环往复。
- Random:随机向每个节点发送。
- Smallest Mailbox:优先发给邮件队列最少的Actor。
- Scatter Gather:发给所有的Actor,选最快返回结果的,并忽略其他结果。 适用于需要响应速度最快的场景。
- Tail Chopping:发送给所有的Actor,但每发一个会等一会,所以网络负载比 Scatter Gather要小得多。
- Consistent Hashing:发给Router一个Key,Router以key生成散列值决定给哪个Actor。
- BalancingPool:只能用于本地Actor。多个Actor共享同一个邮箱, 利用率高,推荐本地应用使用。
程序员可以实现自己的路由策略,但大多数情况下不推荐这么做。
向Router Group / Pool中所有的Actor发消息
akka.routing.Broadcase
可以向同组所有的Actor广播信息。
比如数据库连接断了,要向所有受到影响的Actor广播:
router ! akka.routing.Broadcast(msg)
监督Router Pool中的路由对象
使用Pool方式创建的Actor都是Router的子节点。
在创建Router时可以用withSupervisorStrategy()
方法定义一个自定义的监督策略:
val workerRouter: ActorRef = system.actorOf( Props.create(classOf[ArticleParseActor]) .withRouter(new RoundRobinPool(8) .withSupervisorStrategy(strategy)))
由Group方法创建Router时已经传入事先存在的Actor,所以没有办法用Router来监督 Group中的Actor。
这种方法还可以用来创建由ActorSystem.actorOf()
方法创建的顶层Actor的
自定义监控策略。
使用Dispatcher
Dispatcher把「如何执行任务」与「何时执行任务」两者解耦。 Dispatcher会包括一些线程,用来把任务分给Actor或是处理Future的回调函数, 所以Dispatcher是Akka能支持响应式编程的关键。 所有Actor或Future的工作是由Executor/Dispatcher分配资源来完成的。
通过创建并取得基于不同Executor的Dispatcher,所以可以定义不同的ThreadPool
或
ForkJoinPool
来隔离运行任务的环境。
Future接受Executor
或ExecutionContext
作为参数,而在Akka中,
Dispatcher实现了scala.concurrent.ExecutionContextExecutor
接口,
并且扩展了java.util.concurrent.Executor
和scala.concurrent.ExecutionContext
。
所以可以把Executor传递给Future,ExecutionContext传递给Scala的Future。
传递给Future的Dipather可以用系统默认的Dispatcher:
system.dispathcer
也可以用ID指定特定的Dispatcher:
system.dispatchers.loopup("my-dispatcher")
Executor
Executor主要有两种类型:
-
ThreadPool Executor
:多个线程可以重用,减少创建与销毁线程的次数。 -
ForkJoinPool Executor
:用分治算法把任务分解为更小的子任务,先完成任务的线程 会帮还没有完成任务的线程完成没有完成任务中的子任务。因为效率比ThreadPool更高 所以一般默认选择ForkJoinPool。
创建Dispatcher
application.conf
中定义Dispatcher:
my-dispatcher { type = Dispatcher # 类型 executor = "fork-join-executor" # executor 类型 fork-join-executor { parallelism-min = 2 # 最小线程数 parallelism-factor = 2.0 # 每个CPU核心最大的线程数 parallelism-max = 10 # 最大的总线程数 } # 每个Actor一次性一批处理多少个消息 # Set to 1 for as fair as possible. throughput = 100 }
Dispatcher类型
- Dispatcher:默认的类型,大多数情况下性能最好。
- PinnedDispatcher:每个Actor都有独立的ThreadPool Executor。 除非单个Actor必须处理很多重要的工作,否则不推荐。
- CallingThreadDispatcher:没有Executor,直接使用发起调用的线程上执行, 适用测试时追踪栈信息。
- BalancingDispatcher:老的接口,现在已经不推荐使用。推荐是使用BalancingPool Router,因为它间接地调用了BalancingDispatcher。
Actor可以按名称直接使用配置好的Dispatcher:
system.actorOf(Props[MyActor].withDispatcher("my-dispathcer"))
不同Dispatcher适用的场景
用不同的Dispatcher分隔不同类型的任务,防止特定任务的堆积影响的所有的任务。 千万不要把阻塞IO操作放在程序的主Dispatcher中运行,这样会阻塞整个程序。
- 解析文章:长时间CPU密集任务。Executor,较小的ForkJoinPool。
- JDBC:长时间阻塞线程的IO操作。Dispatcher,较大的ForkJoingPool。
- 从缓存中获取文章:非阻塞请求。默认Dispatcher较小的ForkJoinPool。
默认Dispatcher
akka { Actor { default-dispatcher { throughput = 1 } } }
使用默认的Dispatcher:
val system = ActorSystem() implicit val ec = system.dispatcher // 隐式的默认Dispatcher val future = Future(() => println("run in ec))
默认的Dispather中使用Future会占用Actor自身的执行时间, 接下来的小节会介绍如何解决这个问题。但还是要尽量少用ask-future的异步模式, 优先考虑tell模式能不能解决问题。
使用Future的阻塞IO Dispatcher
千万不要把阻塞IO操作放在程序的主Dispatcher中运行,这样会阻塞整个程序。
比如可以合建一个新的Dispatcher,给JDBC操作,并多给一些资源:
blocking-io-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 50.0 # 最小线程数 parallelism-factor = 10 # 每个CPU核心最大的线程数 parallelism-max = 100 # 最大的总线程数 } }
对于阻塞IO操作,对于占用线程太长的操作,应该优先把查询数据库的执行计划、SQL、 表结构等问题优化到极限,还是不行的情况下再考虑是不是要分配更多的线程。
使用定义的Dispatcher:
val ec: ExecutionContext = context.system.dispatchers.lookup( "blocking-io-dispatcher) val future: Future[UserProfile] = Future { userProfileRepository.findById(id) } (ec)
用于CPU密集任务的Dispatcher
两种可用的方法:
- 定义一个Dispatcher,用于生成Actor Pool;
- 使用BalancingPool Router(内部使用了BalancingDispatcher)
在Actor中使用配置好的Dispatcher
这种方式适合于远程的Actor调用。
定义Dispatcher:
article-parsing-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 2 parallelism-factor = 2.0 parallelism-max = 10 } throughput = 10 }
根据Dispatcher创建Actor列表:
val actors: List[ActorRef] = (0 to 7).map(x => { system.actorOf(Props(classOf[ArticleParseActor]) .withDispatcher("article-parsing-dispatcher")) }).toList
创建使用Actor的Roter:
val workerRouter = system.actorOf( RoundRobinGroup( actors.map(x => x.path.toStringWithoutAddress).toList).props(), "workerRouter") val cameoActor: ActorRef = system.actorOf(Props(new TestCameoActor(p))) workerRouter.tell(new ParseArticle(TestHelper.file), cameoActor)
如果想把Router分配给某个Dispatcher,那么Group也可以接受Dispatcher的名字作为参数 。
使用BalancingPool
因为多个Actor共用一个邮箱,这种方式适合于本地的Actor调用。 虽然不像Fork-Join-Pool那样切分子任务,但因为共用邮箱,不会有Actor在空等状态。
BalancingPool使用了BalancingDispatcher,一般希望Dispatcher中的数量与Actor数量 相等:
pool-dispatcher { fork-join-executor { # force it to allocate exactly 8 threads parallelism-min = 8 parallelism-max = 8 } }
Actor Pool里的Actor数量也是8个:
val p = Promise[String]() val workerRouter = system.actorOf( BalancingPool(8).props(Props(classOf[ArticleParseActor])), "balancing-pool-router") val cameoActor: ActorRef = system.actorOf( Props(new TestCameoActor(p))) workerRouter.tell(new ParseArticle(TestHelper.file), cameoActor)
在本地工作中,这是一种在所有Actor中平衡负载的好方法。