Skip to content

Commit b226145

Browse files
metacosmcsviri
andauthored
feat: provide de-duplicated secondary resources stream on Context (#3141)
Signed-off-by: Chris Laprun <[email protected]> Co-authored-by: Attila Mészáros <[email protected]>
1 parent 0f632f8 commit b226145

File tree

12 files changed

+628
-38
lines changed

12 files changed

+628
-38
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,83 @@ default <R> Optional<R> getSecondaryResource(Class<R> expectedType) {
3535
return getSecondaryResource(expectedType, null);
3636
}
3737

38-
<R> Set<R> getSecondaryResources(Class<R> expectedType);
38+
/**
39+
* Retrieves a {@link Set} of the secondary resources of the specified type, which are associated
40+
* with the primary resource being processed, possibly making sure that only the latest version of
41+
* each resource is retrieved.
42+
*
43+
* <p>Note: While this method returns a {@link Set}, it is possible to get several copies of a
44+
* given resource albeit all with different {@code resourceVersion}. If you want to avoid this
45+
* situation, call {@link #getSecondaryResources(Class, boolean)} with the {@code deduplicate}
46+
* parameter set to {@code true}.
47+
*
48+
* @param expectedType a class representing the type of secondary resources to retrieve
49+
* @param <R> the type of secondary resources to retrieve
50+
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
51+
*/
52+
default <R> Set<R> getSecondaryResources(Class<R> expectedType) {
53+
return getSecondaryResources(expectedType, false);
54+
}
55+
56+
/**
57+
* Retrieves a {@link Set} of the secondary resources of the specified type, which are associated
58+
* with the primary resource being processed, possibly making sure that only the latest version of
59+
* each resource is retrieved.
60+
*
61+
* <p>Note: While this method returns a {@link Set}, it is possible to get several copies of a
62+
* given resource albeit all with different {@code resourceVersion}. If you want to avoid this
63+
* situation, ask for the deduplicated version by setting the {@code deduplicate} parameter to
64+
* {@code true}.
65+
*
66+
* @param expectedType a class representing the type of secondary resources to retrieve
67+
* @param deduplicate {@code true} if only the latest version of each resource should be kept,
68+
* {@code false} otherwise
69+
* @param <R> the type of secondary resources to retrieve
70+
* @return a {@link Set} of secondary resources of the specified type, possibly deduplicated
71+
* @throws IllegalArgumentException if the secondary resource type cannot be deduplicated because
72+
* it's not extending {@link HasMetadata}, which is required to access the resource version
73+
* @since 5.3.0
74+
*/
75+
<R> Set<R> getSecondaryResources(Class<R> expectedType, boolean deduplicate);
3976

77+
/**
78+
* Retrieves a {@link Stream} of the secondary resources of the specified type, which are
79+
* associated with the primary resource being processed, possibly making sure that only the latest
80+
* version of each resource is retrieved.
81+
*
82+
* <p>Note: It is possible to get several copies of a given resource albeit all with different
83+
* {@code resourceVersion}. If you want to avoid this situation, call {@link
84+
* #getSecondaryResourcesAsStream(Class, boolean)} with the {@code deduplicate} parameter set to
85+
* {@code true}.
86+
*
87+
* @param expectedType a class representing the type of secondary resources to retrieve
88+
* @param <R> the type of secondary resources to retrieve
89+
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
90+
*/
4091
default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
41-
return getSecondaryResources(expectedType).stream();
92+
return getSecondaryResourcesAsStream(expectedType, false);
4293
}
4394

