Skip to content

Commit 0cc0f5a

Browse files
committed
refinements mentioned on #2012
provides two options - to control if the annotation is used (to omit events that come too quickly) - to parse the resource version (to keep the cache up-to-date and omit events if they come too slowly)
1 parent a3e2c24 commit 0cc0f5a

File tree

10 files changed

+126
-30
lines changed

10 files changed

+126
-30
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+25
Original file line numberDiff line numberDiff line change
@@ -345,4 +345,29 @@ default boolean ssaBasedCreateUpdateMatchForDependentResources() {
345345
return true;
346346
}
347347

348+
/**
349+
* If an annotation should be used so that the operator sdk can detect events from its own updates
350+
* of dependent resources and then filter them.
351+
* <p>
352+
* Disable this if you want to react to your own dependent resource updates
353+
*
354+
* @since 4.5.0
355+
*/
356+
default boolean previousAnnotationForDependentResources() {
357+
return true;
358+
}
359+
360+
/**
361+
* If the event logic should parse the resourceVersion to determine the ordering of events. This is typically not needed.
362+
* <p>
363+
* Disabled by default as Kubernetes does not officially support this interpretation of resourceVersions. Enable
364+
* only if your api server event processing seems to lag the operator logic and you want to further minimize the
365+
* the amount of work done / updates issued by the operator.
366+
*
367+
* @since 4.5.0
368+
*/
369+
default boolean parseResourceVersions() {
370+
return false;
371+
}
372+
348373
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+14
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ConfigurationServiceOverrider {
3535
private Duration cacheSyncTimeout;
3636
private ResourceClassResolver resourceClassResolver;
3737
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
38+
private Boolean previousAnnotationForDependentResources;
3839

3940
ConfigurationServiceOverrider(ConfigurationService original) {
4041
this.original = original;
@@ -150,6 +151,12 @@ public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentRe
150151
return this;
151152
}
152153

154+
public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(
155+
boolean value) {
156+
this.previousAnnotationForDependentResources = value;
157+
return this;
158+
}
159+
153160
public ConfigurationService build() {
154161
return new BaseConfigurationService(original.getVersion(), cloner, client) {
155162
@Override
@@ -256,6 +263,13 @@ public boolean ssaBasedCreateUpdateMatchForDependentResources() {
256263
? ssaBasedCreateUpdateMatchForDependentResources
257264
: super.ssaBasedCreateUpdateMatchForDependentResources();
258265
}
266+
267+
@Override
268+
public boolean previousAnnotationForDependentResources() {
269+
return previousAnnotationForDependentResources != null
270+
? previousAnnotationForDependentResources
271+
: super.previousAnnotationForDependentResources();
272+
}
259273
};
260274
}
261275

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
6262
return secondaryToPrimaryMapper;
6363
}
6464

65+
@Override
6566
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
6667
return Optional.ofNullable(onDeleteFilter);
6768
}
@@ -95,12 +96,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
9596
*/
9697
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();
9798

99+
@Override
98100
Optional<OnAddFilter<? super R>> onAddFilter();
99101

102+
@Override
100103
Optional<OnUpdateFilter<? super R>> onUpdateFilter();
101104

102105
Optional<OnDeleteFilter<? super R>> onDeleteFilter();
103106

107+
@Override
104108
Optional<GenericFilter<? super R>> genericFilter();
105109

