Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ tasks:
cmds:
- task: _e2e:test-spiffe

e2e-crash-recovery:
desc: Run crash recovery E2E test
cmds:
- task: _e2e:test-crash-recovery

e2e-crash-recovery-byo:
desc: Run crash recovery BYO registry E2E test
cmds:
- task: _e2e:test-crash-recovery-byo

publish:
desc: "Publish both components to registry"
aliases: [p]
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func run(opts SatelliteOptions, pathConfig *config.PathConfig) error {
}
})

s := satellite.NewSatellite(cm)
s := satellite.NewSatellite(cm, pathConfig.StateFile)
err = s.Run(ctx)
if err != nil {
return fmt.Errorf("unable to start satellite: %w", err)
Expand Down
14 changes: 8 additions & 6 deletions internal/satellite/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ import (
)

type Satellite struct {
cm *config.ConfigManager
schedulers []*scheduler.Scheduler
cm *config.ConfigManager
schedulers []*scheduler.Scheduler
stateFilePath string
}

func NewSatellite(cm *config.ConfigManager) *Satellite {
func NewSatellite(cm *config.ConfigManager, stateFilePath string) *Satellite {
return &Satellite{
cm: cm,
schedulers: make([]*scheduler.Scheduler, 0),
cm: cm,
schedulers: make([]*scheduler.Scheduler, 0),
stateFilePath: stateFilePath,
}
}

func (s *Satellite) Run(ctx context.Context) error {
log := logger.FromContext(ctx)
log.Info().Msg("Starting Satellite")

fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm)
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm, s.stateFilePath)

// Create ZTR scheduler if not already done
if !s.cm.IsZTRDone() {
Expand Down
8 changes: 4 additions & 4 deletions internal/state/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func NewBasicReplicatorWithTLS(sourceUsername, sourcePassword, sourceRegistry, r

// Entity represents an image or artifact which needs to be handled by the replicator
type Entity struct {
Name string
Repository string
Tag string
Digest string
Name string `json:"name"`
Repository string `json:"repository"`
Tag string `json:"tag"`
Digest string `json:"digest"`
}

func (e Entity) GetName() string {
Expand Down
82 changes: 82 additions & 0 deletions internal/state/state_persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package state

import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
)

// PersistedGroupState is the serializable form of a group's replicated entities.
type PersistedGroupState struct {
URL string `json:"url"`
Entities []Entity `json:"entities"`
}

// PersistedState is the top-level struct written to state.json.
type PersistedState struct {
ConfigDigest string `json:"config_digest,omitempty"`
Groups []PersistedGroupState `json:"groups"`
}

// SaveState writes the current stateMap and configDigest to disk.
func SaveState(path string, stateMap []StateMap, configDigest string) error {
persisted := PersistedState{
ConfigDigest: configDigest,
Groups: make([]PersistedGroupState, 0, len(stateMap)),
}
for _, sm := range stateMap {
persisted.Groups = append(persisted.Groups, PersistedGroupState{
URL: sm.url,
Entities: sm.Entities,
})
}

data, err := json.MarshalIndent(persisted, "", " ")
if err != nil {
return fmt.Errorf("marshal state: %w", err)
}

dir := filepath.Dir(path)
tmp, err := os.CreateTemp(dir, "state-*.json.tmp")
if err != nil {
return fmt.Errorf("create temp file: %w", err)
}
tmpName := tmp.Name()

if _, err := tmp.Write(data); err != nil {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
tmp.Close()
os.Remove(tmpName)
return fmt.Errorf("write temp file: %w", err)
}
if err := tmp.Close(); err != nil {
os.Remove(tmpName)
return fmt.Errorf("close temp file: %w", err)
}
if err := os.Rename(tmpName, path); err != nil {
os.Remove(tmpName)
return fmt.Errorf("rename temp to state file: %w", err)
}
Comment thread
bupd marked this conversation as resolved.
Outdated

return nil
}

// LoadState reads the persisted state from disk.
// Returns nil, nil if the file does not exist.
func LoadState(path string) (*PersistedState, error) {
data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, fmt.Errorf("read state file: %w", err)
}

var persisted PersistedState
if err := json.Unmarshal(data, &persisted); err != nil {
return nil, fmt.Errorf("unmarshal state file: %w", err)
}
Comment thread
bupd marked this conversation as resolved.

return &persisted, nil
}
103 changes: 103 additions & 0 deletions internal/state/state_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package state

import (
"os"
"path/filepath"
"testing"
)

func TestSaveAndLoadRoundTrip(t *testing.T) {

Check warning on line 9 in internal/state/state_persistence_test.go

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

internal/state/state_persistence_test.go#L9

Method TestSaveAndLoadRoundTrip has a cyclomatic complexity of 14 (limit is 8)
dir := t.TempDir()
path := filepath.Join(dir, "state.json")

stateMap := []StateMap{
{
url: "http://registry.example.com/group1",
Entities: []Entity{
{Name: "alpine", Repository: "library", Tag: "latest", Digest: "sha256:abc123"},
{Name: "nginx", Repository: "library", Tag: "1.25", Digest: "sha256:def456"},
},
},
{
url: "http://registry.example.com/group2",
Entities: []Entity{
{Name: "redis", Repository: "library", Tag: "7", Digest: "sha256:ghi789"},
},
},
}
configDigest := "sha256:config123"

if err := SaveState(path, stateMap, configDigest); err != nil {
t.Fatalf("SaveState failed: %v", err)
}

loaded, err := LoadState(path)
if err != nil {
t.Fatalf("LoadState failed: %v", err)
}
if loaded == nil {
t.Fatal("LoadState returned nil")
}

if loaded.ConfigDigest != configDigest {
t.Errorf("ConfigDigest = %q, want %q", loaded.ConfigDigest, configDigest)
}

if len(loaded.Groups) != len(stateMap) {
t.Fatalf("Groups count = %d, want %d", len(loaded.Groups), len(stateMap))
}

for i, g := range loaded.Groups {
if g.URL != stateMap[i].url {
t.Errorf("Group[%d].URL = %q, want %q", i, g.URL, stateMap[i].url)
}
if len(g.Entities) != len(stateMap[i].Entities) {
t.Fatalf("Group[%d].Entities count = %d, want %d", i, len(g.Entities), len(stateMap[i].Entities))
}
for j, e := range g.Entities {
want := stateMap[i].Entities[j]
if e.Name != want.Name || e.Repository != want.Repository || e.Tag != want.Tag || e.Digest != want.Digest {
t.Errorf("Group[%d].Entities[%d] = %+v, want %+v", i, j, e, want)
}
}
}
}

func TestLoadNonexistentFile(t *testing.T) {
path := filepath.Join(t.TempDir(), "does-not-exist.json")

loaded, err := LoadState(path)
if err != nil {
t.Fatalf("LoadState returned error for missing file: %v", err)
}
if loaded != nil {
t.Fatalf("LoadState returned non-nil for missing file: %+v", loaded)
}
}

func TestSaveEmptyState(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "state.json")

if err := SaveState(path, nil, ""); err != nil {
t.Fatalf("SaveState failed for empty state: %v", err)
}

loaded, err := LoadState(path)
if err != nil {
t.Fatalf("LoadState failed: %v", err)
}
if loaded == nil {
t.Fatal("LoadState returned nil for empty state")
}
if loaded.ConfigDigest != "" {
t.Errorf("ConfigDigest = %q, want empty", loaded.ConfigDigest)
}
if len(loaded.Groups) != 0 {
t.Errorf("Groups count = %d, want 0", len(loaded.Groups))
}

if _, err := os.Stat(path); err != nil {
t.Errorf("state file should exist: %v", err)
}
}
42 changes: 35 additions & 7 deletions internal/state/state_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type FetchAndReplicateStateProcess struct {
currentConfigDigest string
cm *config.ConfigManager
mu sync.Mutex
stateFilePath string
}

// Define result types for channels
Expand All @@ -35,13 +36,27 @@ type ConfigFetcherResult struct {
Cancelled bool
}

func NewFetchAndReplicateStateProcess(cm *config.ConfigManager) *FetchAndReplicateStateProcess {
return &FetchAndReplicateStateProcess{
name: config.ReplicateStateJobName,
isRunning: false,
currentConfigDigest: "",
cm: cm,
func NewFetchAndReplicateStateProcess(cm *config.ConfigManager, stateFilePath string) *FetchAndReplicateStateProcess {
p := &FetchAndReplicateStateProcess{
name: config.ReplicateStateJobName,
cm: cm,
stateFilePath: stateFilePath,
}

if stateFilePath != "" {
persisted, err := LoadState(stateFilePath)
if err == nil && persisted != nil {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
p.currentConfigDigest = persisted.ConfigDigest
for _, g := range persisted.Groups {
p.stateMap = append(p.stateMap, StateMap{
url: g.URL,
Entities: g.Entities,
})
}
}
}

return p
}
Comment thread
bupd marked this conversation as resolved.
Outdated

type StateMap struct {
Expand Down Expand Up @@ -105,7 +120,7 @@ func (f *FetchAndReplicateStateProcess) Execute(ctx context.Context) error {

// Launch config fetcher goroutine
go func() {
result := f.reconcileRemoteConfig(ctx, satelliteState.Config, srcUsername, srcPassword, useUnsecure, &log)
result := f.reconcileRemoteConfig(ctx, satelliteState.Config, srcUsername, srcPassword, useUnsecure, mutex, &log)
configFetcherResult <- result
}()

Expand Down Expand Up @@ -294,6 +309,7 @@ func (f *FetchAndReplicateStateProcess) reconcileRemoteConfig(
ctx context.Context,
configURL, srcUsername, srcPassword string,
useUnsecure bool,
mutex *sync.Mutex,
log *zerolog.Logger,
) ConfigFetcherResult {
configFetcherLog := log.With().
Expand Down Expand Up @@ -353,7 +369,14 @@ func (f *FetchAndReplicateStateProcess) reconcileRemoteConfig(
result.Error = fmt.Errorf("failed to write new config to disk: %w", err)
return result
}
mutex.Lock()
f.currentConfigDigest = configDigest
if f.stateFilePath != "" {
Comment thread
bupd marked this conversation as resolved.
if err := SaveState(f.stateFilePath, f.stateMap, f.currentConfigDigest); err != nil {
configFetcherLog.Warn().Err(err).Msg("Failed to persist state to disk")
}
}
mutex.Unlock()
}

result.ConfigDigest = configDigest
Expand Down Expand Up @@ -415,6 +438,11 @@ func (f *FetchAndReplicateStateProcess) processGroupState(
mutex.Lock()
f.stateMap[index].State = newState
f.stateMap[index].Entities = FetchEntitiesFromState(newState)
if f.stateFilePath != "" {
if err := SaveState(f.stateFilePath, f.stateMap, f.currentConfigDigest); err != nil {
stateFetcherLog.Warn().Err(err).Msg("Failed to persist state to disk")
}
}
mutex.Unlock()

return result
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type PathConfig struct {
PrevConfigFile string
ZotTempConfig string
ZotStorageDir string
StateFile string
}

// expandPath expands ~ and ~/ to the user's home directory in paths.
Expand Down Expand Up @@ -88,6 +89,7 @@ func ResolvePathConfig(configDir string) (*PathConfig, error) {
PrevConfigFile: filepath.Join(expanded, "prev_config.json"),
ZotTempConfig: filepath.Join(expanded, "zot-hot.json"),
ZotStorageDir: filepath.Join(expanded, "zot"),
StateFile: filepath.Join(expanded, "state.json"),
}, nil
}

Expand Down
Loading
Loading