Skip to content

Commit 0f991ce

Browse files
committed
KEP-5334: Image Pull Progress
1 parent 5be7941 commit 0f991ce

File tree

15 files changed

+1327
-567
lines changed

15 files changed

+1327
-567
lines changed

pkg/kubelet/container/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ type StreamingRuntime interface {
151151
GetExec(ctx context.Context, id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
152152
GetAttach(ctx context.Context, id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
153153
GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
154+
GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error)
154155
}
155156

156157
// ImageService interfaces allows to work with image service.

pkg/kubelet/container/testing/fake_runtime.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,14 @@ func (f *FakeStreamingRuntime) GetPortForward(_ context.Context, podName, podNam
511511
return &url.URL{Host: FakeHost}, f.Err
512512
}
513513

514+
func (f *FakeStreamingRuntime) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID) (*url.URL, error) {
515+
f.Lock()
516+
defer f.Unlock()
517+
518+
f.CalledFunctions = append(f.CalledFunctions, "GetImagePullProgress")
519+
return &url.URL{Host: FakeHost}, f.Err
520+
}
521+
514522
type FakeContainerCommandRunner struct {
515523
// what to return
516524
Stdout string

pkg/kubelet/kubelet_pods.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
utilfeature "k8s.io/apiserver/pkg/util/feature"
4545
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
4646
"k8s.io/klog/v2"
47+
"k8s.io/kubelet/pkg/cri/streaming/imagepullprogress"
4748
"k8s.io/kubelet/pkg/cri/streaming/portforward"
4849
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
4950
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -2450,6 +2451,24 @@ func (kl *Kubelet) GetPortForward(ctx context.Context, podName, podNamespace str
24502451
return kl.streamingRuntime.GetPortForward(ctx, podName, podNamespace, podUID, portForwardOpts.Ports)
24512452
}
24522453

2454+
// GetImagePullProgress gets the URL the image-pull-progress will be served from, or nil if the Kubelet will serve it.
2455+
func (kl *Kubelet) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts imagepullprogress.Options) (*url.URL, error) {
2456+
pods, err := kl.containerRuntime.GetPods(ctx, false)
2457+
if err != nil {
2458+
return nil, err
2459+
}
2460+
// Resolve and type convert back again.
2461+
// We need the static pod UID but the kubecontainer API works with types.UID.
2462+
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
2463+
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
2464+
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
2465+
if pod.IsEmpty() {
2466+
return nil, fmt.Errorf("pod not found (%q)", podFullName)
2467+
}
2468+
2469+
return kl.streamingRuntime.GetImagePullProgress(ctx, podName, podNamespace, podUID)
2470+
}
2471+
24532472
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
24542473
// it reconciles the cached state of cgroupPods with the specified list of runningPods
24552474
func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, possiblyRunningPods map[types.UID]sets.Empty) {

pkg/kubelet/kuberuntime/instrumented_services.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,15 @@ func (in instrumentedRuntimeService) PortForward(ctx context.Context, req *runti
272272
return resp, err
273273
}
274274

275+
func (in instrumentedRuntimeService) ImagePullProgress(ctx context.Context, req *runtimeapi.ImagePullProgressRequest) (*runtimeapi.ImagePullProgressResponse, error) {
276+
const operation = "image_pull_progress"
277+
defer recordOperation(operation, time.Now())
278+
279+
resp, err := in.service.ImagePullProgress(ctx, req)
280+
recordError(operation, err)
281+
return resp, err
282+
}
283+
275284
func (in instrumentedRuntimeService) UpdatePodSandboxResources(ctx context.Context, req *runtimeapi.UpdatePodSandboxResourcesRequest) (*runtimeapi.UpdatePodSandboxResourcesResponse, error) {
276285
const operation = "update_podsandbox_resources"
277286
defer recordOperation(operation, time.Now())

pkg/kubelet/kuberuntime/kuberuntime_sandbox.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,22 @@ func (m *kubeGenericRuntimeManager) GetPortForward(ctx context.Context, podName,
380380
}
381381
return url.Parse(resp.Url)
382382
}
383+
384+
// GetImagePullProgress gets the endpoint the runtime will serve the image-pull-progress request from.
385+
func (m *kubeGenericRuntimeManager) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) {
386+
sandboxIDs, err := m.getSandboxIDByPodUID(ctx, podUID, nil)
387+
if err != nil {
388+
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
389+
}
390+
if len(sandboxIDs) == 0 {
391+
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
392+
}
393+
req := &runtimeapi.ImagePullProgressRequest{
394+
PodSandboxId: sandboxIDs[0],
395+
}
396+
resp, err := m.runtimeService.ImagePullProgress(ctx, req)
397+
if err != nil {
398+
return nil, err
399+
}
400+
return url.Parse(resp.Url)
401+
}

