Jade Dungeon

Akka横向扩展:集群化

CAP理论

  • Consistency(一致性)
  • Availability(可用性)
  • Partition Tolerance(分区容错性):有冗余节点,坏了一个其他的还能继续工作。

CAP理论中的妥协

在系统具备「分区容错性」的基础上,往往要在「可用性」和「一致性」中做出取舍, 例如当集群中有一个节点不用用,那么系统的设计者要面临选择:

  • 一致性优先:所有节点的状态要一致,系统返回错误信号。
  • 可用性优先:正常工作的节点返回结果,但错误节点的状态和正常工作节点的状态不一样了。

一致性优先的设计:CP系统

典型的例子是主从结构:只在主节点进行写入操作,多个从节点从主节点备份最新的数据。 如果主节点发生错误,就在从节点中选出新的主节点。

这样的结构适用于需要原子读写操作与事务支持的场景。

可用性优先的设计:AP系统

多个节点都可以读写,所以数据并不是每时每刻在所有的节点上都是一致的。 为了保证集群系统在关键的时刻具有「最终一致性」,可用性优先的系统需要设计 例如向量时钟等复杂的机制来确定系统中事件的时间和顺序。

集群的设计方案

分区模式

把同一份数据按key的散列值分散在不同的节点上。

例如在Cassandra中把散列出来的分区key称为partition key。

冗余模式

同一份数据重复记录在不同的多个节点上,当其中一个节点出问题时可以用其他的节点。 但具体实现细节上会遇到很多问题,如事件顺序等实现方式涉及到Lamport时钟或是向量时钟, 是一个很大的课题。比如先写入一条记录,再修改它的值,再删除这条记录。 三个操作以不同的顺序到达,对最终的结果有很大的影响。

一个粗糙的解决方案为「群体决策」。例如:

  • 一个写入操作发给3台机器,其中两台以上返回了成功的响应,才算写入成功。 以后的操作才可以读到刚刚写入的这一条数据。
  • 一个读取操作也是发给3台机器,只要工作中两台返回的结果一致就算是结果正确。

流行的工具Cassandra有一种叫作「读修复请求」的读请求的类型,

结合分区模式与冗余模式

先对集群进行分区,分成多个子集群。每个子集群里用冗余模式保证更高的可用性。

提前分区并向新节点重发key

原来只有4个分区,现在想分成5个分区的话,就一定要重新定义散列算法, 并移动原来分区里所有的数据。更好的方案是一开始就定义非常多的分区, 对应到比实际节点更多的虚拟的节点,例如Cassandra就有一个vnode的概念。 这样不用改散列算法,只要按虚拟区移动部分区块到新的节点上。

但是移动分区虽然是一次性的操作,但也会占用较多的时间,会影响新的写入请求。

如果没有用到这些功能,Akka Cluster提供一种机制使得集群中节点数量到达指定数量时 才会启动集群。这样在固定了partition key的数量时,可假设集群大小固定。 等到要添加分区再添加分区(但这样就要重新定义散列算法,并按新的散列算法 移动原来分区里所有的数据)。

Akka Cluster 机制

失败检测

Akka的失败检测是通过在节点之间发送心跳消息并等待响应的方式来实现的。

为了限制网络通信的开销,Akka的失败检测机制限制某个节点只监制附近有限数量的节点 (默认是5个),比如节点2只检查节点3到节点7这五个节点是否发生错误。

通过gossip协议达到一致性

Akka负责确定集群中的机器是否有状态变化,并负责把任何消息传递给集群中所有的节点。

每个节点会和相邻的节点交换当前已经知的集群状态,有了新的内容以后再把所有的消息 汇总在一起再和其他的相邻节点,就像传八卦消息一样一点点汇总出整个集群的状态。

这种机制也被称为epidemic协议,因为消息像是病毒一样传播开来。

使用Akka Cluster构建分布式系统

Akka Cluster模块是基于之前介绍的Akka Remoting,并加入了高可用性的支持。

基础配置

导入包

"com.typesafe.akka" %% "akka-cluster" % "2.3.6"
"com.typesafe.akka" %% "akka-contrib" % "2.3.6"

主节点

主节点的application.conf配置文件:

akka {
	actor { // 和remoting主要的区别在于provider的类型
		provider = "akka.cluster.ClusterActorRefProvider"
	}
	remote {
		netty.tcp {
			hostname = "127.0.0.1"
			port = 2552
		}
	}

	cluster {
		// 从节点
		seed-nodes = [
			"akka.tcp://Akkademy@127.0.0.1:2552",
			"akka.tcp://Akkademy@127.0.0.1:2551"
		]
	}

	// 启动ClusterReceptionist,负责客户端与服务器之间的通信
	extensions = ["akka.contrib.pattern.ClusterReceptionistExtension"]
}
监听端口

如果在同一台机器上跑集群的多个实例,一定要指定不同的端口:

  • 端口选0表示随机分配一个端口。
  • akka默认的端口是2552

除了配置文件,在sbt命令行中也可以传入akka.remote.netty.tcp.port参数来指定端口:

