邮箱与路由
邮箱与路由
Mailbox
整个 akka 的 Actor 系统是通过消息进行传递的,其实还可以使用 Inbox 消息收件箱来给某个 actor 发消息,并且可以进行交互。
// 创建 Inbox 收件箱
Inbox inbox = Inbox.create(system);
// 监听一个 actor
inbox.watch(inboxTest);
//通过inbox来发送消息
inbox.send(inboxTest, Msg.WORKING);
inbox.send(inboxTest, Msg.DONE);
inbox.send(inboxTest, Msg.CLOSE);
然后在 Inbox 中可以循环等待消息回复:
while(true){
try {
Object receive = inbox.receive(Duration.create(1, TimeUnit.SECONDS));
if(receive == Msg.CLOSE){//收到的inbox的消息
System.out.println("inboxTextActor is closing");
}else if(receive instanceof Terminated){//中断,和线程一个概念
System.out.println("inboxTextActor is closed");
system.terminate();
break;
}else {
System.out.println(receive);
}
} catch (TimeoutException e) {
e.printStackTrace();
}
}
路由与投递
通常在分布式任务调度系统中会有这样的需求:一组 actor 提供相同的服务,我们在调用任务的时候只需要选择其中一个 actor 进行处理即可。其实这就是一个负载均衡或者说路由策略,akka 作为一个高性能支持并发的 actor 模型,可以用来作为任务调度集群使用,当然负载均衡就是其本职工作了,akka 提供了 Router 来进行消息的调度。
// RouterActor,用于分发消息的 Actor
// 创建多个子 Actor
ArrayList<Routee> routees = new ArrayList<>();
for(int i = 0; i < 5; i ++) {
//借用上面的 inboxActor
ActorRef worker = getContext().actorOf(Props.create(InboxTest.class), "worker_" + i);
getContext().watch(worker);//监听
routees.add(new ActorRefRoutee(worker));
}
/**
* 创建路由对象
* RoundRobinRoutingLogic: 轮询
* BroadcastRoutingLogic: 广播
* RandomRoutingLogic: 随机
* SmallestMailboxRoutingLogic: 空闲
*/
router = new Router(new RoundRobinRoutingLogic(), routees);
@Override
public void onReceive(Object o) throws Throwable {
if(o instanceof InboxTest.Msg){
// 进行路由转发
router.route(o, getSender());
}else if(o instanceof Terminated){
// 发生中断,将该actor删除
router = router.removeRoutee(((Terminated)o).actor());
System.out.println(((Terminated)o).actor().path() + " 该actor已经删除。router.size=" + router.routees().size());
// 没有可用 actor 了
if(router.routees().size() == 0){
System.out.print("没有可用actor了,系统关闭。");
flag.compareAndSet(true, false);
getContext().system().shutdown();
}
}else {
unhandled(o);
}
}
// 外部系统照常发送消息
ActorRef routerActor = system.actorOf(Props.create(RouterActor.class), "RouterActor");
routerActor.tell(Msg.WORKING, ActorRef.noSender());