邮箱与路由

邮箱与路由

Mailbox

整个 akka 的 Actor 系统是通过消息进行传递的,其实还可以使用 Inbox 消息收件箱来给某个 actor 发消息,并且可以进行交互。

// 创建 Inbox 收件箱
Inbox inbox = Inbox.create(system);
// 监听一个 actor
inbox.watch(inboxTest);

//通过inbox来发送消息
inbox.send(inboxTest, Msg.WORKING);
inbox.send(inboxTest, Msg.DONE);
inbox.send(inboxTest, Msg.CLOSE);

然后在 Inbox 中可以循环等待消息回复:

while(true){
    try {
        Object receive = inbox.receive(Duration.create(1, TimeUnit.SECONDS));
        if(receive == Msg.CLOSE){//收到的inbox的消息
            System.out.println("inboxTextActor is closing");
        }else if(receive instanceof Terminated){//中断,和线程一个概念
            System.out.println("inboxTextActor is closed");
            system.terminate();
            break;
        }else {
            System.out.println(receive);
        }
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

路由与投递

通常在分布式任务调度系统中会有这样的需求:一组 actor 提供相同的服务,我们在调用任务的时候只需要选择其中一个 actor 进行处理即可。其实这就是一个负载均衡或者说路由策略,akka 作为一个高性能支持并发的 actor 模型,可以用来作为任务调度集群使用,当然负载均衡就是其本职工作了,akka 提供了 Router 来进行消息的调度。

// RouterActor,用于分发消息的 Actor
// 创建多个子 Actor
ArrayList<Routee> routees = new ArrayList<>();

for(int i = 0; i < 5; i ++) {
    //借用上面的 inboxActor
    ActorRef worker = getContext().actorOf(Props.create(InboxTest.class), "worker_" + i);
    getContext().watch(worker);//监听
    routees.add(new ActorRefRoutee(worker));
}

/**
* 创建路由对象
* RoundRobinRoutingLogic: 轮询
* BroadcastRoutingLogic: 广播
* RandomRoutingLogic: 随机
* SmallestMailboxRoutingLogic: 空闲
*/
router = new Router(new RoundRobinRoutingLogic(), routees);

@Override
public void onReceive(Object o) throws Throwable {
    if(o instanceof InboxTest.Msg){
        // 进行路由转发
        router.route(o, getSender());
    }else if(o instanceof Terminated){
        // 发生中断,将该actor删除
        router = router.removeRoutee(((Terminated)o).actor());
        System.out.println(((Terminated)o).actor().path() + " 该actor已经删除。router.size=" + router.routees().size());

        // 没有可用 actor 了
        if(router.routees().size() == 0){
            System.out.print("没有可用actor了,系统关闭。");
            flag.compareAndSet(true, false);
            getContext().system().shutdown();
        }
    }else {
        unhandled(o);
    }
}

// 外部系统照常发送消息
ActorRef routerActor = system.actorOf(Props.create(RouterActor.class), "RouterActor");
routerActor.tell(Msg.WORKING, ActorRef.noSender());
上一页