Skip to content

Commit 5a77d7b

Browse files
committed
Fix fialed to watch context canceled error when streaming logs
When streaming pipelineRun or taskRun logs, closing the stop channel to terminate the informer was causing "Failed to watch" error messages to be logged with "context canceled". This patch add custom watchErrorHandler function that filter out context.Canceled errors while passing other errors to the default handler. This prevents the error log messages when the CLI intentionally stops watching after a run completes. Signed-off-by: Shiv Verma <[email protected]>
1 parent a6c7c25 commit 5a77d7b

File tree

4 files changed

+86
-3
lines changed

4 files changed

+86
-3
lines changed

pkg/pipelinerun/tracker.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pipelinerun
1616

1717
import (
1818
"context"
19+
"errors"
1920
"sync"
2021
"time"
2122

@@ -28,7 +29,7 @@ import (
2829
corev1 "k8s.io/api/core/v1"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/fields"
31-
"k8s.io/client-go/tools/cache"
32+
k8scache "k8s.io/client-go/tools/cache"
3233
)
3334

3435
// Tracker tracks the progress of a PipelineRun
@@ -68,6 +69,11 @@ func (t *Tracker) Monitor(allowed []string) <-chan []taskrunpkg.Run {
6869

6970
genericInformer, _ := factory.ForResource(*gvr)
7071
informer := genericInformer.Informer()
72+
73+
// Set a custom watch error handler that ignores context.Canceled errors
74+
// to prevent "Failed to watch" log messages when the informer is stopped intentionally
75+
_ = informer.SetWatchErrorHandlerWithContext(watchErrorHandler)
76+
7177
mu := &sync.Mutex{}
7278
stopC := make(chan struct{})
7379
trC := make(chan []taskrunpkg.Run)
@@ -105,7 +111,7 @@ func (t *Tracker) Monitor(allowed []string) <-chan []taskrunpkg.Run {
105111
}
106112

107113
_, err := informer.AddEventHandler(
108-
cache.ResourceEventHandlerFuncs{
114+
k8scache.ResourceEventHandlerFuncs{
109115
AddFunc: func(obj interface{}) {
110116
// To ensure synchonization and checks is the stopC channel has received a signal to stop
111117
// If it receives a signal then return and does nothing
@@ -154,7 +160,15 @@ func pipelinerunOpts(name string) func(opts *metav1.ListOptions) {
154160
return func(opts *metav1.ListOptions) {
155161
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
156162
}
163+
}
157164

165+
// watchErrorHandler is a custom watch error handler that filters out context.Canceled errors
166+
// to prevent "Failed to watch" log messages when the informer is stopped intentionally.
167+
// Other errors are passed to the default handler.
168+
func watchErrorHandler(ctx context.Context, r *k8scache.Reflector, err error) {
169+
if !errors.Is(err, context.Canceled) {
170+
k8scache.DefaultWatchErrorHandler(ctx, r, err)
171+
}
158172
}
159173

160174
// handles changes to pipelinerun and pushes the Run information to the

pkg/pipelinerun/tracker_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package pipelinerun
1616

1717
import (
18+
"context"
19+
"errors"
1820
"fmt"
1921
"testing"
2022
"time"
@@ -239,3 +241,28 @@ func startPipelineRun(t *testing.T, data pipelinetest.Data, prStatus ...v1.Pipel
239241
Dynamic: dynamic,
240242
}
241243
}
244+
245+
func TestTracker_watchErrorHandler(t *testing.T) {
246+
tests := []struct {
247+
name string
248+
err error
249+
}{
250+
{
251+
name: "context.Canceled should be filtered",
252+
err: context.Canceled,
253+
},
254+
{
255+
name: "wrapped context.Canceled should be filtered",
256+
err: errors.Join(errors.New("watch failed"), context.Canceled),
257+
},
258+
}
259+
260+
for _, tt := range tests {
261+
t.Run(tt.name, func(_ *testing.T) {
262+
// Call watchErrorHandler with context.Canceled errors
263+
// These should be filtered (not passed to DefaultWatchErrorHandler)
264+
// so passing nil reflector is safe
265+
watchErrorHandler(context.Background(), nil, tt.err)
266+
})
267+
}
268+
}

pkg/pods/pod.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pods
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"io"
2122
"sync"
@@ -115,7 +116,12 @@ func (p *Pod) watcher(stopC chan struct{}, result *podResult, mu *sync.Mutex) {
115116
}
116117
}
117118

118-
_, err := factory.Core().V1().Pods().Informer().AddEventHandler(
119+
informer := factory.Core().V1().Pods().Informer()
120+
// Set a custom watch error handler that ignores context.Canceled errors
121+
// to prevent "Failed to watch" log messages when the informer is stopped intentionally
122+
_ = informer.SetWatchErrorHandlerWithContext(watchErrorHandler)
123+
124+
_, err := informer.AddEventHandler(
119125
cache.ResourceEventHandlerFuncs{
120126
AddFunc: func(obj interface{}) {
121127
select {
@@ -156,6 +162,15 @@ func podOpts(name string) func(opts *metav1.ListOptions) {
156162
}
157163
}
158164

165+
// watchErrorHandler is a custom watch error handler that filters out context.Canceled errors
166+
// to prevent "Failed to watch" log messages when the informer is stopped intentionally.
167+
// Other errors are passed to the default handler.
168+
func watchErrorHandler(ctx context.Context, r *cache.Reflector, err error) {
169+
if !errors.Is(err, context.Canceled) {
170+
cache.DefaultWatchErrorHandler(ctx, r, err)
171+
}
172+
}
173+
159174
func checkPodStatus(obj interface{}) (*corev1.Pod, error) {
160175
pod, ok := obj.(*corev1.Pod)
161176
if !ok {

pkg/pods/pod_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package pods
1616

1717
import (
18+
"context"
19+
"errors"
1820
"testing"
1921
"time"
2022

@@ -212,3 +214,28 @@ func simulateDeleteWatch(t *testing.T, initial *corev1.Pod, later *corev1.Pod) k
212214

213215
return clients.Kube
214216
}
217+
218+
func Test_watchErrorHandler(t *testing.T) {
219+
tests := []struct {
220+
name string
221+
err error
222+
}{
223+
{
224+
name: "context.Canceled should be filtered",
225+
err: context.Canceled,
226+
},
227+
{
228+
name: "wrapped context.Canceled should be filtered",
229+
err: errors.Join(errors.New("watch failed"), context.Canceled),
230+
},
231+
}
232+
233+
for _, tt := range tests {
234+
t.Run(tt.name, func(_ *testing.T) {
235+
// Call watchErrorHandler with context.Canceled errors
236+
// These should be filtered (not passed to DefaultWatchErrorHandler)
237+
// so passing nil reflector is safe
238+
watchErrorHandler(context.Background(), nil, tt.err)
239+
})
240+
}
241+
}

0 commit comments

Comments
 (0)