@@ -14,6 +14,7 @@ import (
14
14
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15
15
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
16
16
"k8s.io/apimachinery/pkg/types"
17
+ "k8s.io/apimachinery/pkg/util/sets"
17
18
"k8s.io/client-go/rest"
18
19
"sigs.k8s.io/controller-runtime/pkg/cache"
19
20
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,6 +48,10 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
47
48
return
48
49
}
49
50
fallthrough
51
+ case "/api/v1/namespaces/default/pods/foo" , "/clusters/root/api/v1/namespaces/default/pods/foo" :
52
+ w .Header ().Set ("Content-Type" , "application/json" )
53
+ w .WriteHeader (http .StatusOK )
54
+ _ , _ = w .Write ([]byte (`{"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176"}}` ))
50
55
default :
51
56
_ , _ = w .Write ([]byte (fmt .Sprintf ("Not found %q" , req .RequestURI )))
52
57
w .WriteHeader (http .StatusNotFound )
@@ -78,9 +83,13 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
78
83
err = cl .List (ctx , pods )
79
84
Expect (err ).NotTo (HaveOccurred ())
80
85
86
+ pod := & corev1.Pod {}
87
+ err = cl .Get (ctx , types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
88
+ Expect (err ).NotTo (HaveOccurred ())
89
+
81
90
mu .Lock ()
82
91
defer mu .Unlock ()
83
- Expect (paths ).To (Equal ([]string {"/api/v1" , "/api/v1/pods" }))
92
+ Expect (paths ).To (Equal ([]string {"/api/v1" , "/api/v1/pods" , "/api/v1/namespaces/default/pods/foo" }))
84
93
})
85
94
86
95
It ("should work with a cluster in the kontext" , func (ctx context.Context ) {
@@ -91,9 +100,13 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
91
100
err = cl .List (kontext .WithCluster (ctx , "root" ), pods )
92
101
Expect (err ).NotTo (HaveOccurred ())
93
102
103
+ pod := & corev1.Pod {}
104
+ err = cl .Get (kontext .WithCluster (ctx , "root" ), types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
105
+ Expect (err ).NotTo (HaveOccurred ())
106
+
94
107
mu .Lock ()
95
108
defer mu .Unlock ()
96
- Expect (paths ).To (Equal ([]string {"/clusters/root/api/v1" , "/clusters/root/api/v1/pods" }))
109
+ Expect (paths ).To (Equal ([]string {"/clusters/root/api/v1" , "/clusters/root/api/v1/pods" , "/clusters/root/api/v1/namespaces/default/pods/foo" }))
97
110
})
98
111
99
112
It ("should work with a wildcard cluster in the kontext" , func (ctx context.Context ) {
@@ -121,9 +134,15 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
121
134
err = cl .List (ctx , pods )
122
135
Expect (err ).NotTo (HaveOccurred ())
123
136
137
+ pod := & unstructured.Unstructured {}
138
+ pod .SetAPIVersion ("v1" )
139
+ pod .SetKind ("Pod" )
140
+ err = cl .Get (ctx , types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
141
+ Expect (err ).NotTo (HaveOccurred ())
142
+
124
143
mu .Lock ()
125
144
defer mu .Unlock ()
126
- Expect (paths ).To (Equal ([]string {"/api/v1" , "/api/v1/pods" }))
145
+ Expect (paths ).To (Equal ([]string {"/api/v1" , "/api/v1/pods" , "/api/v1/namespaces/default/pods/foo" }))
127
146
})
128
147
129
148
It ("should work with a cluster in the kontext" , func (ctx context.Context ) {
@@ -136,9 +155,15 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
136
155
err = cl .List (kontext .WithCluster (ctx , "root" ), pods )
137
156
Expect (err ).NotTo (HaveOccurred ())
138
157
158
+ pod := & unstructured.Unstructured {}
159
+ pod .SetAPIVersion ("v1" )
160
+ pod .SetKind ("Pod" )
161
+ err = cl .Get (kontext .WithCluster (ctx , "root" ), types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
162
+ Expect (err ).NotTo (HaveOccurred ())
163
+
139
164
mu .Lock ()
140
165
defer mu .Unlock ()
141
- Expect (paths ).To (Equal ([]string {"/clusters/root/api/v1" , "/clusters/root/api/v1/pods" }))
166
+ Expect (paths ).To (Equal ([]string {"/clusters/root/api/v1" , "/clusters/root/api/v1/pods" , "/clusters/root/api/v1/namespaces/default/pods/foo" }))
142
167
})
143
168
144
169
It ("should work with a wildcard cluster in the kontext" , func (ctx context.Context ) {
@@ -201,13 +226,18 @@ var _ = Describe("NewClusterAwareClient", Ordered, func() {
201
226
202
227
var _ = Describe ("NewClusterAwareCache" , Ordered , func () {
203
228
var (
204
- srv * httptest.Server
205
- mu sync.Mutex
206
- paths []string
207
- cfg * rest.Config
229
+ cancelCtx context.CancelFunc
230
+ srv * httptest.Server
231
+ mu sync.Mutex
232
+ paths []string
233
+ cfg * rest.Config
234
+ c cache.Cache
208
235
)
209
236
210
237
BeforeAll (func () {
238
+ var ctx context.Context
239
+ ctx , cancelCtx = context .WithCancel (context .Background ())
240
+
211
241
srv = httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
212
242
mu .Lock ()
213
243
pth := req .URL .Path
@@ -225,11 +255,16 @@ var _ = Describe("NewClusterAwareCache", Ordered, func() {
225
255
case req .URL .Path == "/clusters/*/api/v1/pods" && req .URL .Query ().Get ("watch" ) != "true" :
226
256
w .Header ().Set ("Content-Type" , "application/json" )
227
257
w .WriteHeader (http .StatusOK )
228
- _ , _ = w .Write ([]byte (`{"kind": "PodList","apiVersion": "v1","metadata": {"resourceVersion": "184126176"}, "items": [{"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176","annotations":{"kcp.io/cluster":"root"}}}]}` ))
258
+ _ , _ = w .Write ([]byte (`{"kind": "PodList","apiVersion": "v1","metadata": {"resourceVersion": "184126176"}, "items": [
259
+ {"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176","annotations":{"kcp.io/cluster":"root"}}},
260
+ {"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126093","annotations":{"kcp.io/cluster":"ws"}}}
261
+ ]}` ))
229
262
case req .URL .Path == "/clusters/*/api/v1/pods" && req .URL .Query ().Get ("watch" ) == "true" :
230
263
w .Header ().Set ("Content-Type" , "application/json" )
231
264
w .Header ().Set ("Transfer-Encoding" , "chunked" )
232
265
w .WriteHeader (http .StatusOK )
266
+ _ , _ = w .Write ([]byte (`{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"bar","namespace":"default","resourceVersion":"184126177","annotations":{"kcp.io/cluster":"root"}}}}` ))
267
+ _ , _ = w .Write ([]byte (`{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"bar","namespace":"default","resourceVersion":"184126178","annotations":{"kcp.io/cluster":"ws"}}}}` ))
233
268
if w , ok := w .(http.Flusher ); ok {
234
269
w .Flush ()
235
270
}
@@ -239,11 +274,25 @@ var _ = Describe("NewClusterAwareCache", Ordered, func() {
239
274
_ , _ = w .Write ([]byte (fmt .Sprintf ("Not found %q" , req .RequestURI )))
240
275
}
241
276
}))
277
+ go func () {
278
+ <- ctx .Done ()
279
+ srv .Close ()
280
+ }()
242
281
243
282
cfg = & rest.Config {
244
283
Host : srv .URL ,
245
284
}
246
285
Expect (rest .SetKubernetesDefaults (cfg )).To (Succeed ())
286
+
287
+ var err error
288
+ c , err = NewClusterAwareCache (cfg , cache.Options {})
289
+ Expect (err ).NotTo (HaveOccurred ())
290
+ go func () {
291
+ if err := c .Start (ctx ); err != nil {
292
+ Expect (err ).NotTo (HaveOccurred ())
293
+ }
294
+ }()
295
+ c .WaitForCacheSync (ctx )
247
296
})
248
297
249
298
BeforeEach (func () {
@@ -253,25 +302,38 @@ var _ = Describe("NewClusterAwareCache", Ordered, func() {
253
302
})
254
303
255
304
AfterAll (func () {
256
- srv . Close ()
305
+ cancelCtx ()
257
306
})
258
307
259
308
It ("should always access wildcard clusters and serve other clusters from memory" , func (ctx context.Context ) {
260
- c , err := NewClusterAwareCache (cfg , cache.Options {})
261
- Expect (err ).NotTo (HaveOccurred ())
262
- go func () {
263
- if err := c .Start (ctx ); err != nil {
264
- Expect (err ).NotTo (HaveOccurred ())
265
- }
266
- }()
267
- c .WaitForCacheSync (ctx )
268
-
269
309
pod := & corev1.Pod {}
270
- err = c .Get (kontext .WithCluster (ctx , "root" ), types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
310
+ err : = c .Get (kontext .WithCluster (ctx , "root" ), types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
271
311
Expect (err ).NotTo (HaveOccurred ())
272
312
273
313
mu .Lock ()
274
314
defer mu .Unlock ()
275
315
Expect (paths ).To (Equal ([]string {"/clusters/*/api/v1" , "/clusters/*/api/v1/pods" , "/clusters/*/api/v1/pods?watch=true" }))
276
316
})
317
+
318
+ It ("should return only the pods from the requested cluster" , func (ctx context.Context ) {
319
+ pod := & corev1.Pod {}
320
+ err := c .Get (kontext .WithCluster (ctx , "root" ), types.NamespacedName {Namespace : "default" , Name : "foo" }, pod )
321
+ Expect (err ).NotTo (HaveOccurred ())
322
+ Expect (pod .Annotations ).To (HaveKeyWithValue ("kcp.io/cluster" , "root" ))
323
+
324
+ pods := & corev1.PodList {}
325
+ err = c .List (kontext .WithCluster (ctx , "root" ), pods )
326
+ Expect (err ).NotTo (HaveOccurred ())
327
+ Expect (pods .Items ).To (HaveLen (2 ))
328
+ Expect (pods .Items [0 ].Annotations ).To (HaveKeyWithValue ("kcp.io/cluster" , "root" ))
329
+ Expect (pods .Items [1 ].Annotations ).To (HaveKeyWithValue ("kcp.io/cluster" , "root" ))
330
+ Expect (sets .New (pods .Items [0 ].Name , pods .Items [1 ].Name )).To (Equal (sets .New ("foo" , "bar" )))
331
+ })
332
+
333
+ It ("should return all pods from all clusters without cluster in context" , func (ctx context.Context ) {
334
+ pods := & corev1.PodList {}
335
+ err := c .List (ctx , pods )
336
+ Expect (err ).NotTo (HaveOccurred ())
337
+ Expect (pods .Items ).To (HaveLen (4 ))
338
+ })
277
339
})
0 commit comments