Skip to content

Commit 535add0

Browse files
committed
rough-in of the refinements mentioned on #2012
provides two options - to control if the annotation is used (to omit events that come too quickly) - to parse the resource version (to keep the cache up-to-date and omit events if they come too slowly)
1 parent e9c2639 commit 535add0

File tree

8 files changed

+86
-20
lines changed

8 files changed

+86
-20
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+5
Original file line numberDiff line numberDiff line change
@@ -345,4 +345,9 @@ default boolean ssaBasedCreateUpdateMatchForDependentResources() {
345345
return true;
346346
}
347347

348+
349+
default boolean previousAnnotationForDependentResources() {
350+
return true;
351+
}
352+
348353
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
6262
return secondaryToPrimaryMapper;
6363
}
6464

65+
@Override
6566
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
6667
return Optional.ofNullable(onDeleteFilter);
6768
}
@@ -83,6 +84,10 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
8384
*/
8485
boolean followControllerNamespaceChanges();
8586

87+
default boolean parseResourceVersions() {
88+
return false;
89+
}
90+
8691
/**
8792
* Returns the configured {@link SecondaryToPrimaryMapper} which will allow JOSDK to identify
8893
* which secondary resources are associated with a given primary resource in cases where there is
@@ -95,12 +100,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
95100
*/
96101
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();
97102

103+
@Override
98104
Optional<OnAddFilter<? super R>> onAddFilter();
99105

106+
@Override
100107
Optional<OnUpdateFilter<? super R>> onUpdateFilter();
101108

102109
Optional<OnDeleteFilter<? super R>> onDeleteFilter();
103110

111+
@Override
104112
Optional<GenericFilter<? super R>> genericFilter();
105113

