Merged
@@ -746,10 +746,49 @@ private[akka] class ShardRegion(

var coordinator: Option[ActorRef] = None

def reRegisterIfCoordinatorNotUp(): Unit =
  if (coordinator.nonEmpty) {
    val coordAddress = coordinator.get.path.address // safe: guarded by nonEmpty
    val coordinatorStatus =
      coordinator.flatMap { _ =>
        membersByAge.find(_.address == coordAddress).map(_.status)
      }

    coordinatorStatus match {
      case Some(MemberStatus.Up) => () // Do nothing
      case Some(notUp) =>
        if (log.isDebugEnabled)
          log.debug(
            "{}: Coordinator is on node with status [{}], proactively attempting to find next coordinator",
            typeName,
            notUp)

        // For now, make one attempt to register with the oldest Up member we know about, without forgetting
        // about the current coordinator.
        // We only make one attempt to one candidate so as to not flood with registration messages.
        // Important since this is level-triggered (any membership change where the coordinator is on a
        // not-up node) while registration is otherwise edge-triggered.
        coordinatorSelection.headOption.foreach(sendRegistrationMessage)

        if (!timers.isTimerActive(RegisterRetry)) {
Contributor:
will this work as intended, since we don't set coordinator = None for this case?

    case RegisterRetry =>
      if (coordinator.isEmpty) {

Contributor:
we watch the current coordinator, and when it terminates we will startRegistration(), so that is already covered

Contributor Author (@leviramsey, May 22, 2025):
RegisterRetry is also changed to reRegisterIfCoordinatorNotUp if there's a coordinator present.

Yes, eventually the current coordinator will stop (worst case, we see it stop thanks to the failure detector before removal gossip) and our watch will trigger full registration. That can be several seconds in the future.

Contributor:
I see, good.

Contributor Author:
Scheduling the retry also helps in the event that the next-youngest is also being stopped by (e.g.) Kubernetes (consider a deployment where maxSurge/maxUnavailable allows multiple pods to be stopped per round and deletion cost is being set based on cluster age: then it's likely that the oldest n pods get stopped) but we haven't found out about it yet.

An alternative could be to compute the youngest Up on membership changes and go through this if that changes, but I think a fairly quick (default 250ms) retry is sufficient?

Contributor:
I think we are good here. Should also be fine to configure a shorter retry-interval. Default is 2 seconds. The initial interval is derived from that, but at least 100 ms.
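As discussed above, a deployment that wants the proactive probe retried sooner than the 2-second default can shorten the sharding retry interval; a minimal sketch of the relevant setting (the path matches Akka's reference configuration, the 250 ms value is just the figure from this thread, not a recommendation):

```hocon
akka.cluster.sharding {
  # default is 2 s; the initial registration delay is derived from
  # this value, but is at least 100 ms
  retry-interval = 250 ms
}
```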

          nextRegistrationDelay = initRegistrationDelay
          scheduleNextRegistration()
        }

      case None =>
        // coordinator is on node which has been removed... can this actually happen?
        log.warning("{}: Coordinator was on removed node [{}], attempting to re-register", typeName, coordAddress)
        coordinator = None
        startRegistration()
    }
  }
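The match above boils down to a small, level-triggered decision on the coordinator node's membership status. A self-contained sketch (the simplified `Status` values and the `Action` names are illustrative assumptions, not the real Akka types):

```scala
// Simplified stand-ins for akka.cluster.MemberStatus (assumption for illustration)
sealed trait Status
case object Up extends Status
case object Leaving extends Status

// Hypothetical action names describing what reRegisterIfCoordinatorNotUp does
sealed trait Action
case object NoOp extends Action           // coordinator's node is Up: nothing to do
case object ProbeOldest extends Action    // single registration attempt, coordinator kept
case object FullReRegister extends Action // coordinator forgotten, registration restarted

def decide(coordinatorStatus: Option[Status]): Action = coordinatorStatus match {
  case Some(Up) => NoOp
  case Some(_)  => ProbeOldest    // not-up but still a known member: one attempt, no flooding
  case None     => FullReRegister // coordinator's node no longer in membersByAge
}
```

Because the check is level-triggered, it is safe to re-run on every membership change: repeated calls with the same status produce the same single, bounded action.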

def changeMembers(newMembers: immutable.SortedSet[Member]): Unit = {
  val before = membersByAge.headOption
  val after = newMembers.headOption
  membersByAge = newMembers
  // NB: equality check is on uniqueAddress, not status etc.
  if (before != after) {
    if (log.isDebugEnabled)
      log.debug(
@@ -759,7 +798,7 @@
        after.map(_.address).getOrElse(""))
    coordinator = None
    startRegistration()
  } else reRegisterIfCoordinatorNotUp()
}
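The `// NB` comment is the crux of the `else` branch: member equality is based on the node's unique address, so a status-only change on the oldest node leaves `before != after` false and we fall through to the proactive check instead of a full re-registration. A sketch with a hypothetical simplified `Member` (not Akka's real class) showing that behavior:

```scala
// Hypothetical simplified Member: equality on uniqueAddress only,
// mirroring how Akka's Member compares (assumption for illustration)
final case class UniqueAddress(host: String, uid: Long)

final class Member(val uniqueAddress: UniqueAddress, val status: String) {
  override def equals(other: Any): Boolean = other match {
    case m: Member => m.uniqueAddress == uniqueAddress
    case _         => false
  }
  override def hashCode: Int = uniqueAddress.hashCode
}

val oldestUp      = new Member(UniqueAddress("10.0.0.1", 1L), "Up")
val oldestLeaving = new Member(UniqueAddress("10.0.0.1", 1L), "Leaving") // same node, new status
val newOldest     = new Member(UniqueAddress("10.0.0.2", 2L), "Up")

// Status-only change: headOption compares equal, so changeMembers takes the
// else branch (reRegisterIfCoordinatorNotUp) rather than startRegistration()
val statusOnlyChange = Option(oldestUp) == Option(oldestLeaving)
// Different node at the head: full re-registration path
val headChanged = Option(oldestUp) != Option(newOldest)
```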
Contributor:
Wonder if we could speed up the registration when coordinator is None, and there is a membership change?
Right now that will be from the scheduled retries, which could be up to 2 seconds (default config)

Contributor:
Maybe good enough with the retry interval. Should also be fine to configure a shorter retry-interval. Default is 2 seconds. The initial interval is derived from that, but at least 100 ms.


def receive: Receive = {
@@ -789,9 +828,13 @@ private[akka] class ShardRegion(
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
  case MemberUp(m) =>
    addMember(m)

  case MemberLeft(m) =>
    // updates the status in the set
    addMember(m)

  case MemberExited(m) =>
    // updates the status in the set
    addMember(m)

  case MemberRemoved(m, _) =>
@@ -930,7 +973,7 @@ private[akka] class ShardRegion(
if (coordinator.isEmpty) {
  register()
  scheduleNextRegistration()
} else reRegisterIfCoordinatorNotUp()

case GracefulShutdown =>
  if (preparingForShutdown) {
@@ -1128,7 +1171,7 @@ private[akka] class ShardRegion(

def register(): Unit = {
  val actorSelections = coordinatorSelection
  actorSelections.foreach(sendRegistrationMessage)
  if (shardBuffers.nonEmpty && retryCount >= 5) {
    if (actorSelections.nonEmpty) {
      val coordinatorMessage =
@@ -1175,6 +1218,10 @@
    }
  }

def sendRegistrationMessage(selection: ActorSelection): Unit = {
  selection ! registrationMessage
}

def registrationMessage: Any =
  if (entityProps.isDefined) Register(self) else RegisterProxy(self)
