Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions akka-docs/src/main/paradox/typed/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,19 @@ Please refer to @ref[snapshots](persistence-snapshot.md#snapshots) if you need t

In any case, the highest sequence number will always be recovered so you can keep persisting new events without corrupting your event log.

### Recovery from only last event

For some use cases it is enough to recover the actor from the last event, as an optimization to not replay all events.
You can enable this recovery mode with:

Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #replay-last }

Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #replay-last }

Snapshots are not loaded when recovery from last event is selected.

## Tagging

Persistence allows you to use event tags without using an @ref[`EventAdapter`](../persistence.md#event-adapters):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package akka.persistence.snapshot

import scala.collection.immutable.Seq

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

Expand Down Expand Up @@ -181,11 +179,6 @@ abstract class SnapshotStoreSpec(config: Config)
snapshotStore.tell(SaveSnapshot(metadata, bigSnapshot), senderProbe.ref)
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) => md }
}
"not load a snapshot when criteria is NoSnapshotAndReplayLastEvent" in {
snapshotStore.tell(
LoadSnapshot(pid, SnapshotSelectionCriteria.NoSnapshotAndReplayOnlyLast, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
}
}

"A snapshot store optionally".may {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.persistence.PersistenceSpec
import akka.persistence.journal.JournalSpec

class InmemJournalSpec extends JournalSpec(config = PersistenceSpec.config("inmem", "InmemJournalSpec")) {
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off()
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = false

override protected def supportsReplayOnlyLast: CapabilityFlag = CapabilityFlag.on()
override protected def supportsReplayOnlyLast: CapabilityFlag = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ class PersistenceTestKitPlugin(@nowarn("msg=never used") cfg: Config, cfgPath: S

override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] =
Future.fromTry(
Try(
Future.fromTry(Try {
val highest = storage.tryReadSeqNumber(persistenceId)
if (highest != 0L && max != 0L) {
val to = math.min(toSequenceNr, highest)
// read only last when fromSequenceNr is -1
val from = if (fromSequenceNr == -1) to else fromSequenceNr

storage
.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max)
.tryRead(persistenceId, from, to, max)
.map { repr =>
// we keep the tags in the repr, so remove those here
repr.payload match {
Expand All @@ -71,7 +76,11 @@ class PersistenceTestKitPlugin(@nowarn("msg=never used") cfg: Config, cfgPath: S
}

}
.foreach(recoveryCallback)))
.foreach(recoveryCallback)
} else {
Future.successful(())
}
})

override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
Future.fromTry(Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class PersistenceTestKitJournalCompatSpec extends JournalSpec(config = Persisten

override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true
override protected def supportsMetadata: CapabilityFlag = true
override protected def supportsReplayOnlyLast: CapabilityFlag = true

}

class PersistenceTestKitSnapshotStoreCompatSpec
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (C) 2017-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.typed.scaladsl

import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
import akka.persistence.typed.PersistenceId
import akka.serialization.jackson.CborSerializable

object EventSourcedBehaviorReplayLastSpec extends Matchers {

sealed trait Command extends CborSerializable
final case class Increment(id: String) extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command

sealed trait Event extends CborSerializable
final case class Incremented(id: String) extends Event

final case class State(value: Int, history: Vector[String]) extends CborSerializable

def counter(
@nowarn("msg=never used") ctx: ActorContext[Command],
persistenceId: PersistenceId,
probe: Option[ActorRef[(State, Event)]] = None): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) =>
cmd match {
case Increment(id) =>
Effect.persist(Incremented(id))

case GetValue(replyTo) =>
replyTo ! state
Effect.none

},
eventHandler = (state, evt) =>
evt match {
case Incremented(id) =>
probe.foreach(_ ! ((state, evt)))
State(state.value + 1, state.history :+ id)
})
}.withRecovery(Recovery.replayOnlyLast)

}

class EventSourcedBehaviorReplayLastSpec
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike
with LogCapturing {

import EventSourcedBehaviorReplayLastSpec._

val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")

"EventSourcedBehavior with Recovery.replayOnlyLast" must {
"recover from last event only" in {
val pid = nextPid()
val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid)))
val replyProbe = TestProbe[State]()

c ! Increment("a")
c ! Increment("b")
c ! Increment("c")
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector("a", "b", "c")))
testKit.stop(c)
replyProbe.expectTerminated(c)

val probe = TestProbe[(State, Event)]()
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probe.ref))))
// replayed the last event
probe.expectMessage((State(0, Vector.empty), Incremented("c")))
probe.expectNoMessage()

c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector("c")))

c2 ! Increment("d")
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector("c", "d")))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed._
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.{ Recovery => ClassicRecovery }
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
Expand Down Expand Up @@ -69,7 +68,7 @@ class EventSourcedBehaviorWatchSpec
NoOpEventAdapter.instance[String],
NoOpSnapshotAdapter.instance[String],
snapshotWhen = SnapshotWhenPredicate.noSnapshot,
ClassicRecovery(),
Recovery.default,
RetentionCriteria.disabled,
holdingRecoveryPermit = false,
settings = settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ package akka.persistence.typed.internal

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