activator run -Dakka.remote.netty.tcp.port=0
指定种子节点的地址

akka.cluster.seed-nodes指定了种子节点的地址。 新节点加入集群时会按顺序连接种子节点, 连接到了种子节点后就可以通过gossip协议把自己的地址通知到整个群集。

生产环境中至少应该两个种子节点,并保证至少有一个种子节点是可用的。

启动集群种子节点时对顺序没有要求,但必须启动列出的第一个种子节点, 对集群进行初始化。

与客户端通信

启动ClusterReceptionist,负责客户端与服务器之间的通信的细节。 服务器上创建ClusterReceptionistActor的路径是:/user/receptionist

从节点配置

从节点的application.conf配置文件:

akka {
	actor {
		provider = "akka.cluster.ClusterActorRefProvider"
	}

	contrib.cluster.client {
		mailbox {      // 配置邮箱信息
			mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
			stash-capacity = 1000
		}
	}
}
provider类型

provider的类型和server一样都是ClusterActorRefProvider。

邮箱配置

配置邮箱类型。

订阅集群事件

先定义一个名为ClusterController的Actor作为其他例子的基础,它可以实例化Log, 创建集群对象:

package com.akkademy

import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{UnreachableMember, MemberEvent}
import akka.event.Logging

class ClusterController extends Actor {

	val log = Logging(context.system, this)
	// 创建集群对象
	val cluster = Cluster(context.system)

	override def preStart() {
		// 订阅消息
		cluster.subscribe(self, classOf[MemberEvent], 
				classOf[UnreachableMember])
	}

	override def postStop() {
		// 一定要取消订阅,不然会内存泄露
		cluster.unsubscribe(self)
	}

	override def receive = {
		case x: MemberEvent =>            // 收到节点状态变化消息
			log.info("MemberEvent: {}", x)
		case x: UnreachableMember =>      // 收到节点不可用消息
			log.info("UnreachableMember {}: ", x)
	}
}

这里涉及了两类Akka中定义的消息类型:

  • MemberEvent:集群中有状态变化时收到此类消息。
  • UnreachableMember:集群中有节点不可用时收到此类消息。

启动集群

以App程序的main函数作为入口,直接创建ClusterController的实例:

package com.akkademy

import akka.actor.{ActorSystem, Props}
import akka.contrib.pattern.ClusterReceptionistExtension
import akka.routing.BalancingPool

object Main extends App {
	val system = ActorSystem("Akkademy")

	val clusterController = system.actorOf(
		Props[ClusterController], "clusterController")
}

启动种子节点

种子节点列表里的第一个节点一定要启动,不然集群无法初始化。可以直接通过sbt启动:

activator run

为了节点能安全地退出集群,还要启动jmx。所以需要传入更多的参数指定jmx端口, 并在测试阶段关闭jmx的安全功能:

activator run \
	-Dcom.sun.management.jmsremote.port=9552 \
	-Dcom.sun.management.jmsremote.authenticate=false \
	-Dcom.sun.management.jmsremote.ssl=false

控制台信息中看到启动信息,还有可能尝试连接其他配置好的种子节点的dead-letters 消息。但最终应该会看到收到MemberEvent时输出的日志:

