Jade Dungeon

Akka纵向扩展:最大利用单机性能

主要思路

纵向扩展是在单台机器上尽可多地利用硬件资源。当有更大的内存或是更多CPU核心后, 能得到有效的利用。

Akka系统中本地的Actor与远程的Actor在使用上区别不大,所以横向扩与纵向扩展的 区别也不是非常明显。

主要的思路还是把如何把任务分解,并能让多个子任务并行地执行。

线程数量太少会在CPU空等时浪费太多时间;线程数量太多, 会在上下文切换中浪费太多的资源。在并行方案的选择上唯一有效的方法就只有测试。 按经验猜测时间花在什么地方,应该用多少线程的往往都是错的。

Future或Actor

有一个不太严肃的说法:「并发用Future,状态用Actor」, 虽然有点道理但还是要具体问题具体分析。

例如有一个现在的工具ArticleExtractor可以从网页上提取文本:

import de.13s.boilerpipe.extractors.ArticleExtractor

object ArticleParser {

	def apply(html: String): String = ArticleExtractor.INSTANCE.getText(html)

}

用Future并行编程

Future.sequence()方法可以把List[Future]转为Future[List]

import scala.concurrent.ExecutionContext.Implicits.global

val futures = articleList.map(article => {
		Future(ArticleParser.apply(article))
		})

val articlesFuture: Future[List[String]] = Future.sequence(futures)

使用Actor并行编程

case class ParseArticle(html: String)
	
class ArticleParseActor extends Actor {

	override def receive: Receive = {
		case ParseArticle(html) =>
			val body: String = ArticleParser(html)
			sender() ! body
	}

}

Ruter

Router是一个用于负载均衡和路由的抽象。实现的方案有两个:

  • 从一个已经有的Router创建出Actor的Pool。
val workerRouter: ActorRef = system.actorOf(
		Props.create(classOf[ArticleParseActor]
			).withRouter(new RoundRobinPool(8)))
  • 把已经有的Actor列表(Group),传递给Router
val router = system.actorOf(new RoundRobinGroup(
			actors.map(actor => actor.path).props()))
路由策略
  • Round Robin:依次发到Pool/Group中每个节点,循环往复。
  • Random:随机向每个节点发送。
  • Smallest Mailbox:优先发给邮件队列最少的Actor。
  • Scatter Gather:发给所有的Actor,选最快返回结果的,并忽略其他结果。 适用于需要响应速度最快的场景。
  • Tail Chopping:发送给所有的Actor,但每发一个会等一会,所以网络负载比 Scatter Gather要小得多。
  • Consistent Hashing:发给Router一个Key,Router以key生成散列值决定给哪个Actor。
  • BalancingPool:只能用于本地Actor。多个Actor共享同一个邮箱, 利用率高,推荐本地应用使用。

程序员可以实现自己的路由策略,但大多数情况下不推荐这么做。

向Router Group / Pool中所有的Actor发消息

akka.routing.Broadcase可以向同组所有的Actor广播信息。 比如数据库连接断了,要向所有受到影响的Actor广播:

router ! akka.routing.Broadcast(msg)
监督Router Pool中的路由对象

使用Pool方式创建的Actor都是Router的子节点。 在创建Router时可以用withSupervisorStrategy()方法定义一个自定义的监督策略:

val workerRouter: ActorRef = system.actorOf(
		Props.create(classOf[ArticleParseActor])
		.withRouter(new RoundRobinPool(8)
			.withSupervisorStrategy(strategy)))

由Group方法创建Router时已经传入事先存在的Actor,所以没有办法用Router来监督 Group中的Actor。

这种方法还可以用来创建由ActorSystem.actorOf()方法创建的顶层Actor的 自定义监控策略。

使用Dispatcher

Dispatcher把「如何执行任务」与「何时执行任务」两者解耦。 Dispatcher会包括一些线程,用来把任务分给Actor或是处理Future的回调函数, 所以Dispatcher是Akka能支持响应式编程的关键。 所有Actor或Future的工作是由Executor/Dispatcher分配资源来完成的。

通过创建并取得基于不同Executor的Dispatcher,所以可以定义不同的ThreadPoolForkJoinPool来隔离运行任务的环境。

Future接受ExecutorExecutionContext作为参数,而在Akka中, Dispatcher实现了scala.concurrent.ExecutionContextExecutor接口, 并且扩展了java.util.concurrent.Executorscala.concurrent.ExecutionContext。 所以可以把Executor传递给Future,ExecutionContext传递给Scala的Future。

传递给Future的Dipather可以用系统默认的Dispatcher:

system.dispathcer

也可以用ID指定特定的Dispatcher:

system.dispatchers.loopup("my-dispatcher")

Executor

Executor主要有两种类型:

  • ThreadPool Executor:多个线程可以重用,减少创建与销毁线程的次数。
  • ForkJoinPool Executor:用分治算法把任务分解为更小的子任务,先完成任务的线程 会帮还没有完成任务的线程完成没有完成任务中的子任务。因为效率比ThreadPool更高 所以一般默认选择ForkJoinPool。

创建Dispatcher