106110
<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public R create(R target, P primary, Context<P> context) {
113113
target.getMetadata().setResourceVersion("1");
114114
}
115115
}
116-
addMetadata(false, null, target, primary);
116+
addMetadata(false, null, target, primary, context);
117117
final var resource = prepare(target, primary, "Creating");
118118
return useSSA(context)
119119
? resource
@@ -129,7 +129,7 @@ public R update(R actual, R target, P primary, Context<P> context) {
129129
actual.getMetadata().getResourceVersion());
130130
}
131131
R updatedResource;
132-
addMetadata(false, actual, target, primary);
132+
addMetadata(false, actual, target, primary, context);
133133
if (useSSA(context)) {
134134
updatedResource = prepare(target, primary, "Updating")
135135
.fieldManager(context.getControllerConfiguration().fieldManager())
@@ -160,7 +160,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
160160
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
161161
Context<P> context) {
162162
final boolean matches;
163-
addMetadata(true, actualResource, desired, primary);
163+
addMetadata(true, actualResource, desired, primary, context);
164164
if (useSSA(context)) {
165165
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
166166
.matches(actualResource, desired, context);
@@ -170,8 +170,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
170170
return Result.computed(matches, desired);
171171
}
172172

173-
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
174-
if (forMatch) { // keep the current
173+
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
174+
Context<P> context) {
175+
if (forMatch) { // keep the current previous annotation
175176
String actual = actualResource.getMetadata().getAnnotations()
176177
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
177178
Map<String, String> annotations = target.getMetadata().getAnnotations();
@@ -180,7 +181,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
180181
} else {
181182
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
182183
}
183-
} else { // set a new one
184+
} else if (usePreviousAnnotation(context)) { // set a new one
184185
eventSource().orElseThrow().addPreviousAnnotation(
185186
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
186187
.orElse(null),
@@ -196,6 +197,11 @@ private boolean useSSA(Context<P> context) {
196197
.ssaBasedCreateUpdateMatchForDependentResources());
197198
}
198199

200+
private boolean usePreviousAnnotation(Context<P> context) {
201+
return context.getControllerConfiguration().getConfigurationService()
202+
.previousAnnotationForDependentResources();
203+
}
204+
199205
@Override
200206
protected void handleDelete(P primary, R secondary, Context<P> context) {
201207
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ public class ControllerResourceEventSource<T extends HasMetadata>
3232

3333
@SuppressWarnings({"unchecked", "rawtypes"})
3434
public ControllerResourceEventSource(Controller<T> controller) {
35-
super(controller.getCRClient(), controller.getConfiguration());
35+
super(controller.getCRClient(), controller.getConfiguration(), false);
3636
this.controller = controller;
3737

3838
final var config = controller.getConfiguration();
3939
OnUpdateFilter internalOnUpdateFilter =
40-
(OnUpdateFilter<T>) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
41-
config.getFinalizerName())
42-
.or(onUpdateGenerationAware(config.isGenerationAware()))
43-
.or(onUpdateMarkedForDeletion());
40+
onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
41+
config.getFinalizerName())
42+
.or(onUpdateGenerationAware(config.isGenerationAware()))
43+
.or(onUpdateMarkedForDeletion());
4444

4545
legacyFilters = config.getEventFilter();
4646

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6969
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
7070
implements ResourceEventHandler<R> {
7171

72+
private static final int MAX_RESOURCE_VERSIONS = 256;
73+
7274
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7375

7476
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
@@ -78,14 +80,30 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7880
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7981
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
8082
private final String id = UUID.randomUUID().toString();
83+
private final Set<String> knownResourceVersions;
8184

8285
public InformerEventSource(
8386
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
84-
this(configuration, context.getClient());
87+
this(configuration, context.getClient(),
88+
context.getControllerConfiguration().getConfigurationService().parseResourceVersions());
8589
}
8690

8791
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
88-
super(client.resources(configuration.getResourceClass()), configuration);
92+
this(configuration, client, false);
93+
}
94+
95+
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client, boolean parseResourceVersions) {
96+
super(client.resources(configuration.getResourceClass()), configuration, parseResourceVersions);
97+
if (parseResourceVersions) {
98+
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
99+
@Override
100+
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
101+
return size() >= MAX_RESOURCE_VERSIONS;
102+
}
103+
});
104+
} else {
105+
knownResourceVersions = null;
106+
}
89107

90108
// If there is a primary to secondary mapper there is no need for primary to secondary index.
91109
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
@@ -169,6 +187,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
169187
}
170188

171189
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
190+
if (knownResourceVersions != null
191+
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
192+
return true;
193+
}
172194
var res = temporaryResourceCache.getResourceFromCache(resourceID);
173195
if (res.isEmpty()) {
174196
return isEventKnownFromAnnotation(newObject, oldObject);
@@ -262,6 +284,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
262284

263285
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
264286
primaryToSecondaryIndex.onAddOrUpdate(newResource);
287+
if (knownResourceVersions != null) {
288+
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
289+
}
265290
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
266291
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
267292
}
@@ -275,7 +300,6 @@ public boolean allowsNamespaceChanges() {
275300
return configuration().followControllerNamespaceChanges();
276301
}
277302