[INFO] [01/14/2015 12:22:46.756] [Akkademy-akka.actor.default-dispatcher-3]
	[akka://...] MemberEvent: MemberUp(Member(address = akka..., status = up))

这样一个种子节点就被启动了。换一个端口2551再启动另一个种子节点:

activator run \
	-Dakka.remote.netty.tcp.port=2551 \
	-Dcom.sun.management.jmsremote.port=9552 \
	-Dcom.sun.management.jmsremote.authenticate=false \
	-Dcom.sun.management.jmsremote.ssl=false

启动非种子节点

非种子节点不需要监听端口,直接先0随机选一个端口:

activator run \
	-Dakka.remote.netty.tcp.port=0 \
	-Dcom.sun.management.jmsremote.port=9552 \
	-Dcom.sun.management.jmsremote.authenticate=false \
	-Dcom.sun.management.jmsremote.ssl=false

退出集群

Cluster实例的leave方法

程序中把节点的地址作为参数传递给Cluster实例的leave()方法,让节点退出集群:

cluster.leave(self().path().address())

使用jmx消息

通过akka-cluster的工具箱发送jmx消息让节点退出集群。

先从http://akka.io/downloads/下载akka-cluster工具箱,然后删除指定的节点:

./akka-cluster localhost 9552 leave akka.tcp://Akkademy@127.0.0.1:2552

从控制台中可以看到节点状态先是变成Exiting然后变成Removed

[INFO] ........ MemberExited( Member(....., status = Exiting))
[INFO] ........ MemberRemoved(Member(....., status = Removed), Exiting)

集群成员的状态

集群在逻辑上对节点进行排序,第一个节点就是逻辑上的leader节点。 Leader负责协调状态的变化,对加入和离开集群的请求做出响应,并修改集群成员的状态。

  • 加入集群时,新节点状态为Joining,Leader把状态再改为Up
  • 离开集群时,新节点状态为Leaving,Leader把状态改为Removed

状态改变消息都是通过MemberEvent消息来传播的。

失败检测

节点不可达时,状态并不变化,而是标记为MemberUnreachable。 ClusterController中请阅MemberUnreachable类型事件,就会收到通知。 如果在超时范围内连接成功,那么节点又会重新运行;如果超时还连不上, 节点才会被标记为Down,并不能再被加入集群。必须关闭节点,重新启动, 获得一个新的ID才能重新加入集群。

如果把不重启标记为Down的节点,再次加入集群,会产生两个分离的集群, 这是Akka目前没有解决的问题。

通过路由向集群发送消息

override def receive: Receive = {
	case ParseArticle(html) => 
		val body: String = ArticleParser(html)
		sender() ! body
}

启动集群时直接启动Actor(或是Pool中的多个Actor):

package com.akkademy

import akka.actor.{ActorSystem, Props}
import akka.contrib.pattern.ClusterReceptionistExtension
import akka.routing.BalancingPool

object Main extends App {
	val system = ActorSystem("Akkademy")
	
	// 控制集群状态的Actor
	val clusterController = system.actorOf(
			Props[ClusterController], "clusterController")

	// 解析HTML的Actor 注册为 '/user/workers'
	val workers = system.actorOf(
			BalancingPool(5).props(Props[ArticleParseActor]), "workers")

	// 把worker actor转为ClusterReceptionistExtension
	ClusterReceptionistExtension(system).registerService(workers)
}

为了和客户端通信,要把worker actor转为ClusterReceptionistExtension。

然后启动一个包含有3个节点的集群:

activator run &             # 默认就是2552
activator run -Dakka.remote.netty.tcp.port=2551 &
activator run -Dakka.remote.netty.tcp.port=0 &

客户端向集群发送消息

为了简单起见,避免在服务端与客户端都在定义类型ParseArticle(htmlString), 这里直接用String类型在服务端和客户端进行通信:

import akka.actor.Actor

class ArticleParseActor extends Actor{
	override def receive: Receive = {
		case htmlString: String =>
			val body: String = ArticleParser(htmlString)
			sender() ! body
		case _ =>
			println("msg!")
	}
}

object ArticleParser {
	def apply(html: String) : String =
		de.l3s.boilerpipe.extractors.ArticleExtractor.INSTANCE.getText(html)
}

receptionist代理C-S通信

服务器与客户端的通信由细节由receptionist实现。

根据种子节点生成receptionist:

// 种子节点的地址
val initialContacts: Set[ActorSelection] = Set(
	system.actorSelection(
		"akka.tcp://Akkademy@127.0.0.1:2552/user/receptionist"),
	system.actorSelection(
		"akka.tcp://Akkademy@127.0.0.1:2551/user/receptionist")
)
// 根据种子节点生成receptionist
import collection.JavaConversions._
val receptionist = system.actorOf(ClusterClient.defaultProps(initialContacts))

在单例ClientClient中定义了三种不同的消息类型:

// 发给任意一个节点的消息
final case class Send(path: String, msg: Any, localAffinity: Boolean) 
	extends Product with Serializable

// 发给所有的节点的消息
final case class SendToAll(path: String, msg: Any) 
	extends Product with Serializable

// 发给订阅了指定主题的所有Actor的消息
final case class Publish(topic: String, msg: Any) 
	extends Product with Serializable

然后receptionist把消息发到代表 receptionist 的ActorRef,得到一个Future:

val f: Future = akka.pattern.Patterns.ask(receptionist, msg, timeout)

完整的代码:

package com.akkademy

import akka.actor.{ActorSelection, ActorSystem}
import akka.contrib.pattern.ClusterClient
import akka.pattern.Patterns
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration


object ArticleParseClusterClient {

	def main(args: Array[String]) {
		val timeout = new Timeout(Duration.create(5, "seconds"))
		val system = ActorSystem.create("clientSystem")

		// 种子节点的地址
		val initialContacts: Set[ActorSelection] = Set(
			system.actorSelection(
				"akka.tcp://Akkademy@127.0.0.1:2552/user/receptionist"),
			system.actorSelection(
				"akka.tcp://Akkademy@127.0.0.1:2551/user/receptionist")
		)
		// 连接种子节点
		import collection.JavaConversions._
		val receptionist = system.actorOf(
				ClusterClient.defaultProps(initialContacts))

		// 服务器与客户端的通信由 receptionist 实现。
		// 向集群中的任意一个节点发送消息,用ClusterClient.Send()
		// 路径'/user/workers'是在刚刚Server端的main方法里定义的
		val msg = ClusterClient.Send("/user/workers", articleToParse, false)

		// receptionist把消息发到代表 receptionist 的ActorRef
		val f = Patterns.ask(receptionist, msg, timeout)
		val result = Await.result(f, timeout.duration).asInstanceOf[String]
		println("result: " + result)
	}

	val articleToParse = " ....  "  // some html String
}