diff --git a/pkg/epp/flowcontrol/registry/doc.go b/pkg/epp/flowcontrol/registry/doc.go
index 8ab1c5d4d8..3b3a70be8f 100644
--- a/pkg/epp/flowcontrol/registry/doc.go
+++ b/pkg/epp/flowcontrol/registry/doc.go
@@ -16,8 +16,6 @@ limitations under the License.
 // Package registry provides the concrete implementation of the `contracts.FlowRegistry` interface.
 //
-// # Architecture: A Sharded, Concurrent Control Plane
-//
 // This package implements the flow control state machine using a sharded architecture to enable scalable, parallel
 // request processing. It separates the orchestration control plane from the request-processing data plane.
 //
@@ -27,10 +25,4 @@ limitations under the License.
 //     read-optimized, concurrent-safe view for a single `controller.FlowController` worker.
 //   - `managedQueue`: A stateful decorator around a `framework.SafeQueue`. It is the fundamental unit of state,
 //     responsible for atomically tracking statistics (e.g., length and byte size) and ensuring data consistency.
-//
-// # Concurrency Model
-//
-// The registry uses a multi-layered strategy to maximize performance on the hot path while ensuring correctness for
-// administrative tasks.
-// (See the `FlowRegistry` struct documentation for detailed locking rules).
 package registry
diff --git a/pkg/epp/flowcontrol/registry/registry.go b/pkg/epp/flowcontrol/registry/registry.go
index d881edc064..703c4b708e 100644
--- a/pkg/epp/flowcontrol/registry/registry.go
+++ b/pkg/epp/flowcontrol/registry/registry.go
@@ -47,67 +47,51 @@ type bandStats struct {
 	len atomic.Int64
 }
 
-// flowState holds all tracking state for a single flow instance within the registry.
+// flowState tracks the lifecycle and usage of a specific flow instance.
+//
+// It uses a mutex-protected reference counter to arbitrate between active request processing and garbage collection.
+// This structure allows the registry to safely determine whether a flow is currently in use or eligible for deletion.
 type flowState struct {
 	key types.FlowKey
 
-	// gcLock protects the flow's lifecycle state.
-	//   - The Garbage Collector takes an exclusive write lock to safely delete the flow.
-	//   - Active connections take a shared read lock for the duration of their operation, preventing the GC from
-	//     running while allowing other connections to proceed concurrently.
-	gcLock sync.RWMutex
+	// mu protects the lifecycle fields (leaseCount, becameIdleAt, markedForDeletion).
+	// We use a mutex instead of independent atomics to ensure that state transitions (e.g., Active -> Idle) are
+	// atomic and consistent.
+	mu sync.Mutex
 
-	// leaseCount is an atomic reference counter for all concurrent, in-flight connections.
-	// It is the sole source of truth for determining if a flow is Idle.
-	leaseCount atomic.Int64
+	// leaseCount tracks the number of concurrent, in-flight connections using this flow.
+	//   - count > 0: Active. The flow is pinned and cannot be garbage collected.
+	//   - count == 0: Idle. The flow is eligible for garbage collection once the idle timeout is exceeded.
+	leaseCount int
 
-	// becameIdleAt tracks the time at which the lease count last dropped to zero.
-	// A zero value indicates the flow is currently Active.
-	// This field is always protected by the gcLock's exclusive write lock during modifications.
+	// becameIdleAt tracks the timestamp when leaseCount last dropped to zero.
+	// A zero value (time.Time{}) indicates the flow is currently Active.
 	becameIdleAt time.Time
+
+	// markedForDeletion indicates that the Garbage Collector has selected this flow for deletion.
+	// If true, incoming requests must back off and allow the flow to be cleaned up.
+	markedForDeletion bool
+
+	// initialized ensures that the heavy-weight infrastructure provisioning (creating queues on shards) happens
+	// exactly once per flowState instance.
+	// This prevents race conditions where multiple concurrent requests might attempt to provision the same flow
+	// simultaneously.
+	initialized sync.Once
 }
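
A compact way to see the invariant this struct encodes: the mutex makes the pair (leaseCount, becameIdleAt) change together, so an observer can never see a zero lease count alongside a stale "Active" marker. The following is a minimal, self-contained sketch of that invariant (toy names, stdlib only; the real struct additionally carries markedForDeletion and the sync.Once):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// state is a stripped-down flowState: one mutex guards both the counter and
// the idle timestamp so the Active <-> Idle transition is observed atomically.
type state struct {
	mu           sync.Mutex
	leaseCount   int
	becameIdleAt time.Time
}

func (s *state) acquire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.leaseCount++
	s.becameIdleAt = time.Time{} // Active again: zero value means "not idle".
}

func (s *state) release(now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.leaseCount--
	if s.leaseCount == 0 {
		s.becameIdleAt = now // Idle: the GC timeout clock starts here.
	}
}

func main() {
	s := &state{}
	s.acquire()
	s.release(time.Now())
	fmt.Println("idle since:", s.becameIdleAt)
}
```
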
-// FlowRegistry is the concrete implementation of the `contracts.FlowRegistry` interface.
-//
-// # Role: The Central Orchestrator
-//
-// The `FlowRegistry` is the single source of truth for flow control configuration and the lifecycle manager for all
-// shards and flow instances. It provides a highly concurrent data path for request processing while ensuring
-// correctness for administrative tasks like scaling and garbage collection.
-//
-// # Concurrency Model: A Multi-Layered Strategy
-//
-// The registry is designed for high throughput by separating the concurrency domains of the request hot path, garbage
-// collection, and administrative tasks.
-//
-//  1. `sync.Map` for `flowStates` (Hot Path): The `WithConnection` method uses a `sync.Map` for highly concurrent,
-//     often lock-free, lookups and Just-In-Time registration of different flows.
-//  2. `flowState.gcLock` (`sync.RWMutex`) for Per-Flow Lifecycle: Each flow has its own `RWMutex` to arbitrate
-//     between active connections and the garbage collector. This surgical locking prevents GC on one flow from
-//     impacting any other.
-//  3. `mu (sync.RWMutex)` for Global Topology: A single registry-wide mutex protects the overall shard topology
-//     during infrequent administrative operations like scaling.
-//
-// # Flow Lifecycle: Lease-Based with Surgical GC
-//
-// A flow's lifecycle is managed by a lease-based reference count.
-//
-//  1. Lease Acquisition: A client calls `WithConnection` to begin a managed session. This acquires a lease by
-//     incrementing an atomic counter.
-//  2. Lease Release: Lease release is automatic and guaranteed. When the callback function provided to
-//     `WithConnection` returns, the lease is released by decrementing the atomic counter. When the count reaches
-//     zero, the flow is marked Idle with a timestamp.
-//  3. Garbage Collection: A background task scans for Idle flows. To prevent a TOCTOU race, the GC acquires an
-//     exclusive lock on the specific `flowState.gcLock` before re-verifying the lease count is still zero and
-//     proceeding with deletion.
-//
-// # Locking Order
-//
-// To prevent deadlocks, locks MUST be acquired in the following order:
-//
-//  1. `FlowRegistry.mu` (Registry-level write lock)
-//  2. `registryShard.mu` (Shard-level write lock)
-//  3. `flowState.gcLock` (Per-flow GC lock)
+// FlowRegistry is the concrete implementation of the `contracts.FlowRegistry` interface.
+//
+// The FlowRegistry manages the mapping between abstract FlowKeys and the concrete managed queues distributed across
+// internal shards. It serves as the single source of truth for flow control configuration and lifecycle management.
+//
+// # Concurrency Model
+//
+// The registry employs a split concurrency model to maximize throughput:
+//  1. Request Hot Path (Flows): Uses a `sync.Map` with fine-grained, per-flow mutexes for high-frequency operations
+//     (Connect/Release). This allows request processing to proceed without contention from the garbage collector or
+//     other flows.
+//  2. Administrative Path (Topology): Uses mutex-based synchronization (`fr.mu`) for infrequent operations such as
+//     scaling, configuration updates, or dynamic priority band provisioning.
 type FlowRegistry struct {
 	// --- Immutable dependencies (set at construction) ---
 	config *Config
@@ -116,8 +100,9 @@ type FlowRegistry struct {
 
 	// --- Lock-free / Concurrent state (hot path) ---
 
-	// flowStates tracks all flow instances, keyed by `types.FlowKey`.
-	flowStates sync.Map // stores `types.FlowKey` -> *flowState
+	// flowStates tracks all active flow instances, keyed by `types.FlowKey`.
+	// Access to this map is lock-free; lifecycle state is guarded by each flowState's own mutex.
+	flowStates sync.Map // `types.FlowKey` -> *flowState
 
 	// Globally aggregated statistics, updated atomically via lock-free propagation.
 	totalByteSize atomic.Int64
@@ -201,90 +186,120 @@ func (fr *FlowRegistry) Run(ctx context.Context) {
 
 // --- `contracts.FlowRegistryDataPlane` Implementation ---
 
-// Connect establishes a session for a given flow, acquiring a lifecycle lease.
-// This is the primary entry point for the data path.
-// If the flow does not exist, it is registered Just-In-Time (JIT).
+// WithConnection establishes a managed session for the specified flow.
+//
+// It guarantees that the flow's associated resources are pinned and valid for the duration of the provided callback
+// `fn`. It relies on a reference-counted leasing mechanism, ensuring that active flows are never garbage collected
+// while requests are in flight.
+//
+// If the flow does not exist, it is provisioned Just-In-Time (JIT).
 func (fr *FlowRegistry) WithConnection(key types.FlowKey, fn func(conn contracts.ActiveFlowConnection) error) error {
 	if key.ID == "" {
 		return contracts.ErrFlowIDEmpty
 	}
 
-	// --- JIT Registration ---
-	val, ok := fr.flowStates.Load(key)
-	if !ok {
-		newFlowState, err := fr.prepareNewFlow(key)
-		if err != nil {
-			return fmt.Errorf("failed to prepare JIT registration for flow %s: %w", key, err)
+	// 1. Acquire lease: Pin the flow state in memory.
+	state := fr.pinActiveFlow(key)
+	defer fr.releaseFlow(state)
+
+	// 2. JIT provisioning: Ensure physical resources exist on shards.
+	// We use sync.Once so that we only pay the initialization cost (building components, locking shards) exactly once
+	// per flowState object.
+	var jitErr error
+	state.initialized.Do(func() {
+		jitErr = fr.ensureFlowInfrastructure(key)
+	})
+
+	if jitErr != nil {
+		// If provisioning failed, this state object is invalid (its sync.Once has already been consumed).
+		// We remove it from the map so that subsequent requests will attempt to create a fresh state object.
+		fr.flowStates.Delete(key)
+		return fmt.Errorf("failed to provision JIT flow resources: %w", jitErr)
+	}
+
+	// 3. Execute callback.
+	// The lease is held throughout the execution of `fn`, preventing GC.
+	return fn(&connection{registry: fr, key: key})
+}
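
For orientation, a data-path caller looks roughly like the sketch below. This fragment is not part of the diff: it assumes an already-constructed registry (see the test harness in registry_test.go), and the tenant ID and priority value are hypothetical.

```go
// Sketch only: assumes an already-constructed *FlowRegistry.
func handleRequest(fr *FlowRegistry) error {
	key := types.FlowKey{ID: "tenant-a", Priority: 10} // hypothetical ID and priority
	return fr.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
		// The flow is pinned for the lifetime of this callback: the GC cannot
		// collect it, and JIT provisioning has already succeeded by this point.
		return nil
	})
}
```
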
+// pinActiveFlow locates or creates the flow state and increments its lease count.
+//
+// It uses an optimistic loop to handle race conditions where the Garbage Collector might delete the object from the
+// map concurrently. It ensures that the returned state object is both authoritative (present in the map) and leased
+// (count > 0).
+func (fr *FlowRegistry) pinActiveFlow(key types.FlowKey) *flowState {
+	for {
+		val, ok := fr.flowStates.Load(key) // Optimization: Check Load first to avoid allocation on the hot path.
+		if !ok {
+			val, _ = fr.flowStates.LoadOrStore(key, &flowState{key: key})
+		}
+		state := val.(*flowState)
-		actual, loaded := fr.flowStates.LoadOrStore(key, newFlowState)
-		val = actual
-		if loaded {
-			// Another goroutine won the race. Use its state and discard ours.
-			// If future changes make the `managedQueue` or its components more stateful (e.g., by adding background
-			// goroutines, registering with a metrics system, or using `sync.Pool`), a deterministic cleanup function
-			// MUST be called here to release those resources promptly and prevent leaks.
-			fr.logger.V(logging.DEBUG).Info("Concurrent JIT registration detected for flow",
-				"flowKey", key, "flowID", key.ID, "priority", key.Priority)
-		}
-	}
 
+		state.mu.Lock()
+		if state.markedForDeletion {
+			// The GC has marked this flow for deletion.
+			// We must back off and let the deletion complete; the loop will then retry with a fresh state.
+			state.mu.Unlock()
+			continue
+		}
+		state.leaseCount++
+		state.becameIdleAt = time.Time{} // Mark as Active.
+		state.mu.Unlock()
+
+		// Did the GC delete this object while we were acquiring it?
+		currentVal, ok := fr.flowStates.Load(key)
+		if !ok || currentVal != state {
+			// We acquired a "stale" object. Back off and retry.
+			fr.releaseFlow(state)
+			continue
+		}
+		return state
+	}
+}
 
-	// --- Lease Acquisition & Guaranteed Release ---
-	state := val.(*flowState)
-	state.gcLock.Lock()
-	state.leaseCount.Add(1)
-	state.becameIdleAt = time.Time{} // Mark the flow as Active.
-	state.gcLock.Unlock()
-	defer func() {
-		if state.leaseCount.Add(-1) == 0 {
-			// This was the last active lease; mark the flow as Idle.
-			state.gcLock.Lock()
-			if state.leaseCount.Load() == 0 {
-				state.becameIdleAt = fr.clock.Now()
-			}
-			state.gcLock.Unlock()
-		}
-	}()
-
-	// --- Callback Execution ---
-	// We acquire a read lock. This has two effects:
-	//  1. It allows many connections to execute this section concurrently.
-	//  2. It prevents the GC from acquiring a write lock, thus guaranteeing the flow state cannot be deleted while
-	//     `fn()` is running.
-	state.gcLock.RLock()
-	defer state.gcLock.RUnlock()
-	return fn(&connection{registry: fr, key: key})
+// releaseFlow decrements the lease count for a flow.
+// If the lease count reaches zero, the flow is marked as Idle with the current timestamp.
+func (fr *FlowRegistry) releaseFlow(state *flowState) {
+	state.mu.Lock()
+	defer state.mu.Unlock()
+	state.leaseCount--
+	if state.leaseCount == 0 {
+		state.becameIdleAt = fr.clock.Now()
+	}
+}
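
The Load-then-revalidate step above is the heart of the design: incrementing the lease does not help if the GC has already unlinked the object from the map. Here is the same pattern as a self-contained toy (stdlib only, toy names) that can be run, or raced with `go test -race`, independently of the registry:

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct {
	mu      sync.Mutex
	leases  int
	deleted bool // analogue of markedForDeletion
}

var table sync.Map // string -> *entry

// pin mirrors pinActiveFlow's optimistic loop: create-or-load, lock, check the
// tombstone, then re-check map membership before trusting the pointer.
func pin(key string) *entry {
	for {
		val, ok := table.Load(key)
		if !ok {
			val, _ = table.LoadOrStore(key, &entry{})
		}
		e := val.(*entry)

		e.mu.Lock()
		if e.deleted {
			e.mu.Unlock()
			continue // The collector won; retry with a fresh entry.
		}
		e.leases++
		e.mu.Unlock()

		// Double-check: the collector may have removed e between Load and Lock.
		if cur, ok := table.Load(key); !ok || cur != e {
			release(e)
			continue // We leased a stale object; undo and retry.
		}
		return e
	}
}

func release(e *entry) {
	e.mu.Lock()
	e.leases--
	e.mu.Unlock()
}

func main() {
	e := pin("flow-a")
	fmt.Println("leases:", e.leases) // 1
	release(e)
}
```
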
-// prepareNewFlow creates a new `flowState` and synchronizes its queues and policies onto all existing shards.
-func (fr *FlowRegistry) prepareNewFlow(key types.FlowKey) (*flowState, error) {
+// ensureFlowInfrastructure guarantees that the Priority Band exists and that the flow's queues are synchronized
+// across all active shards.
+func (fr *FlowRegistry) ensureFlowInfrastructure(key types.FlowKey) error {
+	// 1. Ensure Priority Band exists.
 	fr.mu.RLock()
 	_, exists := fr.config.PriorityBands[key.Priority]
 	fr.mu.RUnlock()
 
-	// If the band was missing, we must acquire the Write Lock to create it.
 	if !exists {
 		if err := fr.ensurePriorityBand(key.Priority); err != nil {
-			return nil, err
+			return err
 		}
 	}
 
 	// Now we know the band exists (or we errored). Re-acquire Read Lock to safely read the topology and build components.
+
+	// 2. Synchronize shards.
+	// Acquire Read Lock to iterate the shard topology safely.
 	fr.mu.RLock()
 	defer fr.mu.RUnlock()
 
 	components, err := fr.buildFlowComponents(key, len(fr.allShards))
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	for i, shard := range fr.allShards {
 		shard.synchronizeFlow(key, components[i].policy, components[i].queue)
 	}
-	fr.logger.Info("Successfully prepared and synchronized new flow instance",
-		"flowKey", key, "flowID", key.ID, "priority", key.Priority)
-	return &flowState{key: key}, nil
+	fr.logger.V(logging.DEBUG).Info("JIT provisioned flow infrastructure", "flowKey", key)
+	return nil
 }
 
 // ensurePriorityBand safely provisions a new priority band.
@@ -365,73 +380,66 @@ func (fr *FlowRegistry) ShardStats() []contracts.ShardStats {
 
 // executeGCCycle orchestrates the periodic GC of Idle flows and Drained shards.
 func (fr *FlowRegistry) executeGCCycle() {
 	fr.logger.V(logging.DEBUG).Info("Starting periodic GC scan")
-	var flowCandidates []types.FlowKey
+	fr.gcFlows()
+	fr.sweepDrainingShards()
+}
+
+// gcFlows performs the Mark-and-Sweep of Idle flows.
+//
+// It iterates through all tracked flows and identifies candidates that have zero active leases and have exceeded the
+// configured idle timeout. These flows are first removed from the internal map (Logical Delete) and then cleaned up
+// from the shards (Physical Delete).
+func (fr *FlowRegistry) gcFlows() {
+	var flowsToClean []types.FlowKey
 	fr.flowStates.Range(func(key, value interface{}) bool {
 		state := value.(*flowState)
-		state.gcLock.RLock()
-		// A flow is a candidate if its lease count is zero and its idleness timeout has expired.
-		if state.leaseCount.Load() == 0 && !state.becameIdleAt.IsZero() {
-			if fr.clock.Since(state.becameIdleAt) > fr.config.FlowGCTimeout {
-				flowCandidates = append(flowCandidates, key.(types.FlowKey))
-			}
+		state.mu.Lock()
+
+		// 1. Check Lease.
+		if state.leaseCount > 0 {
+			state.mu.Unlock()
+			return true
+		}
+
+		// 2. Check Idle Timeout.
+		if state.becameIdleAt.IsZero() || fr.clock.Since(state.becameIdleAt) < fr.config.FlowGCTimeout {
+			state.mu.Unlock()
+			return true // Never went Idle (brand-new state) or not yet expired.
 		}
-		state.gcLock.RUnlock()
+
+		// 3. Mark for Deletion.
+		state.markedForDeletion = true
+		idleTime := state.becameIdleAt // Captured for logging.
+		state.mu.Unlock()
+
+		// 4. Logical Delete.
+		// Remove from the map. Concurrent WithConnection calls will now create a fresh instance.
+		fr.flowStates.Delete(key)
+		flowsToClean = append(flowsToClean, key.(types.FlowKey))
+		fr.logger.V(logging.VERBOSE).Info("Garbage collecting flow", "flowKey", key, "becameIdleAt", idleTime)
 		return true
 	})
-	if len(flowCandidates) > 0 {
-		fr.verifyAndSweepFlows(flowCandidates)
+
+	// 5. Physical Cleanup.
+	// Performed outside the map iteration to avoid holding locks across shard operations.
+	if len(flowsToClean) > 0 {
+		fr.cleanupFlowResources(flowsToClean)
 	}
-	fr.sweepDrainingShards()
 }
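
The logical-delete-then-physical-cleanup split is easiest to see in miniature. The following self-contained sketch has the same shape (toy types; unlike the real code it omits the re-check of map membership that cleanupFlowResources performs below):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type flow struct {
	mu      sync.Mutex
	leases  int
	idleAt  time.Time
	deleted bool
}

// collect mirrors gcFlows: mark, logically delete (map removal), then run the
// physical cleanup outside the Range loop.
func collect(flows *sync.Map, timeout time.Duration, cleanup func(key string)) {
	var doomed []string
	flows.Range(func(k, v interface{}) bool {
		f := v.(*flow)
		f.mu.Lock()
		expired := f.leases == 0 && !f.idleAt.IsZero() && time.Since(f.idleAt) >= timeout
		if expired {
			f.deleted = true // Mark: new connections must back off and recreate.
		}
		f.mu.Unlock()
		if expired {
			flows.Delete(k) // Logical delete.
			doomed = append(doomed, k.(string))
		}
		return true
	})
	for _, k := range doomed {
		cleanup(k) // Physical delete (shard queues, in the real code).
	}
}

func main() {
	var flows sync.Map
	flows.Store("stale", &flow{idleAt: time.Now().Add(-time.Hour)})
	flows.Store("busy", &flow{leases: 1})
	collect(&flows, time.Minute, func(k string) { fmt.Println("cleaned:", k) })
	_, busyLeft := flows.Load("busy")
	fmt.Println("busy still tracked:", busyLeft) // true
}
```
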
-// verifyAndSweepFlows performs the "verify" and "sweep" phases of GC for Idle flows.
-// For each candidate, it acquires an exclusive lock on that specific flow's state, re-verifies it is still Idle, and
-// then safely performs the deletion.
-func (fr *FlowRegistry) verifyAndSweepFlows(candidates []types.FlowKey) {
-	fr.logger.V(logging.DEBUG).Info("Starting GC Verify and Sweep phase for flows", "candidateCount", len(candidates))
-
-	// Get a stable snapshot of the shard topology, so the list of shards does not change while we are preparing to
-	// delete queues from them.
-	fr.mu.RLock()
-	shardsSnapshot := fr.allShards
-	fr.mu.RUnlock()
-
-	var collectedCount int
-	for _, key := range candidates {
-		val, ok := fr.flowStates.Load(key)
-		if !ok {
-			// Benign race: the flow was already deleted by a previous GC cycle or another process. We can safely
-			// ignore it.
-			continue
-		}
-		state := val.(*flowState)
+// cleanupFlowResources removes queue resources from the shards for the specified flows.
+func (fr *FlowRegistry) cleanupFlowResources(keys []types.FlowKey) {
+	fr.mu.Lock() // Exclusive lock to prevent a race with ensureFlowInfrastructure.
+	defer fr.mu.Unlock()
 
-		// Acquire the exclusive write lock for this specific flow, blocking any new `Connect/Close` operations for
-		// this flow only and ensuring the state is stable for our check. All other flows are unaffected.
-		state.gcLock.Lock()
-
-		// Verify Phase:
-		if state.leaseCount.Load() > 0 {
-			// Verification failed. A new lease was acquired between our initial scan and acquiring the lock.
-			// The flow is Active again, so we leave it alone.
-			fr.logger.V(logging.DEBUG).Info("GC of flow aborted: re-verification failed (flow is Active)",
-				"flowKey", key, "flowID", key.ID, "priority", key.Priority,
-				"leaseCount", state.leaseCount.Load(), "becameIdleAt", state.becameIdleAt)
-			state.gcLock.Unlock()
-			continue
+	for _, key := range keys {
+		if _, exists := fr.flowStates.Load(key); exists {
+			continue // The flow was re-created (JIT) after the logical delete; do not touch its resources.
 		}
-
-		// Sweep Phase:
-		for _, shard := range shardsSnapshot {
+		for _, shard := range fr.allShards {
 			shard.deleteFlow(key)
 		}
-		fr.flowStates.Delete(key)
-		fr.logger.V(logging.VERBOSE).Info("Successfully verified and swept flow",
-			"flowKey", key, "flowID", key.ID, "priority", key.Priority, "becameIdleAt", state.becameIdleAt)
-		collectedCount++
-		state.gcLock.Unlock()
 	}
-
-	fr.logger.V(logging.DEBUG).Info("GC Verify and Sweep phase completed", "flowsCollected", collectedCount)
 }
 
 // sweepDrainingShards finalizes the removal of drained shards.
@@ -484,23 +492,15 @@ func (fr *FlowRegistry) updateShardCount(n int) error {
 }
 
 // executeScaleUpLocked handles adding new shards.
-// It uses a "prepare-then-commit" pattern to ensure that the entire scale-up operation is transactional and never
-// leaves the system in a partially-synchronized, inconsistent state.
-//
-// The preparation phase iterates over all existing flows once, pre-building all necessary components for every new
-// shard. This requires O(M*K) operations (M=flows, K=new shards) and is performed while holding the main control
-// plane lock. If M is large, this operation may block the control plane for a significant duration.
-//
-// Expects the registry's write lock to be held.
+// It pre-provisions all existing active flows onto the new shards to ensure continuity.
+// Expects the registry's write lock to be held.
 func (fr *FlowRegistry) executeScaleUpLocked(newTotalActive int) error {
 	currentActive := len(fr.activeShards)
 	numToAdd := newTotalActive - currentActive
 
 	fr.logger.Info("Scaling up shards", "currentActive", currentActive, "newTotalActive", newTotalActive)
 
-	// Prepare All New Shard Objects (Fallible):
+	// Prepare new shards.
 	newShards := make([]*registryShard, numToAdd)
 	for i := range numToAdd {
-		// Using a padding of 4 allows for up to 9999 shards, which is a very safe upper bound.
 		shardID := fmt.Sprintf("shard-%04d", fr.nextShardID+uint64(i))
 		partitionedConfig := fr.config.partition(currentActive+i, newTotalActive)
 		shard, err := newShard(
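
The scale-up path calls `fr.config.partition(i, n)` to hand each shard its slice of the globally configured capacity. The real partitioning logic lives elsewhere in this package and is not shown in this diff; the sketch below is only a plausible shape for such a function (hypothetical names and an invented remainder policy), included to illustrate the invariant that per-shard shares should sum back to the global limit:

```go
package main

import "fmt"

// partition splits a global byte budget across n shards. The first
// (total % n) shards receive one extra byte so nothing is lost to rounding.
// Hypothetical sketch; the registry's Config.partition may differ.
func partition(globalBytes uint64, index, n int) uint64 {
	share := globalBytes / uint64(n)
	if uint64(index) < globalBytes%uint64(n) {
		share++
	}
	return share
}

func main() {
	var sum uint64
	for i := 0; i < 3; i++ {
		s := partition(100, i, 3) // 34, 33, 33
		fmt.Printf("shard-%04d gets %d bytes\n", i, s)
		sum += s
	}
	fmt.Println("total:", sum) // 100: shares sum to the global limit
}
```
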
diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go
index d9dbfaf93b..07d9701e52 100644
--- a/pkg/epp/flowcontrol/registry/registry_test.go
+++ b/pkg/epp/flowcontrol/registry/registry_test.go
@@ -361,53 +361,35 @@ func TestFlowRegistry_GarbageCollection(t *testing.T) {
 		h.assertFlowExists(key, "Flow should survive GC because its idleness timer was reset")
 	})
 
-	t.Run("ShouldAbortSweep_WhenFlowBecomesActiveAfterScan", func(t *testing.T) {
+	t.Run("ShouldSkipGC_WhenIdleTimeoutExpired_ButActiveLeaseExists", func(t *testing.T) {
 		t.Parallel()
 		h := newRegistryTestHarness(t, harnessOptions{})
-		key := types.FlowKey{ID: "re-activated-flow", Priority: highPriority}
+		key := types.FlowKey{ID: "race-resurrected-flow", Priority: highPriority}
 		h.openConnectionOnFlow(key)
 
-		// Get the flow's state so we can manipulate its lock.
+		// Manually manipulate the state to simulate a race condition.
+		// The flow is "technically Idle" (its timeout has expired) ...
 		val, ok := h.fr.flowStates.Load(key)
-		require.True(t, ok, "Test setup: flow state must exist")
+		require.True(t, ok)
 		state := val.(*flowState)
 
-		// Acquire the lock before stepping the clock.
-		// This ensures the Background GC (which wakes up on Step) is blocked and cannot touch our flow until we
-		// release this lock.
-		state.gcLock.Lock()
-		defer state.gcLock.Unlock()
+		// Force the idle timestamp to be old.
+		oldTime := h.fakeClock.Now().Add(-h.config.FlowGCTimeout * 2)
 
-		// Now step the clock to mark the flow as a candidate for GC.
-		// The Background GC wakes up but hangs on the lock.
-		h.fakeClock.Step(h.config.FlowGCTimeout + time.Second)
-		candidates := []types.FlowKey{key}
+		state.mu.Lock()
+		state.becameIdleAt = oldTime
 
-		// Start the manual sweep in the background; it will also block on the lock.
-		sweepDone := make(chan struct{})
-		go func() {
-			defer close(sweepDone)
-			h.fr.verifyAndSweepFlows(candidates)
-		}()
-
-		// While the sweeps are blocked, simulate the flow becoming Active.
-		state.leaseCount.Add(1)
+		// ... BUT it has an active lease (simulating a request arriving just now).
+		// Note: in the real code these fields are only updated together under mu, so this combination cannot occur
+		// naturally; we force it to verify the GC's safety priority (Lease > Time).
+		state.leaseCount = 1
+		state.mu.Unlock()
 
-		// Unblock the sweeps.
-		// The Background GC and Manual Sweep will now race for the lock.
-		// Since leaseCount is now 1, whoever wins will see it is Active and abort.
-		state.gcLock.Unlock()
-
-		// Wait for the manual sweep to complete.
-		select {
-		case <-sweepDone:
-			// Success
-		case <-time.After(time.Second):
-			t.Fatal("verifyAndSweepFlows deadlocked or timed out")
-		}
+		// Trigger GC.
+		h.fr.executeGCCycle()
 
-		h.assertFlowExists(key, "Flow should not be collected because it became active before the sweep")
-		state.gcLock.Lock() // Re-lock for the deferred unlock.
+		// The GC should see leaseCount > 0 and skip the deletion, despite the expired timestamp.
+		h.assertFlowExists(key, "Flow must not be collected if lease > 0, even if idle timer is expired")
 	})
 
 	t.Run("ShouldCollectDrainingShard_OnlyWhenEmpty", func(t *testing.T) {
@@ -437,49 +419,6 @@ func TestFlowRegistry_GarbageCollection(t *testing.T) {
 		h.fr.sweepDrainingShards()
 		assert.Empty(t, h.fr.drainingShards, "Draining shard should be collected after it becomes empty")
 	})
-
-	t.Run("ShouldHandleBenignRace_WhenSweepingAlreadyDeletedFlow", func(t *testing.T) {
-		t.Parallel()
-		h := newRegistryTestHarness(t, harnessOptions{})
-		key := types.FlowKey{ID: "benign-race-flow", Priority: highPriority}
-		h.openConnectionOnFlow(key)
-
-		// Get the flow state so we can lock it.
-		val, ok := h.fr.flowStates.Load(key)
-		require.True(t, ok, "Test setup: flow state must exist")
-		state := val.(*flowState)
-
-		// Make the flow a candidate for GC.
-		h.fakeClock.Step(h.config.FlowGCTimeout + time.Second)
-		candidates := []types.FlowKey{key}
-
-		// Manually lock the flow's `gcLock`. This simulates the GC being stuck just before its "Verify" phase.
-		state.gcLock.Lock()
-		defer state.gcLock.Unlock()
-
-		// In a background goroutine, run the sweep. It will block on the lock.
-		sweepDone := make(chan struct{})
-		go func() {
-			defer close(sweepDone)
-			h.fr.verifyAndSweepFlows(candidates)
-		}()
-
-		// While the sweep is blocked, delete the flow from underneath it.
-		// This creates the benign race condition.
-		h.fr.flowStates.Delete(key)
-
-		// Unblock the sweep logic.
-		state.gcLock.Unlock() // Temporarily unlock to let the sweep proceed.
-
-		// The sweep must complete without panicking.
-		select {
-		case <-sweepDone:
-			// Success! The test completed gracefully.
-		case <-time.After(time.Second):
-			t.Fatal("verifyAndSweepFlows deadlocked or timed out")
-		}
-		state.gcLock.Lock() // Re-lock for the deferred unlock.
-	})
 }
 
 // --- Shard Management Tests ---
@@ -713,6 +652,107 @@ func TestFlowRegistry_Concurrency(t *testing.T) {
 		h.assertFlowExists(key, "Flow must exist after concurrent JIT registration")
 	})
 
+	t.Run("ShouldRecover_WhenGCDeletesFlow_DuringConnectionAttempt", func(t *testing.T) {
+		t.Parallel()
+		h := newRegistryTestHarness(t, harnessOptions{})
+		key := types.FlowKey{ID: "zombie-race-flow", Priority: highPriority}
+
+		// We want to force the specific race in pinActiveFlow where:
+		//  1. User loads ptr A.
+		//  2. GC deletes ptr A from the map.
+		//  3. User checks the map, sees nil or ptr B.
+		//  4. User retries.
+
+		var wg sync.WaitGroup
+		stopCh := make(chan struct{})
+
+		// Routine 1: The "User", constantly trying to connect.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-stopCh:
+					return
+				default:
+					// This exercises the optimistic loop.
+					err := h.fr.WithConnection(key, func(c contracts.ActiveFlowConnection) error {
+						return nil
+					})
+					if err != nil {
+						h.t.Logf("Connection failed during race: %v", err)
+					}
+				}
+			}
+		}()
+
+		// Routine 2: The "GC", constantly deleting the flow.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-stopCh:
+					return
+				default:
+					// Forcefully delete the key to trigger the "zombie" condition in Routine 1.
+					h.fr.flowStates.Delete(key)
+					time.Sleep(100 * time.Microsecond) // Yield briefly to let Routine 1 make progress.
+				}
+			}
+		}()
+
+		// Let the chaos run for a bit.
+		time.Sleep(100 * time.Millisecond)
+		close(stopCh)
+		wg.Wait()
+
+		// Final consistency check: ensure that we can still connect successfully after the chaos.
+		// If the optimistic loop works, the final state in the map is valid.
+		h.openConnectionOnFlow(key)
+	})
+
+	t.Run("ShouldBackOff_WhenFlowIsMarkedForDeletion_ButStillInMap", func(t *testing.T) {
+		t.Parallel()
+		h := newRegistryTestHarness(t, harnessOptions{})
+		key := types.FlowKey{ID: "doomed-flow", Priority: highPriority}
+		h.openConnectionOnFlow(key)
+
+		// Get the original flow state object.
+		val, ok := h.fr.flowStates.Load(key)
+		require.True(t, ok)
+		originalState := val.(*flowState)
+
+		// Manually poison it (simulating the GC step: marked, but not yet deleted from the map).
+		originalState.mu.Lock()
+		originalState.markedForDeletion = true
+		originalState.mu.Unlock()
+
+		// Launch a background routine to simulate the GC completing the deletion.
+		// Without this, the main goroutine would spin forever in pinActiveFlow, reloading the same doomed object.
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			// Yield to allow the main goroutine to enter the retry loop and hit the "poisoned" check at least once.
+			time.Sleep(10 * time.Millisecond)
+			h.fr.flowStates.Delete(key)
+		}()
+
+		// Attempt to connect.
+		// It should spin briefly, detect the deletion, create a new flow, and succeed.
+		err := h.fr.WithConnection(key, func(c contracts.ActiveFlowConnection) error {
+			return nil
+		})
+		require.NoError(t, err, "WithConnection should recover and succeed")
+		wg.Wait()
+
+		// Verification: ensure we are using a fresh object, not the resurrected corpse.
+		newVal, ok := h.fr.flowStates.Load(key)
+		require.True(t, ok)
+		assert.NotSame(t, originalState, newVal, "Should have created a new flow object, not reused the marked one")
+	})
+
 	t.Run("MixedAdminAndDataPlaneWorkload", func(t *testing.T) {
 		t.Parallel()
 		h := newRegistryTestHarness(t, harnessOptions{initialShardCount: 1})