Skip to content

Add support for zstd layer upload #1827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/crane/cmd/flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,11 @@ func flattenImage(old v1.Image, repo name.Repository, use string, o crane.Option
}

// TODO: Make compression configurable?
layer := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
layer, err := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
if err != nil {
return nil, fmt.Errorf("new layer: %w", err)
}

if err := remote.WriteLayer(repo, layer, o.Remote...); err != nil {
return nil, fmt.Errorf("uploading layer: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/zstd/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
}, nil
}

// NewReader returns an io.Reader that decompresses the zstd-compressed
// stream read from r.
//
// NOTE(review): the underlying *zstd.Decoder holds goroutines/buffers that
// are only released on Close or when the stream is fully consumed — since
// only an io.Reader is exposed here, callers cannot close it explicitly;
// confirm the decoder is always drained by callers.
func NewReader(r io.Reader) (io.Reader, error) {
	zr, err := zstd.NewReader(r)
	if err != nil {
		// Return a literal nil: `return zstd.NewReader(r)` would box a
		// typed-nil *zstd.Decoder into a non-nil io.Reader interface,
		// misleading any caller that checks the reader against nil.
		return nil, err
	}
	return zr, nil
}

// NewWriterLevel returns a *zstd.Encoder writing compressed output to w.
// The level argument uses the zstd scale and is translated to the
// encoder's own level via zstd.EncoderLevelFromZstd.
func NewWriterLevel(w io.Writer, level int) (*zstd.Encoder, error) {
	encLevel := zstd.EncoderLevelFromZstd(level)
	return zstd.NewWriter(w, zstd.WithEncoderLevel(encLevel))
}

// Is detects whether the input stream is compressed.
func Is(r io.Reader) (bool, error) {
magicHeader := make([]byte, 4)
Expand Down
32 changes: 32 additions & 0 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
// Package compression abstracts over gzip and zstd.
package compression

import (
"fmt"

"github.com/google/go-containerregistry/pkg/v1/types"
)

// Compression is an enumeration of the supported compression algorithms
type Compression string

Expand All @@ -24,3 +30,29 @@ const (
GZip Compression = "gzip"
ZStd Compression = "zstd"
)

// ToMediaType maps the compression algorithm to the corresponding layer
// media type. When oci is true, the OCI image-spec layer media types are
// returned; otherwise the Docker (schema 2) layer media types are used.
//
// For an unrecognized compression value an error is returned; the
// gzip-compressed layer type is returned alongside it as a usable
// fallback, preserving the previous behavior of defaulting to gzip.
func (c Compression) ToMediaType(oci bool) (types.MediaType, error) {
	// Receiver renamed from `compression` to `c`: the old name shadowed
	// the enclosing package identifier inside the method body.
	if oci {
		switch c {
		case ZStd:
			return types.OCILayerZStd, nil
		case GZip:
			return types.OCILayer, nil
		case None:
			return types.OCIUncompressedLayer, nil
		default:
			return types.OCILayer, fmt.Errorf("unsupported compression: %s", c)
		}
	}
	// Docker (non-OCI) media types. No `else` needed: every path above
	// returns, so the happy path stays left-aligned.
	switch c {
	case ZStd:
		return types.DockerLayerZstd, nil
	case GZip:
		return types.DockerLayer, nil
	case None:
		return types.DockerUncompressedLayer, nil
	default:
		return types.DockerLayer, fmt.Errorf("unsupported compression: %s", c)
	}
}
14 changes: 5 additions & 9 deletions pkg/crane/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return nil, fmt.Errorf("getting base image media type: %w", err)
}

layerType := types.DockerLayer

if baseMediaType == types.OCIManifestSchema1 {
layerType = types.OCILayer
}
oci := baseMediaType == types.OCIManifestSchema1

layers := make([]v1.Layer, 0, len(paths))
for _, path := range paths {
layer, err := getLayer(path, layerType)
layer, err := getLayer(path, oci)
if err != nil {
return nil, fmt.Errorf("reading layer %q: %w", path, err)
}
Expand All @@ -81,16 +77,16 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return mutate.AppendLayers(base, layers...)
}