95+
/**
96+
* Retrieves a {@link Stream} of the secondary resources of the specified type, which are
97+
* associated with the primary resource being processed, possibly making sure that only the latest
98+
* version of each resource is retrieved.
99+
*
100+
* <p>Note: It is possible to get several copies of a given resource albeit all with different
101+
* {@code resourceVersion}. If you want to avoid this situation, ask for the deduplicated version
102+
* by setting the {@code deduplicate} parameter to {@code true}.
103+
*
104+
* @param expectedType a class representing the type of secondary resources to retrieve
105+
* @param deduplicate {@code true} if only the latest version of each resource should be kept,
106+
* {@code false} otherwise
107+
* @param <R> the type of secondary resources to retrieve
108+
* @return a {@link Stream} of secondary resources of the specified type, possibly deduplicated
109+
* @throws IllegalArgumentException if the secondary resource type cannot be deduplicated because
110+
* it's not extending {@link HasMetadata}, which is required to access the resource version
111+
* @since 5.3.0
112+
*/
113+
<R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate);
114+
44115
<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);
45116

46117
ControllerConfiguration<P> getControllerConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

18+
import java.util.HashSet;
1819
import java.util.Map;
1920
import java.util.Optional;
2021
import java.util.Set;
@@ -26,6 +27,7 @@
2627

2728
import io.fabric8.kubernetes.api.model.HasMetadata;
2829
import io.fabric8.kubernetes.client.KubernetesClient;
30+
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
2931
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3032
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
3133
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
@@ -36,7 +38,6 @@
3638
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3739

3840
public class DefaultContext<P extends HasMetadata> implements Context<P> {
39-
4041
private RetryInfo retryInfo;
4142
private final Controller<P> controller;
4243
private final P primaryResource;
@@ -71,15 +72,44 @@ public Optional<RetryInfo> getRetryInfo() {
7172
}
7273

7374
@Override
74-
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
75+
public <T> Set<T> getSecondaryResources(Class<T> expectedType, boolean deduplicate) {
76+
if (deduplicate) {
77+
final var deduplicatedMap = deduplicatedMap(getSecondaryResourcesAsStream(expectedType));
78+
return new HashSet<>(deduplicatedMap.values());
79+
}
7580
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
7681
}
7782

78-
@Override
79-
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
80-
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
81-
.map(es -> es.getSecondaryResources(primaryResource))
82-
.flatMap(Set::stream);
83+
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) {
84+
final var stream =
85+
controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
86+
.<R>mapMulti(
87+
(es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer));
88+
if (deduplicate) {
89+
if (!HasMetadata.class.isAssignableFrom(expectedType)) {
90+
throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants");
91+
}
92+
return deduplicatedMap(stream).values().stream();
93+
} else {
94+
return stream;
95+
}
96+
}
97+
98+
private <R> Map<ResourceID, R> deduplicatedMap(Stream<R> stream) {
99+
return stream.collect(
100+
Collectors.toUnmodifiableMap(
101+
DefaultContext::resourceID,
102+
Function.identity(),
103+
(existing, replacement) ->
104+
compareResourceVersions(existing, replacement) >= 0 ? existing : replacement));
105+
}
106+
107+
private static ResourceID resourceID(Object hasMetadata) {
108+
return ResourceID.fromResource((HasMetadata) hasMetadata);
109+
}
110+
111+
private static int compareResourceVersions(Object v1, Object v2) {
112+
return ReconcilerUtilsInternal.compareResourceVersions((HasMetadata) v1, (HasMetadata) v2);
83113
}
84114