278-
279303
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
280304
if (genericFilter != null && !genericFilter.accept(newObject)) {
281305
return false;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,26 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
4242
protected MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client;
4343

4444
protected ManagedInformerEventSource(
45-
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
45+
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration, boolean parseResourceVersions) {
4646
super(configuration.getResourceClass());
4747
this.client = client;
48-
temporaryResourceCache = new TemporaryResourceCache<>(this);
48+
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
4949
this.cache = new InformerManager<>(client, configuration, this);
5050
}
5151

5252
@Override
5353
public void onAdd(R resource) {
54-
temporaryResourceCache.removeResourceFromCache(resource);
54+
temporaryResourceCache.onEvent(resource, false);
5555
}
5656

5757
@Override
5858
public void onUpdate(R oldObj, R newObj) {
59-
temporaryResourceCache.removeResourceFromCache(newObj);
59+
temporaryResourceCache.onEvent(newObj, false);
6060
}
6161

6262
@Override
6363
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
64-
temporaryResourceCache.removeResourceFromCache(obj);
64+
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
6565
}
6666

6767
protected InformerManager<R, C> manager() {
@@ -127,6 +127,7 @@ void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache)
127127
this.temporaryResourceCache = temporaryResourceCache;
128128
}
129129

130+
@Override
130131
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
131132
cache.addIndexers(indexers);
132133
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

+31-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.LoggerFactory;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1112
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
1213
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1314

@@ -36,13 +37,16 @@ public class TemporaryResourceCache<T extends HasMetadata> {
3637

3738
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
3839
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
40+
private final boolean parseResourceVersions;
3941

40-
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
42+
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource, boolean parseResourceVersions) {
4143
this.managedInformerEventSource = managedInformerEventSource;
44+
this.parseResourceVersions = parseResourceVersions;
4245
}
4346

44-
public synchronized Optional<T> removeResourceFromCache(T resource) {
45-
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
47+
public synchronized void onEvent(T resource, boolean unknownState) {
48+
cache.computeIfPresent(ResourceID.fromResource(resource),
49+
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
4650
}
4751

4852
public synchronized void putAddedResource(T newResource) {
@@ -61,18 +65,36 @@ public synchronized void putResource(T newResource, String previousResourceVersi
6165
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
6266

6367
if ((previousResourceVersion == null && cachedResource == null)
64-
|| (cachedResource != null && previousResourceVersion != null
65-
&& cachedResource.getMetadata().getResourceVersion()
66-
.equals(previousResourceVersion))) {
68+
|| (cachedResource != null
69+
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
70+
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
6771
log.debug(
6872
"Temporarily moving ahead to target version {} for resource id: {}",
6973
newResource.getMetadata().getResourceVersion(), resourceId);
7074
putToCache(newResource, resourceId);
71-
} else {
72-
if (cache.remove(resourceId) != null) {
73-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
75+
} else if (cache.remove(resourceId) != null) {
76+
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
77+
}
78+
}
79+
80+
/**
81+
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the
82+
* resourceVersion of newResource is numerically greater than cachedResource, otherwise
83+
* false
84+
*/
85+
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
86+
try {
87+
if (parseResourceVersions && Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
88+
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 0) {
89+
return true;
7490
}
91+
} catch (NumberFormatException e) {
92+
log.debug(
93+
"Could not compare resourceVersions {} and {} for {}",
94+
newResource.getMetadata().getResourceVersion(),
95+
cachedResource.getMetadata().getResourceVersion(), resourceId);
7596
}
97+
return false;
7698
}
7799

78100
private void putToCache(T resource, ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
119119
informerEventSource.onUpdate(cachedDeployment, testDeployment());
120120

121121
verify(eventHandlerMock, times(1)).handleEvent(any());
122-
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
122+
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
123123
}
124124

125125
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
7575
void removesResourceFromCache() {
7676
ConfigMap testResource = propagateTestResourceToCache();
7777

78-
temporaryResourceCache.removeResourceFromCache(testResource());
78+
temporaryResourceCache.onEvent(testResource(), false);
7979

8080
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
8181
.isNotPresent();

0 commit comments

Comments
 (0)