func getLayer(path string, layerType types.MediaType) (v1.Layer, error) {
func getLayer(path string, oci bool) (v1.Layer, error) {
f, err := streamFile(path)
if err != nil {
return nil, err
}
if f != nil {
return stream.NewLayer(f, stream.WithMediaType(layerType)), nil
return stream.NewLayer(f, stream.WithOCIMediaType(oci))
}

return tarball.LayerFromFile(path, tarball.WithMediaType(layerType))
return tarball.LayerFromFile(path, tarball.WithOCIMediaType(oci))
}

// If we're dealing with a named pipe, trying to open it multiple times will
Expand Down
6 changes: 5 additions & 1 deletion pkg/v1/layout/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,12 @@ func TestStreamingWriteLayer(t *testing.T) {
return tw.Close()
}())
}()
layer, err := stream.NewLayer(pr)
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
img, err := mutate.Append(empty.Image, mutate.Addendum{
Layer: stream.NewLayer(pr),
Layer: layer,
})
if err != nil {
t.Fatalf("creating random streaming image failed: %v", err)
Expand Down
19 changes: 13 additions & 6 deletions pkg/v1/mutate/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,19 @@ func TestMutateMediaType(t *testing.T) {
}

func TestAppendStreamableLayer(t *testing.T) {
img, err := mutate.AppendLayers(
sourceImage(t),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100)))),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100)))),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100)))),
)
l1, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
l2, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
l3, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
img, err := mutate.AppendLayers(sourceImage(t), l1, l2, l3)
if err != nil {
t.Fatalf("AppendLayers: %v", err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/v1/remote/multi_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ func streamable(t *testing.T) v1.Layer {
t.Fatalf("Uncompressed(): %v", err)
}

return stream.NewLayer(rc)
l, err := stream.NewLayer(rc)
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}

return l
}

