Skip to content

Commit 8e724e3

Browse files
committed
rough-in of the refinements mentioned on operator-framework#2012
provides two options - to control if the annotation is used (to omit events that come too quickly) - to parse the resource version (to keep the cache up-to-date and omit events if they come too slowly)
1 parent dd2bafd commit 8e724e3

File tree

8 files changed

+85
-20
lines changed

8 files changed

+85
-20
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+5
Original file line numberDiff line numberDiff line change
@@ -345,4 +345,9 @@ default boolean ssaBasedCreateUpdateMatchForDependentResources() {
345345
return true;
346346
}
347347

348+
349+
default boolean previousAnnotationForDependentResources() {
350+
return true;
351+
}
352+
348353
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
6262
return secondaryToPrimaryMapper;
6363
}
6464

65+
@Override
6566
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
6667
return Optional.ofNullable(onDeleteFilter);
6768
}
@@ -83,6 +84,10 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
8384
*/
8485
boolean followControllerNamespaceChanges();
8586

87+
default boolean parseResourceVersions() {
88+
return false;
89+
}
90+
8691
/**
8792
* Returns the configured {@link SecondaryToPrimaryMapper} which will allow JOSDK to identify
8893
* which secondary resources are associated with a given primary resource in cases where there is
@@ -95,12 +100,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
95100
*/
96101
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();
97102

103+
@Override
98104
Optional<OnAddFilter<? super R>> onAddFilter();
99105

106+
@Override
100107
Optional<OnUpdateFilter<? super R>> onUpdateFilter();
101108

102109
Optional<OnDeleteFilter<? super R>> onDeleteFilter();
103110

111+
@Override
104112
Optional<GenericFilter<? super R>> genericFilter();
105113

