Add State Persistence for Crash Recovery (#321)
Conversation
Signed-off-by: bupd <bupdprasanth@gmail.com>
📝 Walkthrough

This change implements state persistence for satellite crash recovery. It adds a persistence layer to save and load satellite state to/from disk, updates satellite initialization to load persisted state on startup, modifies replication to stream descriptors, and introduces E2E crash-recovery tasks and tests to validate restart behavior without full re-replication.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Main as Satellite Main
    participant Process as FetchAndReplicateStateProcess
    participant Disk as Disk (state.json)
    participant GC as Ground Control

    rect rgba(100, 150, 200, 0.5)
        Note over Main,Disk: Startup - Load Persisted State
        Main->>Process: NewFetchAndReplicateStateProcess(cm, stateFilePath, log)
        Process->>Disk: LoadState(stateFilePath)
        alt State file exists
            Disk-->>Process: PersistedState {ConfigDigest, Groups}
            Process->>Process: Initialize with persisted state
        else State file missing
            Disk-->>Process: nil, nil
            Process->>Process: Start with empty state
        end
    end

    rect rgba(100, 200, 100, 0.5)
        Note over Process,Disk: Runtime - Sync and Persist
        Process->>GC: FetchRemoteConfig()
        GC-->>Process: New configuration
        Process->>Process: Reconcile config changes
        Process->>Disk: SaveState(stateFilePath, stateMap, configDigest)
        Disk-->>Process: ✓ State persisted
    end

    rect rgba(200, 150, 100, 0.5)
        Note over Main,Disk: Crash & Restart - Resume from Checkpoint
        Main-xMain: Crash (SIGKILL)
        Main->>Process: NewFetchAndReplicateStateProcess(cm, stateFilePath, log)
        Process->>Disk: LoadState(stateFilePath)
        Disk-->>Process: PersistedState (from last sync)
        Process->>Process: Resume with checkpoint state
        Note over Process: No full re-replication needed
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
🚥 Pre-merge checks: 4 passed, 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Note: Unit test generation is a public access feature. Expect some limitations and changes as we gather feedback and continue to improve it. Generating unit tests... This may take up to 20 minutes.
Codacy's Analysis Summary: 1 new issue (threshold: ≤ 0 issues)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@internal/state/state_persistence.go`:
- Around line 67-79: The unmarshalling error from LoadState is being discarded
by the caller so corrupted state files aren’t logged; update behavior so the
operator sees a warning: either (A) in LoadState (function LoadState) detect
json.Unmarshal errors, call the package logger to emit a warning about a
corrupted state file and then return nil, nil so callers start fresh, or (B)
change the caller NewFetchAndReplicateStateProcess to check for a non-nil error
return from LoadState and log a warning before continuing; pick one approach and
implement it consistently (refer to LoadState and
NewFetchAndReplicateStateProcess in state_persistence.go / state_process.go).
- Around line 48-60: The temp-file write/rename sequence (tmp.Write, tmp.Close,
os.Rename using tmpName → path) lacks an explicit fsync, so data may remain in
page cache and be lost on crash; modify the sequence to call tmp.Sync() after
writing (and before tmp.Close()) and handle any Sync error similar to
Write/Close errors (cleanup tmpName and return a wrapped error) so data is
flushed to stable storage prior to os.Rename.
In `@internal/state/state_process.go`:
- Around line 39-60: The constructor NewFetchAndReplicateStateProcess silently
ignores errors from LoadState; modify it so that when stateFilePath != "" and
LoadState returns a non-nil error you log a warning including the file path and
the error before continuing with an empty state. Implement this by either adding
a logger parameter (e.g., zerolog.Logger) to NewFetchAndReplicateStateProcess
and calling logger.Warn().Err(err).Msgf(...) or (if you cannot change the
signature) using a package-level logger (github.com/rs/zerolog/log) or
fmt.Fprintf(os.Stderr, ...) to emit a clear warning that the persisted state at
stateFilePath was invalid/corrupted and is being ignored; keep the existing
behavior of continuing after logging. Ensure you import the chosen logging
package and include the unique symbols LoadState,
NewFetchAndReplicateStateProcess, stateFilePath and persisted in your change.
In `@taskfiles/e2e.yml`:
- Around line 817-850: Fix the brittle log-string assertions and the typo:
correct "entites" -> "entities" both in the test (verify-crash-recovery) and the
source at state_process.go:167, add a short comment in the test noting it relies
on that source log string, and make the greps less fragile by matching a stable
substring or regex (e.g., match "Old state has zero entit(ies)" and "Total
artifacts to replicate:\\s*0" or otherwise match the prefix "Total artifacts to
replicate" and assert the numeric value) so small rewordings won't break the
check.
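A sketch of what the less fragile assertions might look like. The log lines shown here are assumptions about the satellite's output, and the regexes deliberately tolerate both the current "entites" typo and the corrected spelling until state_process.go:167 is fixed:

```shell
# Hypothetical log lines captured from the satellite container.
logs='time=... msg="Old state has zero entities"
time=... msg="Total artifacts to replicate: 0"'

# Match a stable substring plus the numeric value instead of the exact
# sentence, so small rewordings do not silently bypass the assertion.
echo "$logs" | grep -Eq 'Old state has zero entit(e|ie)s' || { echo "missing zero-entities line"; exit 1; }
echo "$logs" | grep -Eq 'Total artifacts to replicate:[[:space:]]*0' || { echo "expected 0 artifacts"; exit 1; }
echo "crash-recovery assertions passed"
```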
🧹 Nitpick comments (5)
internal/state/state_persistence_test.go (1)
66-76: Consider adding a test for corrupted/malformed state files.

The PR objective mentions handling corrupted state files gracefully. A test that writes invalid JSON to the state file path and verifies `LoadState` returns an appropriate error would strengthen coverage of that requirement.

💡 Suggested test

```go
func TestLoadCorruptedFile(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "state.json")
	if err := os.WriteFile(path, []byte("{invalid json"), 0600); err != nil {
		t.Fatalf("failed to write corrupted file: %v", err)
	}
	loaded, err := LoadState(path)
	if err == nil {
		t.Fatal("LoadState should return error for corrupted file")
	}
	if loaded != nil {
		t.Fatalf("LoadState should return nil for corrupted file, got: %+v", loaded)
	}
}
```

internal/state/state_process.go (3)
372-379: Mutex held during disk I/O in `SaveState`.

The mutex is held while `SaveState` performs file creation, write, and rename (lines 372–379 here and 438–446 in `processGroupState`). Every concurrent `processGroupState` goroutine will block on this mutex while disk I/O completes, serializing what was meant to be parallel group processing.

This is correct for safety and acceptable for the current scale, but worth noting: if the number of groups grows or the filesystem is slow (e.g., network-attached storage), this will become a bottleneck. A possible future optimization is to snapshot the data under the lock and perform the write outside it.
438-446: State is saved on every individual group completion — consider batching.

Each `processGroupState` goroutine calls `SaveState` independently after its own group finishes. With N groups, this means N full-state serializations and atomic file writes in a single sync cycle (plus potentially one more from `reconcileRemoteConfig`). Since `collectResults` already waits for all goroutines to finish, a single `SaveState` call after `collectResults` would be both simpler and more efficient.

That said, the current approach gives stronger durability (each completed group is immediately persisted), so this is a trade-off. If the stronger guarantee is intentional, a brief comment documenting the choice would help future readers.
15-23: Two distinct mutexes protect different things — add clarifying comments.

`f.mu` (line 21) guards `isRunning`/`name`, while the local `mutex` created in `Execute` (line 111) guards `stateMap`/`currentConfigDigest` writes and persistence. The separation is fine, but without any documentation it's easy for a future contributor to conflate them or accidentally use the wrong one. A brief comment on each noting what it protects would improve maintainability.

taskfiles/e2e.yml (1)
814-815: Hardcoded `sleep 15` for satellite startup is fragile.

If the satellite takes longer than 15 seconds (e.g., slow CI, image pulls), the subsequent verification could see incomplete state. A polling loop (similar to the `wait-harbor` and `wait-gc` tasks) that checks the satellite's health endpoint would be more resilient.
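A generic polling helper in the spirit of the existing `wait-harbor`/`wait-gc` tasks might look like this; the health URL and timeout shown in the trailing comment are assumptions, not values from the repository:

```shell
# Retries a check command until it succeeds or the timeout elapses.
wait_for() {
  check="$1"; timeout="${2:-60}"; interval="${3:-2}"
  elapsed=0
  until eval "$check" >/dev/null 2>&1; do
    if [ "$elapsed" -ge "$timeout" ]; then
      echo "timed out after ${timeout}s waiting for: $check" >&2
      return 1
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
  return 0
}

# In the task, instead of `sleep 15` (endpoint is an assumption):
# wait_for "curl -fsS http://localhost:9090/health" 60
```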
4 issues found across 9 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="internal/state/state_persistence.go">
<violation number="1" location="internal/state/state_persistence.go:48">
P1: `SaveState` never fsyncs the temp file before renaming it over the state file. If the process or machine crashes after `SaveState` returns, the kernel may drop the unwritten buffers and leave `state.json` empty or corrupt despite the rename. Add a `tmp.Sync()` (and handle its error) after writing the data so the next rename actually durably persists the state.</violation>
</file>
<file name="internal/state/state_process.go">
<violation number="1" location="internal/state/state_process.go:48">
P2: When `LoadState` returns an error (e.g., corrupted JSON), the error is silently discarded and the satellite starts with empty state. For crash recovery, operators should have visibility into state file corruption. Consider logging a warning when the persisted state cannot be loaded.</violation>
<violation number="2" location="internal/state/state_process.go:374">
P2: Persisted state is written only when a group fetch completes, so when `updateStateMap` removes a group there is no subsequent `SaveState` call and the deletion is never flushed to disk. This leaves stale groups/entities in `state.json`, and after a crash/restart the satellite will try to replicate artifacts that were already removed upstream. Please persist immediately after stateMap shrinks (e.g., right after `updateStateMap` prunes entries or whenever the set of groups changes) so removals are reflected on disk.</violation>
</file>
<file name="taskfiles/e2e.yml">
<violation number="1" location="taskfiles/e2e.yml:823">
P3: Typo in log message assertion: "entites" should be "entities". This typo currently matches the source code, but creates a fragile coupling. If the source typo is fixed, this grep will never match and the test assertion will be silently bypassed.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
✅ Created PR with unit tests: #322
Signed-off-by: bupd <bupdprasanth@gmail.com>
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@internal/state/replicator.go`:
- Around line 90-104: The custom TLS transport returned by buildTLSTransport is
only being applied to pullOpts and not to pushOpts, so pushes use the default
transport; modify the block where transport is appended (inside the else of if
r.useUnsecure) to also append remote.WithTransport(transport) to pushOpts
alongside pullOpts, ensuring that when transport != nil both pullOpts and
pushOpts receive the custom transport so pushes and pulls use the same TLS
config (referencing nameOpts, pullOpts, pushOpts, buildTLSTransport,
remote.WithTransport, and r.useUnsecure).
- Around line 135-136: The current conversion only calls mutate.MediaType(img,
types.OCIManifestSchema1) which leaves the config and layer descriptors using
Docker V2S2 media types; update the code after creating ociImage to also call
mutate.ConfigMediaType(ociImage, types.OCIConfigJSON) and ensure every layer
descriptor's MediaType is set to an OCI type (e.g., types.OCILayer) before
finalizing the image—iterate the manifest's layers (from img/ociImage
descriptors), replace their MediaType fields with types.OCILayer (or appropriate
OCI compressed/uncompressed layer types), and return the fully converted image
so the manifest, config, and layers are consistently OCI media types.
In `@internal/state/state_process.go`:
- Around line 106-114: The code only compares oldLen := len(f.stateMap) to
detect changes after calling f.updateStateMap(satelliteState.States), which
misses swaps where the number of groups stays the same but URLs changed; change
the logic to detect real content changes by comparing the old and new URL sets
(or the full map contents) before calling SaveState: capture a copy of
f.stateMap (or compute a set-hash/serialized representation) before calling
f.updateStateMap, then after the update compare that snapshot to the current
f.stateMap and call SaveState(f.stateFilePath, f.stateMap,
f.currentConfigDigest) whenever they differ (still guarding by f.stateFilePath
!= ""), ensuring the new comparison is used instead of the length-only check so
swaps are persisted immediately and not left to processGroupState.
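The content-based comparison described above could be sketched as below; the `map[string][]string` shape is an illustrative stand-in for the real `stateMap` type, and sorting makes the check insensitive to URL ordering:

```go
package main

import (
	"reflect"
	"sort"
)

// normalized deep-copies the map with each URL slice sorted, so two maps
// holding the same URL sets in different orders compare equal.
func normalized(m map[string][]string) map[string][]string {
	out := make(map[string][]string, len(m))
	for k, v := range m {
		cp := append([]string(nil), v...)
		sort.Strings(cp)
		out[k] = cp
	}
	return out
}

// stateChanged reports whether the group-to-URLs mapping actually differs,
// catching same-size swaps that a length-only check misses.
func stateChanged(oldM, newM map[string][]string) bool {
	return !reflect.DeepEqual(normalized(oldM), normalized(newM))
}
```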
🧹 Nitpick comments (1)
internal/state/replicator.go (1)
122-144: Inconsistent error handling: some errors wrapped, others returned raw.

Reference parsing errors (lines 114, 119) are wrapped with `fmt.Errorf` for context, but descriptor fetch (line 126), image resolve (line 132), and write (line 142) return bare errors after logging. Wrapping all errors consistently makes debugging from call sites easier.

Proposed fix

```diff
 desc, err := remote.Get(src, pullOpts...)
 if err != nil {
-	log.Error().Msgf("Failed to fetch image descriptor: %v", err)
-	return err
+	return fmt.Errorf("fetch descriptor for %s: %w", srcRef, err)
 }

 img, err := desc.Image()
 if err != nil {
-	log.Error().Msgf("Failed to resolve image: %v", err)
-	return err
+	return fmt.Errorf("resolve image for %s: %w", srcRef, err)
 }

 // Lazy OCI conversion - no data materialized
 ociImage := mutate.MediaType(img, types.OCIManifestSchema1)

 if err := remote.Write(dst, ociImage, pushOpts...); err != nil {
-	log.Error().Msgf("Failed to replicate image: %v", err)
-	return err
+	return fmt.Errorf("write image to %s: %w", dstRef, err)
 }
```
Signed-off-by: bupd <bupdprasanth@gmail.com>
Summary
Summary by cubic
Persist satellite state to disk and reload it on startup to recover from crashes without re-replicating. Replication streams layers and skips existing blobs for fast resume; adds tests for layer-level resume, handles group swaps, and fixes TLS on push.
New Features
Bug Fixes
Written for commit 07a0d7c. Summary will update on new commits.
Summary by CodeRabbit
New Features
New Features (CLI/Tasks)
Tests