106114
<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public R create(R target, P primary, Context<P> context) {
113113
target.getMetadata().setResourceVersion("1");
114114
}
115115
}
116-
addMetadata(false, null, target, primary);
116+
addMetadata(false, null, target, primary, context);
117117
final var resource = prepare(target, primary, "Creating");
118118
return useSSA(context)
119119
? resource
@@ -129,7 +129,7 @@ public R update(R actual, R target, P primary, Context<P> context) {
129129
actual.getMetadata().getResourceVersion());
130130
}
131131
R updatedResource;
132-
addMetadata(false, actual, target, primary);
132+
addMetadata(false, actual, target, primary, context);
133133
if (useSSA(context)) {
134134
updatedResource = prepare(target, primary, "Updating")
135135
.fieldManager(context.getControllerConfiguration().fieldManager())
@@ -160,7 +160,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
160160
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
161161
Context<P> context) {
162162
final boolean matches;
163-
addMetadata(true, actualResource, desired, primary);
163+
addMetadata(true, actualResource, desired, primary, context);
164164
if (useSSA(context)) {
165165
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
166166
.matches(actualResource, desired, context);
@@ -170,8 +170,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
170170
return Result.computed(matches, desired);
171171
}
172172

173-
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
174-
if (forMatch) { // keep the current
173+
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
174+
Context<P> context) {
175+
if (forMatch) { // keep the current previous annotation
175176
String actual = actualResource.getMetadata().getAnnotations()
176177
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
177178
Map<String, String> annotations = target.getMetadata().getAnnotations();
@@ -180,7 +181,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
180181
} else {
181182
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
182183
}
183-
} else { // set a new one
184+
} else if (usePreviousAnnotation(context)) { // set a new one
184185
eventSource().orElseThrow().addPreviousAnnotation(
185186
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
186187
.orElse(null),
@@ -194,6 +195,11 @@ private boolean useSSA(Context<P> context) {
194195
.ssaBasedCreateUpdateMatchForDependentResources();
195196
}
196197

198+
private boolean usePreviousAnnotation(Context<P> context) {
199+
return context.getControllerConfiguration().getConfigurationService()
200+
.previousAnnotationForDependentResources();
201+
}
202+
197203
@Override
198204
protected void handleDelete(P primary, R secondary, Context<P> context) {
199205
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6969
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
7070
implements ResourceEventHandler<R> {
7171

72+
private static final int MAX_RESOURCE_VERSIONS = 256;
73+
7274
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7375

7476
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
@@ -78,6 +80,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7880
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7981
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
8082
private final String id = UUID.randomUUID().toString();
83+
private final Set<String> knownResourceVersions;
8184

8285
public InformerEventSource(
8386
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
@@ -86,6 +89,16 @@ public InformerEventSource(
8689

8790
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
8891
super(client.resources(configuration.getResourceClass()), configuration);
92+
if (configuration.parseResourceVersions()) {
93+
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
94+
@Override
95+
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
96+
return size() >= MAX_RESOURCE_VERSIONS;
97+
}
98+
});
99+
} else {
100+
knownResourceVersions = null;
101+
}
89102

90103
// If there is a primary to secondary mapper there is no need for primary to secondary index.
91104
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
@@ -169,6 +182,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
169182
}
170183

171184
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
185+
if (knownResourceVersions != null
186+
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
187+
return true;
188+
}
172189
var res = temporaryResourceCache.getResourceFromCache(resourceID);
173190
if (res.isEmpty()) {
174191
return isEventKnownFromAnnotation(newObject, oldObject);
@@ -262,6 +279,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
262279

263280
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
264281
primaryToSecondaryIndex.onAddOrUpdate(newResource);
282+
if (knownResourceVersions != null) {
283+
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
284+
}
265285
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
266286
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
267287
}
@@ -275,7 +295,6 @@ public boolean allowsNamespaceChanges() {
275295
return configuration().followControllerNamespaceChanges();
276296
}
277297

278-
279298
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
280299
if (genericFilter != null && !genericFilter.accept(newObject)) {
281300
return false;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,17 @@ protected ManagedInformerEventSource(
5151

5252
@Override
5353
public void onAdd(R resource) {
54-
temporaryResourceCache.removeResourceFromCache(resource);
54+
temporaryResourceCache.onEvent(resource, false);
5555
}
5656

5757
@Override
5858
public void onUpdate(R oldObj, R newObj) {
59-
temporaryResourceCache.removeResourceFromCache(newObj);
59+
temporaryResourceCache.onEvent(newObj, false);
6060
}
6161

6262
@Override
6363
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
64-
temporaryResourceCache.removeResourceFromCache(obj);
64+
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
6565
}
6666

6767
protected InformerManager<R, C> manager() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

+36-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.LoggerFactory;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1112
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
1213
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1314

@@ -41,8 +42,15 @@ public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInforme
4142
this.managedInformerEventSource = managedInformerEventSource;
4243
}
4344

44-
public synchronized Optional<T> removeResourceFromCache(T resource) {
45-
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
45+
public synchronized void onEvent(T resource, boolean unknownState) {
46+
ResourceID resourceId = ResourceID.fromResource(resource);
47+
48+
T cachedResource = cache.get(resourceId);
49+
50+
if (unknownState || (cachedResource != null
51+
&& !isLaterResourceVersion(resourceId, cachedResource, resource))) {
52+
cache.remove(resourceId);
53+
}
4654
}
4755

4856
public synchronized void putAddedResource(T newResource) {
@@ -61,18 +69,38 @@ public synchronized void putResource(T newResource, String previousResourceVersi
6169
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
6270

6371
if ((previousResourceVersion == null && cachedResource == null)
64-
|| (cachedResource != null && previousResourceVersion != null
65-
&& cachedResource.getMetadata().getResourceVersion()
66-
.equals(previousResourceVersion))) {
72+
|| (cachedResource != null
73+
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
74+
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
6775
log.debug(
6876
"Temporarily moving ahead to target version {} for resource id: {}",
6977
newResource.getMetadata().getResourceVersion(), resourceId);
7078
putToCache(newResource, resourceId);
71-
} else {
72-
if (cache.remove(resourceId) != null) {
73-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
79+
} else if (cache.remove(resourceId) != null) {
80+
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
81+
}
82+
}
83+
84+
/**
85+
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the resourceVersion of
86+
* newResource is numerically greater than cachedResource, otherwise false
87+
*/
88+
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
89+
try {
90+
if (managedInformerEventSource.getInformerConfiguration() instanceof InformerConfiguration
91+
&& ((InformerConfiguration) managedInformerEventSource.getInformerConfiguration())
92+
.parseResourceVersions()
93+
&& Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
94+
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 0) {
95+
return true;
7496
}
97+
} catch (NumberFormatException e) {
98+
log.debug(
99+
"Could not compare resourceVersions {} and {} for {}",
100+
newResource.getMetadata().getResourceVersion(),
101+
cachedResource.getMetadata().getResourceVersion(), resourceId);
75102
}
103+
return false;
76104
}
77105

78106
private void putToCache(T resource, ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
119119
informerEventSource.onUpdate(cachedDeployment, testDeployment());
120120

121121
verify(eventHandlerMock, times(1)).handleEvent(any());
122-
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
122+
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
123123
}
124124

125125
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
7575
void removesResourceFromCache() {
7676
ConfigMap testResource = propagateTestResourceToCache();
7777

78-
temporaryResourceCache.removeResourceFromCache(testResource());
78+
temporaryResourceCache.onEvent(testResource(), false);
7979

8080
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
8181
.isNotPresent();

0 commit comments

Comments
 (0)