application.conf中定义Dispatcher:

my-dispatcher {
  type = Dispatcher               # 类型
  executor = "fork-join-executor" # executor 类型

  fork-join-executor {
    parallelism-min = 2           # 最小线程数
    parallelism-factor = 2.0      # 每个CPU核心最大的线程数
    parallelism-max = 10          # 最大的总线程数
  }

  # 每个Actor一次性一批处理多少个消息
  # Set to 1 for as fair as possible.
  throughput = 100
}

Dispatcher类型

  • Dispatcher:默认的类型,大多数情况下性能最好。
  • PinnedDispatcher:每个Actor都有独立的ThreadPool Executor。 除非单个Actor必须处理很多重要的工作,否则不推荐。
  • CallingThreadDispatcher:没有Executor,直接使用发起调用的线程上执行, 适用测试时追踪栈信息。
  • BalancingDispatcher:老的接口,现在已经不推荐使用。推荐是使用BalancingPool Router,因为它间接地调用了BalancingDispatcher。

Actor可以按名称直接使用配置好的Dispatcher:

system.actorOf(Props[MyActor].withDispatcher("my-dispathcer"))

不同Dispatcher适用的场景

用不同的Dispatcher分隔不同类型的任务,防止特定任务的堆积影响的所有的任务。 千万不要把阻塞IO操作放在程序的主Dispatcher中运行,这样会阻塞整个程序。

  • 解析文章:长时间CPU密集任务。Executor,较小的ForkJoinPool。
  • JDBC:长时间阻塞线程的IO操作。Dispatcher,较大的ForkJoingPool。
  • 从缓存中获取文章:非阻塞请求。默认Dispatcher较小的ForkJoinPool。

默认Dispatcher

akka {
	Actor {
		default-dispatcher {
			throughput = 1
		}
	}
}

使用默认的Dispatcher:

val system = ActorSystem()
implicit val ec = system.dispatcher   // 隐式的默认Dispatcher

val future = Future(() => println("run in ec))

默认的Dispather中使用Future会占用Actor自身的执行时间, 接下来的小节会介绍如何解决这个问题。但还是要尽量少用ask-future的异步模式, 优先考虑tell模式能不能解决问题。

使用Future的阻塞IO Dispatcher

千万不要把阻塞IO操作放在程序的主Dispatcher中运行,这样会阻塞整个程序。

比如可以合建一个新的Dispatcher,给JDBC操作,并多给一些资源:

blocking-io-dispatcher {
	type = Dispatcher
	executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 50.0       # 最小线程数
    parallelism-factor = 10      # 每个CPU核心最大的线程数
    parallelism-max = 100        # 最大的总线程数
  }
}

对于阻塞IO操作,对于占用线程太长的操作,应该优先把查询数据库的执行计划、SQL、 表结构等问题优化到极限,还是不行的情况下再考虑是不是要分配更多的线程。

使用定义的Dispatcher:

val ec: ExecutionContext = context.system.dispatchers.lookup(
		"blocking-io-dispatcher)

val future: Future[UserProfile] = Future {
	userProfileRepository.findById(id)
} (ec)

用于CPU密集任务的Dispatcher

两种可用的方法:

  • 定义一个Dispatcher,用于生成Actor Pool;
  • 使用BalancingPool Router(内部使用了BalancingDispatcher)

在Actor中使用配置好的Dispatcher

这种方式适合于远程的Actor调用。

定义Dispatcher:

article-parsing-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 10
}

根据Dispatcher创建Actor列表:

val actors: List[ActorRef] = (0 to 7).map(x => {
		system.actorOf(Props(classOf[ArticleParseActor])
		.withDispatcher("article-parsing-dispatcher"))
}).toList

创建使用Actor的Roter:

val workerRouter = system.actorOf(
	RoundRobinGroup(
		actors.map(x => x.path.toStringWithoutAddress).toList).props(),
	"workerRouter")

val cameoActor: ActorRef = system.actorOf(Props(new TestCameoActor(p)))

workerRouter.tell(new ParseArticle(TestHelper.file), cameoActor)

如果想把Router分配给某个Dispatcher,那么Group也可以接受Dispatcher的名字作为参数 。

使用BalancingPool

因为多个Actor共用一个邮箱,这种方式适合于本地的Actor调用。 虽然不像Fork-Join-Pool那样切分子任务,但因为共用邮箱,不会有Actor在空等状态。

BalancingPool使用了BalancingDispatcher,一般希望Dispatcher中的数量与Actor数量 相等:

pool-dispatcher {
  fork-join-executor { # force it to allocate exactly 8 threads
    parallelism-min = 8
    parallelism-max = 8
  }
}

Actor Pool里的Actor数量也是8个:

val p = Promise[String]()

val workerRouter = system.actorOf(
		BalancingPool(8).props(Props(classOf[ArticleParseActor])),
    "balancing-pool-router")

val cameoActor: ActorRef = system.actorOf(
		Props(new TestCameoActor(p)))

workerRouter.tell(new ParseArticle(TestHelper.file), cameoActor)

在本地工作中,这是一种在所有Actor中平衡负载的好方法。