Skip to content

Commit cefa3c8

Browse files
committed
Add State Persistence for Crash Recovery (container-registry#321)
Signed-off-by: bupd <bupdprasanth@gmail.com>
1 parent cb610dd commit cefa3c8

File tree

10 files changed

+968
-41
lines changed

10 files changed

+968
-41
lines changed

Taskfile.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ tasks:
8383
cmds:
8484
- task: _e2e:test-spiffe
8585

86+
e2e-crash-recovery:
87+
desc: Run crash recovery E2E test
88+
cmds:
89+
- task: _e2e:test-crash-recovery
90+
91+
e2e-crash-recovery-byo:
92+
desc: Run crash recovery BYO registry E2E test
93+
cmds:
94+
- task: _e2e:test-crash-recovery-byo
95+
8696
publish:
8797
desc: "Publish both components to registry"
8898
aliases: [p]

cmd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func run(opts SatelliteOptions, pathConfig *config.PathConfig, shutdownTimeout s
249249
}
250250
})
251251

252-
s := satellite.NewSatellite(cm)
252+
s := satellite.NewSatellite(cm, pathConfig.StateFile)
253253
err = s.Run(ctx)
254254
if err != nil {
255255
return fmt.Errorf("unable to start satellite: %w", err)

internal/satellite/satellite.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,24 @@ import (
1010
)
1111

1212
type Satellite struct {
13-
cm *config.ConfigManager
14-
schedulers []*scheduler.Scheduler
13+
cm *config.ConfigManager
14+
schedulers []*scheduler.Scheduler
15+
stateFilePath string
1516
}
1617

17-
func NewSatellite(cm *config.ConfigManager) *Satellite {
18+
func NewSatellite(cm *config.ConfigManager, stateFilePath string) *Satellite {
1819
return &Satellite{
19-
cm: cm,
20-
schedulers: make([]*scheduler.Scheduler, 0),
20+
cm: cm,
21+
schedulers: make([]*scheduler.Scheduler, 0),
22+
stateFilePath: stateFilePath,
2123
}
2224
}
2325

2426
func (s *Satellite) Run(ctx context.Context) error {
2527
log := logger.FromContext(ctx)
2628
log.Info().Msg("Starting Satellite")
2729

28-
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm)
30+
fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm, s.stateFilePath, log)
2931

3032
// Create ZTR scheduler if not already done
3133
if !s.cm.IsZTRDone() {

internal/state/replicator.go

Lines changed: 106 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import (
1111
"github.com/container-registry/harbor-satellite/pkg/config"
1212
"github.com/google/go-containerregistry/pkg/authn"
1313
"github.com/google/go-containerregistry/pkg/crane"
14+
"github.com/google/go-containerregistry/pkg/name"
15+
v1 "github.com/google/go-containerregistry/pkg/v1"
1416
"github.com/google/go-containerregistry/pkg/v1/mutate"
17+
"github.com/google/go-containerregistry/pkg/v1/remote"
1518
"github.com/google/go-containerregistry/pkg/v1/types"
1619
)
1720

@@ -52,10 +55,10 @@ func NewBasicReplicatorWithTLS(sourceUsername, sourcePassword, sourceRegistry, r
5255

5356
// Entity represents an image or artifact which needs to be handled by the replicator
5457
// Entity represents an image or artifact which needs to be handled by the
// replicator. The fields carry JSON tags so that replication state can be
// serialized to the state file and reloaded after a crash.
type Entity struct {
	Name       string `json:"name"`       // image name within the repository
	Repository string `json:"repository"` // repository the image belongs to
	Tag        string `json:"tag"`        // tag selected for replication
	Digest     string `json:"digest"`     // content digest, when known
}
6063

6164
func (e Entity) GetName() string {
@@ -70,35 +73,38 @@ func (e Entity) GetTag() string {
7073
return e.Tag
7174
}
7275

73-
// Replicate replicates images from the source registry to the Zot registry.
76+
// Replicate replicates images from the source registry to the local registry.
77+
// Before pulling, it checks which blobs already exist at the destination and
78+
// only downloads missing layers from source, saving bandwidth on crash recovery.
7479
func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []Entity) error {
7580
log := logger.FromContext(ctx)
76-
pullAuthConfig := authn.FromConfig(authn.AuthConfig{
81+
pullAuth := authn.FromConfig(authn.AuthConfig{
7782
Username: r.sourceUsername,
7883
Password: r.sourcePassword,
7984
})
80-
pushAuthConfig := authn.FromConfig(authn.AuthConfig{
85+
pushAuth := authn.FromConfig(authn.AuthConfig{
8186
Username: r.remoteUsername,
8287
Password: r.remotePassword,
8388
})
8489

85-
pullOptions := []crane.Option{crane.WithAuth(pullAuthConfig), crane.WithContext(ctx)}
86-
pushOptions := []crane.Option{crane.WithAuth(pushAuthConfig), crane.WithContext(ctx)}
90+
var nameOpts []name.Option
91+
pullOpts := []remote.Option{remote.WithAuth(pullAuth), remote.WithContext(ctx)}
92+
pushOpts := []remote.Option{remote.WithAuth(pushAuth), remote.WithContext(ctx)}
8793

8894
if r.useUnsecure {
89-
pullOptions = append(pullOptions, crane.Insecure)
90-
pushOptions = append(pushOptions, crane.Insecure)
95+
nameOpts = append(nameOpts, name.Insecure)
9196
} else {
9297
transport, err := r.buildTLSTransport()
9398
if err != nil {
9499
return fmt.Errorf("build TLS transport: %w", err)
95100
}
96101
if transport != nil {
97-
pullOptions = append(pullOptions, crane.WithTransport(transport))
102+
pullOpts = append(pullOpts, remote.WithTransport(transport))
103+
pushOpts = append(pushOpts, remote.WithTransport(transport))
98104
}
99105
}
100106

101-
for _, replicationEntity := range replicationEntities {
107+
for _, entity := range replicationEntities {
102108
// Check context cancellation before processing each image
103109
select {
104110
case <-ctx.Done():
@@ -107,29 +113,105 @@ func (r *BasicReplicator) Replicate(ctx context.Context, replicationEntities []E
107113
default:
108114
}
109115

110-
log.Info().Msgf("Pulling image %s from repository %s at registry %s with tag %s", replicationEntity.GetName(), replicationEntity.GetRepository(), r.sourceRegistry, replicationEntity.GetTag())
111-
// Pull the image from the source registry
112-
srcImage, err := crane.Pull(fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), pullOptions...)
116+
srcRef := fmt.Sprintf("%s/%s/%s:%s", r.sourceRegistry, entity.GetRepository(), entity.GetName(), entity.GetTag())
117+
dstRef := fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, entity.GetRepository(), entity.GetName(), entity.GetTag())
118+
119+
src, err := name.ParseReference(srcRef, nameOpts...)
113120
if err != nil {
114-
log.Error().Msgf("Failed to pull image: %v", err)
115-
return err
121+
return fmt.Errorf("parse source ref %s: %w", srcRef, err)
116122
}
117123

118-
// Convert Docker manifest to OCI manifest
119-
ociImage := mutate.MediaType(srcImage, types.OCIManifestSchema1)
124+
dst, err := name.ParseReference(dstRef, nameOpts...)
125+
if err != nil {
126+
return fmt.Errorf("parse dest ref %s: %w", dstRef, err)
127+
}
120128

121-
// Push the converted OCI image to the Zot registry
122-
err = crane.Push(ociImage, fmt.Sprintf("%s/%s/%s:%s", r.remoteRegistryURL, replicationEntity.GetRepository(), replicationEntity.GetName(), replicationEntity.GetTag()), pushOptions...)
129+
// Lazy fetch: only the manifest is downloaded, no layer data yet
130+
desc, err := remote.Get(src, pullOpts...)
123131
if err != nil {
124-
log.Error().Msgf("Failed to push image: %v", err)
132+
log.Error().Msgf("Failed to fetch image descriptor: %v", err)
125133
return err
126134
}
127-
log.Info().Msgf("Image %s pushed successfully", replicationEntity.GetName())
128135

136+
img, err := desc.Image()
137+
if err != nil {
138+
log.Error().Msgf("Failed to resolve image: %v", err)
139+
return err
140+
}
141+
142+
// Lazy OCI conversion, no data materialized
143+
ociImage := mutate.MediaType(img, types.OCIManifestSchema1)
144+
145+
// Check if image already exists at destination with same digest
146+
srcDigest, err := ociImage.Digest()
147+
if err != nil {
148+
return fmt.Errorf("compute source digest: %w", err)
149+
}
150+
151+
dstDesc, dstErr := remote.Head(dst, pushOpts...)
152+
if dstErr == nil && dstDesc.Digest == srcDigest {
153+
log.Info().Msgf("Image %s already up-to-date at destination, skipping", entity.GetName())
154+
continue
155+
}
156+
157+
// Log which layers need pulling vs already present
158+
srcLayers, err := ociImage.Layers()
159+
if err != nil {
160+
return fmt.Errorf("get source layers: %w", err)
161+
}
162+
163+
missing := r.countMissingLayers(dst, srcLayers, pushOpts)
164+
log.Info().Msgf("Replicating image %s: %d/%d layers to pull", entity.GetName(), missing, len(srcLayers))
165+
166+
// remote.Write streams layers one-by-one. For each layer it HEAD-checks
167+
// the destination first; only missing blobs are pulled from source.
168+
// Manifest is pushed last.
169+
if err := remote.Write(dst, ociImage, pushOpts...); err != nil {
170+
log.Error().Msgf("Failed to replicate image: %v", err)
171+
return err
172+
}
173+
log.Info().Msgf("Image %s replicated successfully", entity.GetName())
129174
}
130175
return nil
131176
}
132177

178+
// countMissingLayers checks which source layers are absent from the destination
179+
// by comparing against the existing image's layer digests (if any).
180+
func (r *BasicReplicator) countMissingLayers(dst name.Reference, srcLayers []v1.Layer, pushOpts []remote.Option) int {
181+
dstImg, err := remote.Image(dst, pushOpts...)
182+
if err != nil {
183+
// No image at destination, all layers are missing
184+
return len(srcLayers)
185+
}
186+
187+
dstLayers, err := dstImg.Layers()
188+
if err != nil {
189+
return len(srcLayers)
190+
}
191+
192+
existing := make(map[v1.Hash]struct{}, len(dstLayers))
193+
for _, l := range dstLayers {
194+
d, err := l.Digest()
195+
if err != nil {
196+
continue
197+
}
198+
existing[d] = struct{}{}
199+
}
200+
201+
missing := 0
202+
for _, l := range srcLayers {
203+
d, err := l.Digest()
204+
if err != nil {
205+
missing++
206+
continue
207+
}
208+
if _, ok := existing[d]; !ok {
209+
missing++
210+
}
211+
}
212+
return missing
213+
}
214+
133215
func (r *BasicReplicator) DeleteReplicationEntity(ctx context.Context, replicationEntity []Entity) error {
134216
log := logger.FromContext(ctx)
135217
auth := authn.FromConfig(authn.AuthConfig{

0 commit comments

Comments
 (0)