Skip to content

Comments

feat: ChangeEvent from Durable State#32254

Merged
patriknw merged 9 commits intomainfrom
wip-chg-evt-patriknw
Dec 14, 2023
Merged

feat: ChangeEvent from Durable State#32254
patriknw merged 9 commits intomainfrom
wip-chg-evt-patriknw

Conversation

@patriknw
Copy link
Contributor

@patriknw patriknw commented Dec 8, 2023

  • by storing additional change events when Durable State is updated and deleted we make it possible to use all nice event sourced Projection capabilities with Durable State, including Projections over gRPC
  • otherwise we would have to duplicate many many things to use the existing DurableStateChange and the queries based on that
  • we might even deprecate the existing DurableStateChange if this works out, but that can be a later decision

TODO:

  • docs

* by storing additional change events when Durable State is updated and
  deleted we make it possible to use all nice event sourced Projection
  capabilities with Durable State, including Projections over gRPC
* otherwise we would have to duplicate many many things to use the
  existing DurableStateChange and the queries based on that
* we might even deprecate the existing DurableStateChange if this
  works out, but that can be a later decision
behavior

case callback: Callback[_] =>
case callback: Callback[Any] @unchecked =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird bug that we haven't noticed before. In the new test this fails when using:

Effect.delete().thenReply(replyTo)((_: String) => Done)

No compilation errors, but actually some red squiggles in IntelliJ. The runtime error was:

java.lang.ClassCastException: class java.lang.String cannot be cast to class scala.runtime.Nothing$ (java.lang.String is in module java.base of loader 'bootstrap'; scala.runtime.Nothing$ is in unnamed module of loader 'app')
	at akka.persistence.typed.state.internal.ReplyEffectImpl$$anonfun$$lessinit$greater$1.apply(SideEffect.scala:32)
	at akka.persistence.typed.state.internal.ReplyEffectImpl$$anonfun$$lessinit$greater$1.apply(SideEffect.scala:32)
	at akka.persistence.typed.state.internal.Running.applySideEffect(Running.scala:381)
	at akka.persistence.typed.state.internal.Running.applySideEffects(Running.scala:359)
	at akka.persistence.typed.state.internal.Running$PersistingState.onDeleteSuccess(Running.scala:283)
	at akka.persistence.typed.state.internal.Running$PersistingState.onMessage(Running.scala:236)
	at akka.persistence.typed.state.internal.Running$PersistingState.onMessage(Running.scala:222)
	at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
	at akka.persistence.typed.state.internal.DurableStateBehaviorImpl$$anon$1.aroundReceive(DurableStateBehaviorImpl.scala:144)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:282)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:133)
	at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2(ActorAdapter.scala:101)
	at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2$adapted(ActorAdapter.scala:97)
	at akka.actor.typed.internal.adapter.ActorAdapter.withSafelyAdapted(ActorAdapter.scala:204)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Same fix in EventSourcedBehavior.

@patriknw patriknw force-pushed the wip-chg-evt-patriknw branch from dd4dc76 to 22d5bf5 Compare December 11, 2023 11:28
@patriknw patriknw force-pushed the wip-chg-evt-patriknw branch from 22d5bf5 to ca4a7ad Compare December 11, 2023 14:02
@patriknw
Copy link
Contributor Author

This is ready for final review. Regarding reference docs, I added plugin api sections as we had for the state store, but I will add usage docs of the change event handler in Akka Projections docs since that is the purpose.

Copy link
Contributor

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

behavior

case callback: Callback[_] =>
case callback: Callback[Any] @unchecked =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchecked any is the best any.

if (changeEventHandler.isDefined && !durableStateStore.isInstanceOf[DurableStateUpdateWithChangeEventStore[_]])
new IllegalArgumentException(
"Change event handler was defined but the DurableStateStore " +
s"[${durableStateStore.getClass.getName}] doesn't implement [DurableStateUpdateWithChangeEventStore]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* The `updateHandler` and `deleteHandler` are invoked after the ordinary command handler. Be aware of that
* if the state is mutable and modified by the command handler the previous state parameter of the `updateHandler`
* will also include the modification, since it's the same instance. If that is problem you need to use
* immutable state and create a new state instance when modifying it in the command handler.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added this caveat about mutable state

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good 👍

* The `updateHandler` and `deleteHandler` are invoked after the ordinary command handler. Be aware of that
* if the state is mutable and modified by the command handler the previous state parameter of the `updateHandler`
* will also include the modification, since it's the same instance. If that is problem you need to use
* immutable state and create a new state instance when modifying it in the command handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good 👍

patriknw and others added 2 commits December 12, 2023 10:52
@patriknw patriknw merged commit e71e1c5 into main Dec 14, 2023
@patriknw patriknw deleted the wip-chg-evt-patriknw branch December 14, 2023 16:00
@patriknw patriknw added this to the 2.9.1 milestone Dec 14, 2023
He-Pin pushed a commit to He-Pin/akka that referenced this pull request Jan 7, 2024
* by storing additional change events when Durable State is updated and
  deleted we make it possible to use all nice event sourced Projection
  capabilities with Durable State, including Projections over gRPC
* otherwise we would have to duplicate many many things to use the
  existing DurableStateChange and the queries based on that
* we might even deprecate the existing DurableStateChange if this
  works out, but that can be a later decision
* ApiMayChange
* reference docs for plugin api
* note about mutable state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants