异常处理与持久化
Error Handling & Persistence | 异常处理与持久化
Fault Tolerance | 容错机制
Persistence | 持久化
Akka 持久化可以使有状态的 actor 能够保持其内部状态,以便在启动、JVM 崩溃后重新启动、或在集群中迁移时,恢复它们的内部状态。Akka 持久性关键点在于,只有对 actor 内部状态的更改才会被持久化,而不会直接保持其当前状态(可选快照除外)。这些更改只会追加到存储,没有任何修改,这允许非常高的事务速率和高效的复制;通过加载持久化的数据 stateful actors 可以重建内部状态。
Akka 持久性扩展依赖一些内置持久性插件,包括基于内存堆的日志,基于本地文件系统的快照存储和基于 LevelDB 的日志。
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.7</version>
</dependency>
然后我们还需要针对持久化策略添加相关的配置:
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false
当我们声明某个可持久化的 Actor 时,需要使其继承自 AbstractPersistentActor:
class ExamplePersistentActor extends AbstractPersistentActor {}
然后复写 createReceiveRecover 与 createReceive 方法;createReceive 是正常的处理消息的方法,而 createReceiveRecover 则是用于在恢复阶段处理接收到的消息的方法。
@Override
public Receive createReceiveRecover() {
return receiveBuilder()
// 恢复之前在上一个快照点之后发布的 Event
.match(Evt.class, e -> state.update(e))
// 恢复之前保存的状态
.match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
.build();
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt = new Evt(data + "-" + getNumEvents());
// 持久化消息
persist(evt, (Evt event) -> {
state.update(event);
getContext().system().eventStream().publish(event);
});
})
// 触发持久化当前状态
.matchEquals("snap", s -> saveSnapshot(state.copy()))
.matchEquals("print", s -> System.out.println(state))
.build();
}
在外部调用时,我们可以手动地触发进行状态存储:
persistentActor.tell(new Cmd("foo"), null);
persistentActor.tell("snap", null);
persistentActor.tell(new Cmd("buzz"), null);
persistentActor.tell("print", null);