在 Actor 模型中,消息是唯一的沟通方式,你要做任何事情都需要发送消息,不能直接更改 Actor 内部的状态或者使用其内部的资源。
Actor 模型简介 1、Actor 是 actor 系统中的最小单位,就类似对象是 OOP 系统中的最小单位一样。 2、也正如一个对象,一个 actor 的状态和行为也是被封装起来的。你不能直接去访问或执行一个 actor 的方法或者是成员对象(field),而只能是向这个 actor 发送一个要求访问其内部状态的消息,正如你不可能有读心术去了解别人心里是怎么想的,而只能问对方你的感受如何。 3、每一个 actor 都有一个邮箱用来接收消息,而它们的日常工作就是处理这些消息。 4、你与每个 actor 交互的方式就是向其发送一条不可变的消息,这些消息会进入到这个 actor 的邮箱中。 5、actor 也有自己的层级关系,类似一个公司的组织结构。一个 actor 会有父 actor 或者子 actor 或者兄弟 actor,正如一个高管会有董事长管他,会有其他高管与之一起合作,也会有自己的下属。这些组织结构就是为了实现一个目的:“代理委派”,让所有 actor 合作起来使系统正常运转。 6、最后一点就是 actor 模型的异常处理,当 actor 处理消息时,某些地方可能会出错,可能会抛出一些异常,那么此时怎么处理呢?通常做法是当前 actor 会停止其本身和其子 actor 的所有操作,然后向其父 actor 发送一条通知错误信息的消息,然后父节点根据不同情况做出处理,一般处理方式有以下几种:
恢复子 actor,所有状态保持不变
重启子 actor,所有状态清空,从零开始
终止子 actor (当一个 actor 终止的时候,其邮箱內的消息会进入 dead letter 邮箱)。
把错误抛给上一级 actor 处理
Actor 之间如何沟通?
一个基于 actor 的系统当然会包含很多 actor,那么它们之间是怎么沟通的呢,让通过一个 ping-pong 的例子来了解一下。
objectPingPongTestextendsApp{ val system = ActorSystem("PingPongSystem") val pong = system.actorOf(Props[Pong], name = "pong") val ping = system.actorOf(Props(newPing(pong)), name = "ping") ping ! StartMessage }
overridedefreceive: Receive = { caseForceRestart => thrownewException("Boom!") case _ => println("Kenny received a message") } }
输出如下:
entered the Kenny constructor sending kenny a simple String message Kenny: preStart Kenny received a message make kenny restart Kenny: preRestart MESSAGE: ForceRestart REASON: Boom! Kenny: postStop entered the Kenny constructor Kenny: postRestart REASON: Boom! Kenny: preStart [ERROR] [05/21/201823:21:39.109] [LifecycleDemo-akka.actor.default-dispatcher-4] [akka://LifecycleDemo/user/Kenny] Boom! java.lang.Exception: Boom! at ink.baixin.akka.example.Kenny$$anonfun$receive$1.applyOrElse(LifecycleDemo.scala:45) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at ink.baixin.akka.example.Kenny.aroundReceive(LifecycleDemo.scala:26) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:590) at akka.actor.ActorCell.invoke(ActorCell.scala:559) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
stopping kenny shutting down system Kenny: postStop
让我们来分析一下整个过程: 首先创建一个 actor,进入其构造器创建一个 actor 实例;然后启动这个 actor,启动之前会先调用该实例的 preStart 方法;之后我们向这个 actor 发送了一条 String 类型的消息,actor 调用其 receive 方法进行处理;然后我们向这个 actor 发送了 ForceRestart 的消息用于重启,该消息会抛出一个异常,当 actor 检测到异常时,就会启动自动重启机制,调用实例自身的 preRestart 方法,此时会终止该 actor 实例及其所以子 actor 然后调用 postStop 方法;这个时候之前的 actor 实例的生命周期就彻底结束了,会创建一个新的 actor 实例,创建之后调用这个新实例的 postRestart 方法,然后再调用其 preStart 方法,之后新的 actor 就启动了,最后我们通过 ActorSystem 停止了这个 actor 实例,调用该实例的 postStop 方法,这就是上面 actor 经历的完整生命周期。
可以看到,一个 actor restart 前后实际上是两个不同的实例。
Start and Stop an Actor
启动 actor
使用 ActorSystem.actorOf 启动 可以使用 ActorSystem 创建其他普通 actor
val system = ActorSystem("HelloSystem") // the actor is created and started here val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
使用 context.actorOf 启动 每一个普通 actor 都有一个隐含的 context 实例,通过这个实例可以启动其它 actor
classParentextendsActor{ val child = context.actorOf(Props[Child], name = "Child") // more code here ... }
停止 actor
system.stop and context.stop 在 ActorSystem 级别,可以使用 ActorSystem 实例去停止一个 actor:
actorSystem.stop(anActor)
在一个 actor 内部,可以使用 context 引用停止其子 actor:
context.stop(childActor)
当然一个 actor 也可以停止其自身:
context.stop(self)
PoisonPill message 也可以通过向 actor 发送停止指令 PoisonPill 来停止它,当这条消息被处理时 actor 就会停止,这条消息也会像普通消息一样出现在邮箱的队列里。 下面是一个 PoisonPill 的实例:
objectAskTestextendsApp{ // create the system and actor val system = ActorSystem("AskTestSystem") val myActor = system.actorOf(Props[AskActor], name = "myActor")
// (1) this is one way to "ask" another actor for information implicitval timeout = Timeout(5 seconds) val future = myActor ? AskNameMessage val result = Await.result(future, timeout.duration).asInstanceOf[String] println(result)
// (2) a slightly different way to ask another actor for information val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String] val result2 = Await.result(future2, 1 second) println(result2)
system.terminate }
caseobjectAskNameMessage classAskActorextendsActor{ defreceive= { caseAskNameMessage => // response to the 'ask' request sender ! "Fred" case _ => println("that was unexpected") } }
如果一个 actor 在不同的时间需要对应不同的状态,这样可以进行不同的消息处理,此时可以使用 become 方法切换 actor 的状态,就可以对应到不同的 receive 方法,下面是一个绿巨人变身的例子:
import akka.actor.{Actor, ActorSystem, Props}
objectBecomeHulkExampleextendsApp{ val system = ActorSystem("BecomeHulkExample") val davidBanner = system.actorOf(Props[DavidBanner], name = "DavidBanner")
defangryState: Receive = { caseSolution => println("Fight with you!") caseActNormalMessage => println("Phew, I'm back to become David.") become(normalState) }
defnormalState: Receive = { caseSolution => println("Looking for solution to my problem ...") caseBadGuysMakeMeAngry => println("I'm getting angry ...") become(angryState) }