type rawManifest struct {
Expand Down
15 changes: 12 additions & 3 deletions pkg/v1/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,10 @@ func TestDedupeLayers(t *testing.T) {
// Append three identical stream.Layers, whose uploads will *not* be
// deduped since Write can't tell they're identical ahead of time.
for i := 0; i < 3; i++ {
sl := stream.NewLayer(newBlob())
sl, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer(#%d): %v", i, err)
}
img, err = mutate.AppendLayers(img, sl)
if err != nil {
t.Fatalf("mutate.AppendLayer(#%d): %v", i, err)
Expand Down Expand Up @@ -697,7 +700,10 @@ func TestStreamLayer(t *testing.T) {
defer closer.Close()

streamLocation := w.url(expectedPath)
sl := stream.NewLayer(newBlob())
sl, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}

commitLocation, err := w.streamBlob(context.Background(), sl, streamLocation.String())
if err != nil {
Expand Down Expand Up @@ -856,7 +862,10 @@ func TestUploadOneStreamedLayer(t *testing.T) {
newBlob := func() io.ReadCloser { return io.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) }
wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e"
wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711"
l := stream.NewLayer(newBlob())
l, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
if err := w.uploadOne(ctx, l); err != nil {
t.Fatalf("uploadOne: %v", err)
}
Expand Down
95 changes: 73 additions & 22 deletions pkg/v1/stream/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"os"
"sync"

internalcomp "github.com/google/go-containerregistry/internal/compression"
"github.com/google/go-containerregistry/internal/zstd"
"github.com/google/go-containerregistry/pkg/compression"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
)
Expand All @@ -42,14 +45,19 @@ var (

// Layer is a streaming implementation of v1.Layer.
type Layer struct {
blob io.ReadCloser
consumed bool
compression int
closer io.Closer
uncompressedReader io.Reader

consumed bool

compression compression.Compression
compressionLevel int

mu sync.Mutex
digest, diffID *v1.Hash
size int64
mediaType types.MediaType

oci bool
}

var _ v1.Layer = (*Layer)(nil)
Expand All @@ -60,32 +68,54 @@ type LayerOption func(*Layer)
// WithCompressionLevel sets the gzip compression. See `gzip.NewWriterLevel` for possible values.
func WithCompressionLevel(level int) LayerOption {
return func(l *Layer) {
l.compression = level
l.compressionLevel = level
}
}

// WithMediaType is a functional option for overriding the layer's media type.
func WithMediaType(mt types.MediaType) LayerOption {
// WithOCIMediaType is a functional option for overriding the layer's media type.
func WithOCIMediaType(oci bool) LayerOption {
return func(l *Layer) {
l.mediaType = mt
l.oci = oci
}
}

// NewLayer creates a Layer from an io.ReadCloser.
func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
func NewLayer(rc io.ReadCloser, opts ...LayerOption) (*Layer, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to avoid this breaking API change if we can.

We could add an err field to the Layer struct and return it on the first access to any of methods. (I don't love it, but I also don't want to break stuff.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so weird... :D Later, when someone adds a method to Layer, they will almost certainly forget about this hack and introduce a bug. And what do we do if Layer has a method that doesn't return an error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So. Do you really want me to do this and this is the only thing preventing this PR from being merged?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Friendly Ping :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonjohnsonjr if I pop open a new PR with the requested changes can I get buy-in that we'll try and get this feature added?

comp, peekReader, err := internalcomp.PeekCompression(rc)
if err != nil {
return nil, err
}

layer := &Layer{
blob: rc,
compression: gzip.BestSpeed,
// We use DockerLayer for now as uncompressed layers
// are unimplemented
mediaType: types.DockerLayer,
closer: rc,
compression: comp,
compressionLevel: gzip.BestSpeed,
}

switch comp {
case compression.ZStd:
layer.compression = comp
layer.uncompressedReader, err = zstd.NewReader(peekReader)
if err != nil {
return nil, err
}
case compression.GZip:
layer.compression = comp
layer.uncompressedReader, err = gzip.NewReader(peekReader)
if err != nil {
return nil, err
}
default:
// No support for uncompressed layers for now
layer.compression = compression.GZip
layer.uncompressedReader = peekReader
}

for _, opt := range opts {
opt(layer)
}

return layer
return layer, nil
}

// Digest implements v1.Layer.
Expand Down Expand Up @@ -120,7 +150,7 @@ func (l *Layer) Size() (int64, error) {

// MediaType implements v1.Layer
func (l *Layer) MediaType() (types.MediaType, error) {
return l.mediaType, nil
return l.compression.ToMediaType(l.oci)
}

// Uncompressed implements v1.Layer.
Expand Down Expand Up @@ -183,9 +213,27 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
// 64K ought to be small enough for anybody.
bw := bufio.NewWriterSize(mw, 2<<16)
zw, err := gzip.NewWriterLevel(bw, l.compression)
if err != nil {
return nil, err

var compressedWriter io.Writer
var compressedCloser io.Closer

switch l.compression {
case compression.ZStd:
w, err := zstd.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.GZip:
w, err := gzip.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.None:
compressedWriter = bw
}

doneDigesting := make(chan struct{})
Expand All @@ -211,7 +259,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
//
// NOTE: net/http will call close on success, so if we've already
// closed the inner rc, it's not an error.
if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
if err := l.closer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return err
}

Expand All @@ -223,13 +271,16 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
go func() {
// Copy blob into the gzip writer, which also hashes and counts the
// size of the compressed output, and hasher of the raw contents.
_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
_, copyErr := io.Copy(io.MultiWriter(h, compressedWriter), l.uncompressedReader)

// Close the gzip writer once copying is done. If this is done in the
// Close method of compressedReader instead, then it can cause a panic
// when the compressedReader is closed before the blob is fully
// consumed and io.Copy in this goroutine is still blocking.
closeErr := zw.Close()
var closeErr error
if compressedCloser != nil {
closeErr = compressedCloser.Close()
}

// Check errors from writing and closing streams.
if copyErr != nil {
Expand Down
Loading