Skip to content

Commit 17e5ddb

Browse files
authored
Add State Persistence for Crash Recovery (#321)
1 parent 00564f6 commit 17e5ddb

File tree

10 files changed

+967
-41
lines changed

10 files changed

+967
-41
lines changed

Taskfile.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ tasks:
8383
cmds:
8484
- task: _e2e:test-spiffe
8585

86+
e2e-crash-recovery:
87+
desc: Run crash recovery E2E test
88+
cmds:
89+
- task: _e2e:test-crash-recovery
90+
91+
e2e-crash-recovery-byo:
92+
desc: Run crash recovery BYO registry E2E test
93+
cmds:
94+
- task: _e2e:test-crash-recovery-byo
95+
8696
publish:
8797
desc: "Publish both components to registry"
8898
aliases: [p]

cmd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func run(opts SatelliteOptions, pathConfig *config.PathConfig) error {
239239
}
240240
})
241241

242-
s := satellite.NewSatellite(cm)
242+
s := satellite.NewSatellite(cm, pathConfig.StateFile)
243243
err = s.Run(ctx)
244244
if err != nil {
245245
return fmt.Errorf("unable to start satellite: %w", err)

internal/satellite/satellite.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,24 @@ import (
1010
)
1111

1212
type Satellite struct {
13-
cm *config.ConfigManager
14-
schedulers []*scheduler.Scheduler
13+
cm *config.ConfigManager
14+
schedulers []*scheduler.Scheduler
15+
stateFilePath string
1516
}
1617

17-
func NewSatellite(cm *config.ConfigManager) *Satellite {
18+
func NewSatellite(cm *config.ConfigManager, stateFilePath string) *Satellite {
1819
return &Satellite{
19-
cm: cm,
20-
schedulers: make([]*scheduler.Scheduler, 0),
20+
cm: cm,
21+
schedulers: make([]*scheduler.Scheduler, 0),
22+
stateFilePath: stateFilePath,
2123
}
2224
}
2325

2426
func (s *Satellite) Run(ctx context.Context) error {
2527
log := logger.FromContext(ctx)
2628
log.Info().Msg("Starting Satellite")
2729

28-
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm)
30+
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm, s.stateFilePath, log)
2931

3032
// Create ZTR scheduler if not already done
3133
if !s.cm.IsZTRDone() {

internal/state/replicator.go

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import (
1111
"github.com/container-registry/harbor-satellite/pkg/config"
1212
"github.com/google/go-containerregistry/pkg/authn"
1313
"github.com/google/go-containerregistry/pkg/crane"
14+
"github.com/google/go-containerregistry/pkg/name"
15+
v1 "github.com/google/go-containerregistry/pkg/v1"
1416
"github.com/google/go-containerregistry/pkg/v1/mutate"
17+
"github.com/google/go-containerregistry/pkg/v1/remote"
1518
"github.com/google/go-containerregistry/pkg/v1/types"
1619
)
1720

@@ -52,10 +55,10 @@ func NewBasicReplicatorWithTLS(sourceUsername, sourcePassword, sourceRegistry, r
5255

5356
// Entity represents an image or artifact which needs to be handled by the
// replicator. The json tags allow an Entity to be serialized, e.g. when
// replication state is persisted to disk for crash recovery.
type Entity struct {
	// Name is the image name within its repository.
	Name string `json:"name"`
	// Repository is the repository the image belongs to.
	Repository string `json:"repository"`
	// Tag identifies the image version to replicate.
	Tag string `json:"tag"`
	// Digest is the content digest associated with the image.
	Digest string `json:"digest"`
}
6063

6164
func (e Entity) GetName() string {
@@ -70,59 +73,137 @@ func (e Entity) GetTag() string {
7073
return e.Tag
7174
}
7275

73-
// Replicate replicates images from the source registry to the Zot registry.
76+
// Replicate replicates images from the source registry to the local registry.
77+
// Before pulling, it checks which blobs already exist at the destination and
78+
// only downloads missing layers from source, saving bandwidth on crash recovery.
7479
func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []Entity) error {
7580
log := logger.FromContext(ctx)
76-
pullAuthConfig := authn.FromConfig(authn.AuthConfig{
81+
pullAuth := authn.FromConfig(authn.AuthConfig{
7782
Username: r.sourceUsername,
7883
Password: r.sourcePassword,
7984
})
80-
pushAuthConfig := authn.FromConfig(authn.AuthConfig{
85+
pushAuth := authn.FromConfig(authn.AuthConfig{
8186
Username: r.remoteUsername,
8287
Password: r.remotePassword,
8388
})
8489

85-
pullOptions := []crane.Option{crane.WithAuth(pullAuthConfig), crane.WithContext(ctx)}
86-
pushOptions := []crane.Option{crane.WithAuth(pushAuthConfig), crane.WithContext(ctx)}
90+
var nameOpts []name.Option
91+
pullOpts := []remote.Option{remote.WithAuth(pullAuth), remote.WithContext(ctx)}
92+
pushOpts := []remote.Option{remote.WithAuth(pushAuth), remote.WithContext(ctx)}
8793

8894
if r.useUnsecure {
89-
pullOptions = append(pullOptions, crane.Insecure)
90-
pushOptions = append(pushOptions, crane.Insecure)
95+
nameOpts = append(nameOpts, name.Insecure)
9196
} else {
9297
transport, err := r.buildTLSTransport()
9398
if err != nil {
9499
return fmt.Errorf("build TLS transport: %w", err)
95100
}
96101
if transport != nil {
97-
pullOptions = append(pullOptions, crane.WithTransport(transport))
102+
pullOpts = append(pullOpts, remote.WithTransport(transport))
103+
pushOpts = append(pushOpts, remote.WithTransport(transport))
98104
}
99105
}
100106

101-
for _, replicationEntity := range replicationEntities {
107+
for _, entity := range replicationEntities {
108+
srcRef := fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, entity.GetRepository(), entity.GetName(), entity.GetTag())
109+
dstRef := fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, entity.GetRepository(), entity.GetName(), entity.GetTag())
102110

103-
log.Info().Msgf("Pulling image %s from repository %s at registry %s with tag %s", replicationEntity.GetName(), replicationEntity.GetRepository(), r.sourceRegistry, replicationEntity.GetTag())
104-
// Pull the image from the source registry
105-
srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), pullOptions...)
111+
src, err := name.ParseReference(srcRef, nameOpts...)
106112
if err != nil {
107-
log.Error().Msgf("Failed to pull image: %v", err)
108-
return err
113+
return fmt.Errorf("parse source ref %s: %w", srcRef, err)
109114
}
110115

111-
// Convert Docker manifest to OCI manifest
112-
ociImage := mutate.MediaType(srcImage, types.OCIManifestSchema1)
116+
dst, err := name.ParseReference(dstRef, nameOpts...)
117+
if err != nil {
118+
return fmt.Errorf("parse dest ref %s: %w", dstRef, err)
119+
}
113120

114-
// Push the converted OCI image to the Zot registry
115-
err = crane.Push(ociImage, fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), pushOptions...)
121+
// Lazy fetch: only the manifest is downloaded, no layer data yet
122+
desc, err := remote.Get(src, pullOpts...)
116123
if err != nil {
117-
log.Error().Msgf("Failed to push image: %v", err)
124+
log.Error().Msgf("Failed to fetch image descriptor: %v", err)
118125
return err
119126
}
120-
log.Info().Msgf("Image %s pushed successfully", replicationEntity.GetName())
121127

128+
img, err := desc.Image()
129+
if err != nil {
130+
log.Error().Msgf("Failed to resolve image: %v", err)
131+
return err
132+
}
133+
134+
// Lazy OCI conversion, no data materialized
135+
ociImage := mutate.MediaType(img, types.OCIManifestSchema1)
136+
137+
// Check if image already exists at destination with same digest
138+
srcDigest, err := ociImage.Digest()
139+
if err != nil {
140+
return fmt.Errorf("compute source digest: %w", err)
141+
}
142+
143+
dstDesc, dstErr := remote.Head(dst, pushOpts...)
144+
if dstErr == nil && dstDesc.Digest == srcDigest {
145+
log.Info().Msgf("Image %s already up-to-date at destination, skipping", entity.GetName())
146+
continue
147+
}
148+
149+
// Log which layers need pulling vs already present
150+
srcLayers, err := ociImage.Layers()
151+
if err != nil {
152+
return fmt.Errorf("get source layers: %w", err)
153+
}
154+
155+
missing := r.countMissingLayers(dst, srcLayers, pushOpts)
156+
log.Info().Msgf("Replicating image %s: %d/%d layers to pull", entity.GetName(), missing, len(srcLayers))
157+
158+
// remote.Write streams layers one-by-one. For each layer it HEAD-checks
159+
// the destination first; only missing blobs are pulled from source.
160+
// Manifest is pushed last.
161+
if err := remote.Write(dst, ociImage, pushOpts...); err != nil {
162+
log.Error().Msgf("Failed to replicate image: %v", err)
163+
return err
164+
}
165+
log.Info().Msgf("Image %s replicated successfully", entity.GetName())
122166
}
123167
return nil
124168
}
125169

170+
// countMissingLayers checks which source layers are absent from the destination
171+
// by comparing against the existing image's layer digests (if any).
172+
func (r *BasicReplicator) countMissingLayers(dst name.Reference, srcLayers []v1.Layer, pushOpts []remote.Option) int {
173+
dstImg, err := remote.Image(dst, pushOpts...)
174+
if err != nil {
175+
// No image at destination, all layers are missing
176+
return len(srcLayers)
177+
}
178+
179+
dstLayers, err := dstImg.Layers()
180+
if err != nil {
181+
return len(srcLayers)
182+
}
183+
184+
existing := make(map[v1.Hash]struct{}, len(dstLayers))
185+
for _, l := range dstLayers {
186+
d, err := l.Digest()
187+
if err != nil {
188+
continue
189+
}
190+
existing[d] = struct{}{}
191+
}
192+
193+
missing := 0
194+
for _, l := range srcLayers {
195+
d, err := l.Digest()
196+
if err != nil {
197+
missing++
198+
continue
199+
}
200+
if _, ok := existing[d]; !ok {
201+
missing++
202+
}
203+
}
204+
return missing
205+
}
206+
126207
func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error {
127208
log := logger.FromContext(ctx)
128209
auth := authn.FromConfig(authn.AuthConfig{

0 commit comments

Comments
 (0)