import org.slf4j.Logger
import org.slf4j.MDC

import akka.actor.{ ActorRef => ClassicActorRef }
import akka.actor.Cancellable
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.Persistence
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.Recovery
import akka.persistence.typed.scaladsl.ReplicationInterceptor
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation._
import akka.persistence.JournalProtocol
import akka.persistence.Recovery
import akka.persistence.RecoveryPermitter
import akka.persistence.SnapshotProtocol
import akka.persistence.journal.Tagged
Expand All @@ -41,7 +40,6 @@ import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl._
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
import akka.persistence.typed.scaladsl.RetentionCriteria
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentationProvider

Expand Down Expand Up @@ -97,7 +95,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
snapshotWhen: SnapshotWhenPredicate[State, Event] = SnapshotWhenPredicate.noSnapshot[State, Event],
recovery: Recovery = Recovery(),
recovery: Recovery = Recovery.default,
retention: RetentionCriteria = RetentionCriteria.disabled,
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty,
Expand Down Expand Up @@ -273,7 +271,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](

override def withSnapshotSelectionCriteria(
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection.toClassic))
copy(recovery = Recovery.withSnapshotSelectionCriteria(selection))
}

override def snapshotWhen(predicate: (State, Event, Long) => Boolean): EventSourcedBehavior[Command, Event, State] =
Expand Down Expand Up @@ -304,8 +302,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy)

override def withRecovery(recovery: TypedRecovery): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = recovery.toClassic)
override def withRecovery(recovery: Recovery): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = recovery)
}

override def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, Event, State] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,17 @@ private[akka] trait JournalInteractions[C, E, S] {
private[akka] def onWritesInitiated(ctx: ActorContext[_], cmd: Any, repr: immutable.Seq[PersistentRepr]): Unit = ()

protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.internalLogger.debug("Replaying events: from: {}, to: {}", fromSeqNr, toSeqNr)
val from =
if (setup.recovery == ReplayOnlyLastRecovery) {
setup.internalLogger.debug("Recovery from last event only.")
-1L
} else {
setup.internalLogger.debug("Replaying events: from: {}, to: {}", fromSeqNr, toSeqNr)
fromSeqNr
}

setup.journal.tell(
ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId.id, setup.selfClassic),
ReplayMessages(from, toSeqNr, setup.recovery.toClassic.replayMax, setup.persistenceId.id, setup.selfClassic),
setup.selfClassic)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import akka.persistence.typed.{ javadsl, scaladsl, SnapshotSelectionCriteria }
/**
* INTERNAL API
*/
override private[akka] def toClassic = akka.persistence.Recovery()
override private[akka] def toClassic = akka.persistence.Recovery.default
}

/**
Expand All @@ -43,7 +43,8 @@ import akka.persistence.typed.{ javadsl, scaladsl, SnapshotSelectionCriteria }
/**
* INTERNAL API
*/
override private[akka] def toClassic = akka.persistence.Recovery.replayOnlyLast
override private[akka] val toClassic =
akka.persistence.Recovery(akka.persistence.SnapshotSelectionCriteria.None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
// protect against snapshot stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = true)

// FIXME we could bypass loadSnapshot when the snapshot criteria is none or replayOnlyLast
// For replayOnlyLast we must also use fromSequenceNr = -1 for the replay
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
val classicRecovery = setup.recovery.toClassic
loadSnapshot(classicRecovery.fromSnapshot, classicRecovery.toSequenceNr)

def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
Behaviors
Expand Down Expand Up @@ -194,7 +193,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
"Snapshot load error for persistenceId [{}]. Replaying all events since snapshot-is-optional=true",
setup.persistenceId)

loadSnapshotResult(snapshot = None, setup.recovery.toSequenceNr)
loadSnapshotResult(snapshot = None, setup.recovery.toClassic.toSequenceNr)
} else {
onRecoveryFailure(cause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.annotation.InternalApi
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal.ReplayOnlyLastRecovery
import akka.persistence.typed.internal.{ DefaultRecovery, DisabledRecovery, RecoveryWithSnapshotSelectionCriteria }
import akka.persistence.typed.scaladsl.Recovery

/**
* Strategy for recovery of snapshots and events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,13 @@ public Optional<Integer> stashCapacity() {
}
// #custom-stash-buffer

// #replay-last
@Override
public Recovery recovery() {
return Recovery.replayOnlyLast();
}
// #replay-last

// #wrapPersistentBehavior
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ object BasicPersistentBehaviorCompileOnly {
//#recovery-disabled
}

object ReplayLastBehavior {
def apply(): Behavior[Command] =
//#replay-last
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRecovery(Recovery.replayOnlyLast)
//#replay-last
}

object TaggingBehavior {
def apply(): Behavior[Command] =
//#tagging
Expand Down
Loading