|
| 1 | +/* |
| 2 | + * Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com> |
| 3 | + */ |
| 4 | + |
| 5 | +package akka.cluster.sharding |
| 6 | + |
| 7 | +import scala.concurrent.Future |
| 8 | +import scala.concurrent.duration._ |
| 9 | + |
| 10 | +import com.typesafe.config.ConfigFactory |
| 11 | + |
| 12 | +import akka.actor._ |
| 13 | +import akka.cluster.Cluster |
| 14 | +import akka.cluster.MemberStatus |
| 15 | +import akka.cluster.ddata.LWWRegister |
| 16 | +import akka.cluster.ddata.LWWRegisterKey |
| 17 | +import akka.cluster.ddata.Replicator._ |
| 18 | +import akka.cluster.sharding.ShardCoordinator.Internal._ |
| 19 | +import akka.cluster.sharding.ShardRegion.ShardId |
| 20 | +import akka.testkit._ |
| 21 | + |
object StaleRegionDetectionSpec {

  /**
   * Test configuration for a single-node cluster with aggressive stale-region
   * detection timing (no grace period, 500ms checks) so tests complete quickly.
   *
   * @param enabled whether the stale-region-detection feature under test is on
   */
  def config(enabled: Boolean = true) = ConfigFactory.parseString(s"""
    akka.actor.provider = cluster
    akka.remote.artery.canonical.port = 0
    akka.loglevel = DEBUG
    akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
    akka.cluster.sharding.verbose-debug-logging = on
    akka.cluster.sharding.updating-state-timeout = 60s
    akka.cluster.sharding.rebalance-interval = 120s
    akka.cluster.sharding.shard-start-timeout = 120s
    akka.cluster.min-nr-of-members = 1
    akka.cluster.sharding.stale-region-detection {
      enabled = $enabled
      check-interval = 500ms
      startup-grace-period = 0s
    }
    """)

  /**
   * Allocation strategy whose target can be forced from the test body.
   * When [[targetRegion]] is unset it falls back to the least-loaded region.
   * Never rebalances, so shard placement stays wherever the test put it.
   */
  class TestAllocationStrategy extends ShardCoordinator.ShardAllocationStrategy {
    // @volatile: written from the test thread, read from the coordinator's dispatcher.
    @volatile var targetRegion: Option[ActorRef] = None

    override def allocateShard(
        requester: ActorRef,
        shardId: ShardId,
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]): Future[ActorRef] = {
      // Prefer the forced target; otherwise pick the region with the fewest shards.
      // NOTE(review): assumes currentShardAllocations is non-empty when no target is
      // forced (the requester itself is always registered) — minBy throws on empty.
      val chosen = targetRegion match {
        case Some(forced) => forced
        case None         => currentShardAllocations.minBy { case (_, shards) => shards.size }._1
      }
      Future.successful(chosen)
    }

    override def rebalance(
        currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
        rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
      Future.successful(Set.empty)
  }
}
| 56 | + |
class StaleRegionDetectionSpec extends AkkaSpec(StaleRegionDetectionSpec.config()) with WithLogCapturing {

  import StaleRegionDetectionSpec._

  private type CoordinatorUpdate = Update[LWWRegister[State]]

  /** Bundles everything a single test case needs to drive one coordinator. */
  private case class Fixture(coordinator: ActorRef, replicatorProbe: TestProbe, strategy: TestAllocationStrategy)

  // Monotonic counter so every test gets a distinct type name (and thus ddata key).
  private var testCounter = 0

  private def nextTypeName(): String = {
    testCounter += 1
    s"TestEntity$testCounter"
  }

  /** Forms a single-node cluster before any test case runs. */
  override def atStartup(): Unit = {
    val node = Cluster(system)
    node.join(node.selfAddress)
    awaitAssert {
      node.readView.members.count(_.status == MemberStatus.Up) should ===(1)
    }
  }

  /**
   * Starts a fresh coordinator wired to a probe standing in for the ddata
   * replicator, and completes its initial state load with "nothing stored".
   */
  private def createFixture(): Fixture = {
    val typeName = nextTypeName()
    val allocation = new TestAllocationStrategy
    val ddataProbe = TestProbe()
    val coordinator = system.actorOf(
      ShardCoordinator.props(
        typeName,
        ClusterShardingSettings(system),
        allocation,
        ddataProbe.ref,
        majorityMinCap = 0,
        rememberEntitiesStoreProvider = None))

    // The coordinator first reads its persisted state; answer with an empty result
    // so it starts from scratch, then verify it settles down.
    ddataProbe.expectMsgType[Get[_]](5.seconds)
    ddataProbe.reply(NotFound(LWWRegisterKey[State](s"${typeName}CoordinatorState"), None))
    ddataProbe.expectNoMessage(100.millis)

    Fixture(coordinator, ddataProbe, allocation)
  }

  /** Acks the coordinator's pending ddata write and returns the event it carried. */
  private def completeNextUpdate(replicatorProbe: TestProbe): DomainEvent = {
    val pendingUpdate = replicatorProbe.expectMsgType[CoordinatorUpdate](5.seconds)
    replicatorProbe.reply(UpdateSuccess(pendingUpdate.key, pendingUpdate.request))
    pendingUpdate.request.get.asInstanceOf[DomainEvent]
  }