pkg/kubelet/server/server_test.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ import (
5757
// Do some initialization to decode the query parameters correctly.
5858
"k8s.io/apiserver/pkg/server/healthz"
5959
utilfeature "k8s.io/apiserver/pkg/util/feature"
60+
imagepullprogress "k8s.io/client-go/tools/imagepullprogress"
6061
featuregatetesting "k8s.io/component-base/featuregate/testing"
6162
zpagesfeatures "k8s.io/component-base/zpages/features"
6263
"k8s.io/component-base/zpages/flagz"
6364
"k8s.io/kubelet/pkg/cri/streaming"
65+
imagepullprogressserver "k8s.io/kubelet/pkg/cri/streaming/imagepullprogress"
6466
"k8s.io/kubelet/pkg/cri/streaming/portforward"
6567
remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
6668
_ "k8s.io/kubernetes/pkg/apis/core/install"
@@ -78,15 +80,16 @@ const (
7880
)
7981

8082
type fakeKubelet struct {
81-
podByNameFunc func(namespace, name string) (*v1.Pod, bool)
82-
machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
83-
podsFunc func() []*v1.Pod
84-
runningPodsFunc func(ctx context.Context) ([]*v1.Pod, error)
85-
logFunc func(w http.ResponseWriter, req *http.Request)
86-
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
87-
getExecCheck func(string, types.UID, string, []string, remotecommandserver.Options)
88-
getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
89-
getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
83+
podByNameFunc func(namespace, name string) (*v1.Pod, bool)
84+
machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
85+
podsFunc func() []*v1.Pod
86+
runningPodsFunc func(ctx context.Context) ([]*v1.Pod, error)
87+
logFunc func(w http.ResponseWriter, req *http.Request)
88+
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
89+
getExecCheck func(string, types.UID, string, []string, remotecommandserver.Options)
90+
getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
91+
getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
92+
getImagePullProgressCheck func(podName, podNamespace string, podUID types.UID, imagePullProgressOpts imagepullprogressserver.Options)
9093

9194
containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
9295
hostnameFunc func() string
@@ -169,9 +172,10 @@ func (fk *fakeKubelet) SyncLoopHealthCheck(req *http.Request) error {
169172
}
170173

171174
type fakeRuntime struct {
172-
execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
173-
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
174-
portForwardFunc func(string, int32, io.ReadWriteCloser) error
175+
execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
176+
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
177+
portForwardFunc func(string, int32, io.ReadWriteCloser) error
178+
imagePullProgressFunc func(string, chan<- imagepullprogress.Progress) error
175179
}
176180

177181
func (f *fakeRuntime) Exec(_ context.Context, containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
@@ -186,6 +190,10 @@ func (f *fakeRuntime) PortForward(_ context.Context, podSandboxID string, port i
186190
return f.portForwardFunc(podSandboxID, port, stream)
187191
}
188192

193+
func (f *fakeRuntime) ImagePullProgress(_ context.Context, podSandboxID string, progress chan<- imagepullprogress.Progress) error {
194+
return f.imagePullProgressFunc(podSandboxID, progress)
195+
}
196+
189197
type testStreamingServer struct {
190198
streaming.Server
191199
fakeRuntime *fakeRuntime
@@ -273,6 +281,20 @@ func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace
273281
return url.Parse(resp.GetUrl())
274282
}
275283

284+
func (fk *fakeKubelet) GetImagePullProgress(ctx context.Context, podName, podNamespace string, podUID types.UID, imagePullProgressOpts imagepullprogressserver.Options) (*url.URL, error) {
285+
if fk.getImagePullProgressCheck != nil {
286+
fk.getImagePullProgressCheck(podName, podNamespace, podUID, imagePullProgressOpts)
287+
}
288+
// Always use testPodSandboxID
289+
resp, err := fk.streamingRuntime.GetImagePullProgress(&runtimeapi.ImagePullProgressRequest{
290+
PodSandboxId: testPodSandboxID,
291+
})
292+
if err != nil {
293+
return nil, err
294+
}
295+
return url.Parse(resp.GetUrl())
296+
}
297+
276298
// Unused functions
277299
func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
278300
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package imagepullprogress
18+
19+
type Progress struct {
20+
Name string `json:"name"`
21+
Progress int64 `json:"progress"`
22+
Total int64 `json:"total"`
23+
Error string `json:"error,omitempty"`
24+
}

0 commit comments

Comments
 (0)