异常处理与持久化

Error Handling & Persistence | 异常处理与持久化

Fault Tolerance | 容错机制

Persistence | 持久化

Akka 持久化可以使有状态的 actor 能够保持其内部状态,以便在启动、JVM 崩溃后重新启动、或在集群中迁移时,恢复它们的内部状态。Akka 持久性关键点在于,只有对 actor 内部状态的更改才会被持久化,而不会直接保持其当前状态(可选快照除外)。这些更改只会追加到存储,没有任何修改,这允许非常高的事务速率和高效的复制;通过加载持久化的数据 stateful actors 可以重建内部状态。

Akka 持久性扩展依赖一些内置持久性插件,包括基于内存堆的日志,基于本地文件系统的快照存储和基于 LevelDB 的日志。

<dependency>
    <groupId>org.iq80.leveldb</groupId>
    <artifactId>leveldb</artifactId>
    <version>0.7</version>
</dependency>

然后我们还需要针对持久化策略添加相关的配置:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

当我们声明某个可持久化的 Actor 时,需要使其继承自 AbstractPersistentActor:

class ExamplePersistentActor extends AbstractPersistentActor {}

然后复写 createReceiveRecover 与 createReceive 方法;createReceive 是正常的处理消息的方法,而 createReceiveRecover 则是用于在恢复阶段处理接收到的消息的方法。

@Override
public Receive createReceiveRecover() {
    return receiveBuilder()
        // 恢复之前在上一个快照点之后发布的 Event
        .match(Evt.class, e -> state.update(e))
        // 恢复之前保存的状态
        .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
        .build();
}

@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(Cmd.class, c -> {
            final String data = c.getData();
            final Evt evt = new Evt(data + "-" + getNumEvents());

            // 持久化消息
            persist(evt, (Evt event) -> {
                state.update(event);
                getContext().system().eventStream().publish(event);
            });
        })
        // 触发持久化当前状态
        .matchEquals("snap", s -> saveSnapshot(state.copy()))
        .matchEquals("print", s -> System.out.println(state))
        .build();
}

在外部调用时,我们可以手动地触发进行状态存储:

persistentActor.tell(new Cmd("foo"), null);
persistentActor.tell("snap", null);
persistentActor.tell(new Cmd("buzz"), null);
persistentActor.tell("print", null);
上一页
下一页