  /** Registers a fresh probe as a shard region and waits for the ack. */
  private def registerRegion(coordinator: ActorRef, replicatorProbe: TestProbe): TestProbe = {
    val region = TestProbe()
    coordinator.tell(Register(region.ref), region.ref)
    completeNextUpdate(replicatorProbe)
    region.expectMsgType[RegisterAck](5.seconds)
    region
  }

  /** Requests a home for `shardId`, acking the resulting state write. */
  private def allocateShard(coordinator: ActorRef, shardId: String, replicatorProbe: TestProbe): ShardHome = {
    val requester = TestProbe()
    coordinator.tell(GetShardHome(shardId), requester.ref)
    completeNextUpdate(replicatorProbe)
    requester.expectMsgType[ShardHome](5.seconds)
  }

  "StaleRegionDetection" must {

    "detect and clean up a stale region via unwatch/rewatch" in {
      val fixture = createFixture()
      val Fixture(coordinator, ddataProbe, allocation) = fixture

      try {
        val staleRegion = registerRegion(coordinator, ddataProbe)

        // Force the next allocation onto the region we are about to kill.
        allocation.targetRegion = Some(staleRegion.ref)
        allocateShard(coordinator, "s1", ddataProbe)

        // Stop the region. The intent is to simulate a lost Terminated signal at
        // the coordinator — NOTE(review): system.stop normally does deliver
        // Terminated to watchers; confirm the scenario setup against the
        // detection implementation.
        watch(staleRegion.ref)
        system.stop(staleRegion.ref)
        expectTerminated(staleRegion.ref, 5.seconds)

        // With startup-grace-period = 0s and check-interval = 500ms, the
        // unwatch/rewatch cycle should produce a ShardRegionTerminated write soon.
        val staleWrite = ddataProbe.expectMsgType[CoordinatorUpdate](5.seconds)
        val event = staleWrite.request.get.asInstanceOf[DomainEvent]
        event shouldBe a[ShardRegionTerminated]
        event.asInstanceOf[ShardRegionTerminated].region should ===(staleRegion.ref)
      } finally system.stop(coordinator)
    }

    "not act on regions whose member is still in the cluster" in {
      val fixture = createFixture()
      val Fixture(coordinator, ddataProbe, allocation) = fixture

      try {
        val liveRegion = registerRegion(coordinator, ddataProbe)

        allocation.targetRegion = Some(liveRegion.ref)
        allocateShard(coordinator, "s1", ddataProbe)

        // The region is local (same node) so its address has local scope and is
        // never re-watched; several 500ms check cycles must produce no writes.
        ddataProbe.expectNoMessage(3.seconds)
      } finally system.stop(coordinator)
    }
  }
}
| 168 | + |
class StaleRegionDetectionDisabledSpec
    extends AkkaSpec(StaleRegionDetectionSpec.config(enabled = false))
    with WithLogCapturing {

  import StaleRegionDetectionSpec._

  private type CoordinatorUpdate = Update[LWWRegister[State]]

  /** Forms a single-node cluster before the test case runs. */
  override def atStartup(): Unit = {
    val node = Cluster(system)
    node.join(node.selfAddress)
    awaitAssert {
      node.readView.members.count(_.status == MemberStatus.Up) should ===(1)
    }
  }

  /** Acks the coordinator's pending ddata write and returns the event it carried. */
  private def completeNextUpdate(replicatorProbe: TestProbe): DomainEvent = {
    val pendingUpdate = replicatorProbe.expectMsgType[CoordinatorUpdate](5.seconds)
    replicatorProbe.reply(UpdateSuccess(pendingUpdate.key, pendingUpdate.request))
    pendingUpdate.request.get.asInstanceOf[DomainEvent]
  }

  "StaleRegionDetection when disabled" must {

    "not schedule stale region check timer" in {
      val allocation = new TestAllocationStrategy
      val ddataProbe = TestProbe()
      val coordinator = system.actorOf(
        ShardCoordinator.props(
          "DisabledEntity",
          ClusterShardingSettings(system),
          allocation,
          ddataProbe.ref,
          majorityMinCap = 0,
          rememberEntitiesStoreProvider = None))

      try {
        // Answer the coordinator's initial state load with "nothing stored".
        ddataProbe.expectMsgType[Get[_]](5.seconds)
        ddataProbe.reply(NotFound(LWWRegisterKey[State]("DisabledEntityCoordinatorState"), None))
        ddataProbe.expectNoMessage(100.millis)

        // Register a region and place one shard on it.
        val region = TestProbe()
        coordinator.tell(Register(region.ref), region.ref)
        completeNextUpdate(ddataProbe)
        region.expectMsgType[RegisterAck](5.seconds)

        allocation.targetRegion = Some(region.ref)
        val requester = TestProbe()
        coordinator.tell(GetShardHome("s1"), requester.ref)
        completeNextUpdate(ddataProbe)
        requester.expectMsgType[ShardHome](5.seconds)

        // Detection is disabled, so no StaleRegionCheckTick timer should fire.
        // 2s covers several nominal 500ms check intervals; the region stays alive
        // and we only assert the absence of timer-driven replicator traffic.
        ddataProbe.expectNoMessage(2.seconds)
      } finally system.stop(coordinator)
    }
  }
}
0 commit comments