85115
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -535,13 +535,13 @@ public <R extends HasMetadata> R resourcePatch(R resource, UnaryOperator<R> upda
535535
if (esList.isEmpty()) {
536536
throw new IllegalStateException("No event source found for type: " + resource.getClass());
537537
}
538+
var es = esList.get(0);
538539
if (esList.size() > 1) {
539-
throw new IllegalStateException(
540-
"Multiple event sources found for: "
541-
+ resource.getClass()
542-
+ " please provide the target event source");
540+
log.warn(
541+
"Multiple event sources found for type: {}, selecting first with name {}",
542+
resource.getClass(),
543+
es.name());
543544
}
544-
var es = esList.get(0);
545545
if (es instanceof ManagedInformerEventSource mes) {
546546
return resourcePatch(resource, updateOperation, (ManagedInformerEventSource<R, P, ?>) mes);
547547
} else {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceID.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,28 @@ public boolean equals(Object o) {
6363
}
6464

6565
public boolean isSameResource(HasMetadata hasMetadata) {
66+
if (hasMetadata == null) {
67+
return false;
68+
}
6669
final var metadata = hasMetadata.getMetadata();
67-
return getName().equals(metadata.getName())
68-
&& getNamespace().map(ns -> ns.equals(metadata.getNamespace())).orElse(true);
70+
return isSameResource(metadata.getName(), metadata.getNamespace());
71+
}
72+
73+
/**
74+
* Whether this ResourceID points to the same resource as the one identified by the specified name
75+
* and namespace.
76+
*
77+
* <p>Note that this doesn't take API version or Kind into account so this should only be used
78+
* when checking resources that are reasonably expected to be of the same type.
79+
*
80+
* @param name the name of the resource we want to check
81+
* @param namespace the possibly {@code null} namespace of the resource we want to check
82+
* @return {@code true} if this resource points to the same resource as the one pointed to by the
83+
* specified name and namespace, {@code false} otherwise
84+
* @since 5.3.0
85+
*/
86+
public boolean isSameResource(String name, String namespace) {
87+
return Objects.equals(this.name, name) && Objects.equals(this.namespace, namespace);
6988
}
7089

7190
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ public Set<R> getSecondaryResources(P primary) {
218218
}
219219
return secondaryIDs.stream()
220220
.map(this::get)
221-
.flatMap(Optional::stream)
221+
.filter(Optional::isPresent)
222+
.map(Optional::get)
222223
.collect(Collectors.toSet());
223224
}
224225

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,23 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

18+
import java.util.List;
19+
import java.util.Set;
20+
21+
import org.junit.jupiter.api.BeforeEach;
1822
import org.junit.jupiter.api.Test;
1923

2024
import io.fabric8.kubernetes.api.model.ConfigMap;
25+
import io.fabric8.kubernetes.api.model.HasMetadata;
26+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
27+
import io.fabric8.kubernetes.api.model.Pod;
28+
import io.fabric8.kubernetes.api.model.PodBuilder;
2129
import io.fabric8.kubernetes.api.model.Secret;
2230
import io.javaoperatorsdk.operator.processing.Controller;
2331
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
2432
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
33+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
34+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
2535

2636
import static org.assertj.core.api.Assertions.assertThat;
2737
import static org.mockito.ArgumentMatchers.any;
@@ -30,17 +40,21 @@
3040

3141
class DefaultContextTest {
3242

33-
private final Secret primary = new Secret();
34-
private final Controller<Secret> mockController = mock();
43+
private DefaultContext<?> context;
44+
private Controller<HasMetadata> mockController;
45+
private EventSourceManager<HasMetadata> mockManager;
3546

36-
private final DefaultContext<?> context =
37-
new DefaultContext<>(null, mockController, primary, false, false);
47+
@BeforeEach
48+
void setUp() {
49+
mockController = mock();
50+
mockManager = mock();
51+
when(mockController.getEventSourceManager()).thenReturn(mockManager);
52+
53+
context = new DefaultContext<>(null, mockController, new Secret(), false, false);
54+
}
3855

3956
@Test
40-
@SuppressWarnings("unchecked")
4157
void getSecondaryResourceReturnsEmptyOptionalOnNonActivatedDRType() {
42-
var mockManager = mock(EventSourceManager.class);
43-
when(mockController.getEventSourceManager()).thenReturn(mockManager);
4458
when(mockController.workflowContainsDependentForType(ConfigMap.class)).thenReturn(true);
4559
when(mockManager.getEventSourceFor(any(), any()))
4660
.thenThrow(new NoEventSourceForClassException(ConfigMap.class));
@@ -56,4 +70,101 @@ void setRetryInfo() {
5670
assertThat(newContext).isSameAs(context);
5771
assertThat(newContext.getRetryInfo()).hasValue(retryInfo);
5872
}
73+
74+
@Test
75+
void latestDistinctKeepsOnlyLatestResourceVersion() {
76+
// Create multiple resources with same name and namespace but different versions
77+
var pod1v1 = podWithNameAndVersion("pod1", "100");
78+
var pod1v2 = podWithNameAndVersion("pod1", "200");
79+
var pod1v3 = podWithNameAndVersion("pod1", "150");
80+
81+
// Create a resource with different name
82+
var pod2v1 = podWithNameAndVersion("pod2", "100");
83+
84+
// Create a resource with same name but different namespace
85+
var pod1OtherNsv1 = podWithNameAndVersion("pod1", "50", "other");
86+
87+
setUpEventSourceWith(pod1v1, pod1v2, pod1v3, pod1OtherNsv1, pod2v1);
88+
89+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
90+
91+
// Should have 3 resources: pod1 in default (latest version 200), pod2 in default, and pod1 in
92+
// other
93+
assertThat(result).hasSize(3);
94+
95+
// Find pod1 in default namespace - should have version 200
96+
final var pod1InDefault =
97+
result.stream()
98+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "default"))
99+
.findFirst()
100+
.orElseThrow();
101+
assertThat(pod1InDefault.getMetadata().getResourceVersion()).isEqualTo("200");
102+
103+
// Find pod2 in default namespace - should exist
104+
HasMetadata pod2InDefault =
105+
result.stream()
106+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod2", "default"))
107+
.findFirst()
108+
.orElseThrow();
109+
assertThat(pod2InDefault.getMetadata().getResourceVersion()).isEqualTo("100");
110+
111+
// Find pod1 in other namespace - should exist
112+
HasMetadata pod1InOther =
113+
result.stream()
114+
.filter(r -> ResourceID.fromResource(r).isSameResource("pod1", "other"))
115+
.findFirst()
116+
.orElseThrow();
117+
assertThat(pod1InOther.getMetadata().getResourceVersion()).isEqualTo("50");
118+
}
119+
120+
private void setUpEventSourceWith(Pod... pods) {
121+
EventSource<Pod, HasMetadata> mockEventSource = mock();
122+
when(mockEventSource.getSecondaryResources(any())).thenReturn(Set.of(pods));
123+
when(mockManager.getEventSourcesFor(Pod.class)).thenReturn(List.of(mockEventSource));
124+
}
125+
126+
private static Pod podWithNameAndVersion(
127+
String name, String resourceVersion, String... namespace) {
128+
final var ns = namespace != null && namespace.length > 0 ? namespace[0] : "default";
129+
return new PodBuilder()
130+
.withMetadata(
131+
new ObjectMetaBuilder()
132+
.withName(name)
133+
.withNamespace(ns)
134+
.withResourceVersion(resourceVersion)
135+
.build())
136+
.build();
137+
}
138+
139+
@Test
140+
void latestDistinctHandlesEmptyStream() {
141+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
142+
143+
assertThat(result).isEmpty();
144+
}
145+
146+
@Test
147+
void latestDistinctHandlesSingleResource() {
148+
final var pod = podWithNameAndVersion("pod1", "100");
149+
setUpEventSourceWith(pod);
150+
151+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
152+
153+
assertThat(result).hasSize(1);
154+
assertThat(result).contains(pod);
155+
}
156+
157+
@Test
158+
void latestDistinctComparesNumericVersionsCorrectly() {
159+
// Test that version 1000 is greater than version 999 (not lexicographic)
160+
final var podV999 = podWithNameAndVersion("pod1", "999");
161+
final var podV1000 = podWithNameAndVersion("pod1", "1000");
162+
setUpEventSourceWith(podV999, podV1000);
163+
164+
var result = context.getSecondaryResourcesAsStream(Pod.class, true).toList();
165+
166+
assertThat(result).hasSize(1);
167+
HasMetadata resultPod = result.iterator().next();
168+
assertThat(resultPod.getMetadata().getResourceVersion()).isEqualTo("1000");
169+
}
59170
}

0 commit comments

Comments
 (0)