106114
<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public R create(R target, P primary, Context<P> context) {
113113
target.getMetadata().setResourceVersion("1");
114114
}
115115
}
116-
addMetadata(false, null, target, primary);
116+
addMetadata(false, null, target, primary, context);
117117
final var resource = prepare(target, primary, "Creating");
118118
return useSSA(context)
119119
? resource
@@ -129,7 +129,7 @@ public R update(R actual, R target, P primary, Context<P> context) {
129129
actual.getMetadata().getResourceVersion());
130130
}
131131
R updatedResource;
132-
addMetadata(false, actual, target, primary);
132+
addMetadata(false, actual, target, primary, context);
133133
if (useSSA(context)) {
134134
updatedResource = prepare(target, primary, "Updating")
135135
.fieldManager(context.getControllerConfiguration().fieldManager())
@@ -160,7 +160,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
160160
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
161161
Context<P> context) {
162162
final boolean matches;
163-
addMetadata(true, actualResource, desired, primary);
163+
addMetadata(true, actualResource, desired, primary, context);
164164
if (useSSA(context)) {
165165
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
166166
.matches(actualResource, desired, context);
@@ -170,8 +170,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
170170
return Result.computed(matches, desired);
171171
}
172172

173-
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
174-
if (forMatch) { // keep the current
173+
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
174+
Context<P> context) {
175+
if (forMatch) { // keep the current previous annotation
175176
String actual = actualResource.getMetadata().getAnnotations()
176177
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
177178
Map<String, String> annotations = target.getMetadata().getAnnotations();
@@ -180,7 +181,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
180181
} else {
181182
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
182183
}
183-
} else { // set a new one
184+
} else if (usePreviousAnnotation(context)) { // set a new one
184185
eventSource().orElseThrow().addPreviousAnnotation(
185186
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
186187
.orElse(null),
@@ -194,6 +195,11 @@ private boolean useSSA(Context<P> context) {
194195
.ssaBasedCreateUpdateMatchForDependentResources();
195196
}
196197

198+
private boolean usePreviousAnnotation(Context<P> context) {
199+
return context.getControllerConfiguration().getConfigurationService()
200+
.previousAnnotationForDependentResources();
201+
}
202+
197203
@Override
198204
protected void handleDelete(P primary, R secondary, Context<P> context) {
199205
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6969
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
7070
implements ResourceEventHandler<R> {
7171

72+
private static final int MAX_RESOURCE_VERSIONS = 256;
73+
7274
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7375

7476
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
@@ -79,6 +81,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7981
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
8082
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
8183
private final String id = UUID.randomUUID().toString();
84+
private final Set<String> knownResourceVersions;
8285

8386
public InformerEventSource(
8487
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
@@ -88,7 +91,16 @@ public InformerEventSource(
8891
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
8992
super(client.resources(configuration.getResourceClass()), configuration);
9093
this.configuration = configuration;
91-
94+
if (this.configuration.parseResourceVersions()) {
95+
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
96+
@Override
97+
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
98+
return size() >= MAX_RESOURCE_VERSIONS;
99+
}
100+
});
101+
} else {
102+
knownResourceVersions = null;
103+
}
92104

93105
// If there is a primary to secondary mapper there is no need for primary to secondary index.
94106
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
@@ -172,6 +184,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
172184
}
173185

174186
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
187+
if (knownResourceVersions != null
188+
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
189+
return true;
190+
}
175191
var res = temporaryResourceCache.getResourceFromCache(resourceID);
176192
if (res.isEmpty()) {
177193
return isEventKnownFromAnnotation(newObject, oldObject);
@@ -258,6 +274,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
258274

259275
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
260276
primaryToSecondaryIndex.onAddOrUpdate(newResource);
277+
if (knownResourceVersions != null) {
278+
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
279+
}
261280
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
262281
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
263282
}
@@ -271,6 +290,9 @@ public boolean allowsNamespaceChanges() {
271290
return getConfiguration().followControllerNamespaceChanges();
272291
}
273292

293+
public boolean parseResourceVersions() {
294+
return getConfiguration().parseResourceVersions();
295+
}
274296

275297
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
276298
if (genericFilter != null && !genericFilter.accept(newObject)) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,17 @@ protected ManagedInformerEventSource(
5252

5353
@Override
5454
public void onAdd(R resource) {
55-
temporaryResourceCache.removeResourceFromCache(resource);
55+
temporaryResourceCache.onEvent(resource, false);
5656
}
5757

5858
@Override
5959
public void onUpdate(R oldObj, R newObj) {
60-
temporaryResourceCache.removeResourceFromCache(newObj);
60+
temporaryResourceCache.onEvent(newObj, false);
6161
}
6262

6363
@Override
6464
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
65-
temporaryResourceCache.removeResourceFromCache(obj);
65+
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
6666
}
6767

6868
protected InformerManager<R, C> manager() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

+32-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.LoggerFactory;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1112
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
1213
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1314

@@ -41,8 +42,15 @@ public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInforme
4142
this.managedInformerEventSource = managedInformerEventSource;
4243
}
4344

44-
public synchronized Optional<T> removeResourceFromCache(T resource) {
45-
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
45+
public synchronized void onEvent(T resource, boolean unknownState) {
46+
ResourceID resourceId = ResourceID.fromResource(resource);
47+
48+
T cachedResource = cache.get(resourceId);
49+
50+
if (unknownState || (cachedResource != null
51+
&& !isLaterResourceVersion(resourceId, cachedResource, resource))) {
52+
cache.remove(resourceId);
53+
}
4654
}
4755

4856
public synchronized void putAddedResource(T newResource) {
@@ -61,18 +69,34 @@ public synchronized void putResource(T newResource, String previousResourceVersi
6169
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
6270

6371
if ((previousResourceVersion == null && cachedResource == null)
64-
|| (cachedResource != null && previousResourceVersion != null
65-
&& cachedResource.getMetadata().getResourceVersion()
66-
.equals(previousResourceVersion))) {
72+
|| (cachedResource != null
73+
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
74+
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
6775
log.debug(
6876
"Temporarily moving ahead to target version {} for resource id: {}",
6977
newResource.getMetadata().getResourceVersion(), resourceId);
7078
putToCache(newResource, resourceId);
71-
} else {
72-
if (cache.remove(resourceId) != null) {
73-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
79+
} else if (cache.remove(resourceId) != null) {
80+
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
81+
}
82+
}
83+
84+
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
85+
try {
86+
if (managedInformerEventSource.getInformerConfiguration() instanceof InformerConfiguration
87+
&& ((InformerConfiguration) managedInformerEventSource.getInformerConfiguration())
88+
.parseResourceVersions()
89+
&& Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
90+
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 1) {
91+
return true;
7492
}
93+
} catch (NumberFormatException e) {
94+
log.debug(
95+
"Could not compare resourceVersions {} and {} for {}",
96+
newResource.getMetadata().getResourceVersion(),
97+
cachedResource.getMetadata().getResourceVersion(), resourceId);
7598
}
99+
return false;
76100
}
77101

78102
private void putToCache(T resource, ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
119119
informerEventSource.onUpdate(cachedDeployment, testDeployment());
120120

121121
verify(eventHandlerMock, times(1)).handleEvent(any());
122-
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
122+
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
123123
}
124124

125125
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
7575
void removesResourceFromCache() {
7676
ConfigMap testResource = propagateTestResourceToCache();
7777

78-
temporaryResourceCache.removeResourceFromCache(testResource());
78+
temporaryResourceCache.onEvent(testResource(), false);
7979

8080
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
8181
.isNotPresent();

0 commit comments

Comments
 (0)