Skip to content

eventing refinements mentioned on #2012 #2034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,31 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return Set.of(ConfigMap.class, Secret.class);
}

/**
* If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect
* events from its own updates of dependent resources and then filter them.
* <p>
* Disable this if you want to react to your own dependent resource updates
*
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResourcesEventFiltering() {
return true;
}

/**
* If the event logic should parse the resourceVersion to determine the ordering of dependent
* resource events. This is typically not needed.
* <p>
* Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a more concrete example of when setting this to true would help. As is, it's not obvious when people might want to use this and I would rather not expose this at all if this isn't sufficiently / broadly applicable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's basically for feature parity with the mutable cache that is in go client - I'm not aware of user request for the feature, so if you feel strongly about it, then it could be hidden or removed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metacosm @csviri what do you think - would it be better to pull out or hide the resourceVersion parsing logic?

* logic and you want to further minimize the the amount of work done / updates issued by the
* operator.
*
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ConfigurationServiceOverrider {
private ResourceClassResolver resourceClassResolver;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
private Boolean previousAnnotationForDependentResources;
private Boolean parseResourceVersions;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -158,6 +160,18 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource(
return this;
}

public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(
boolean value) {
this.previousAnnotationForDependentResources = value;
return this;
}

public ConfigurationServiceOverrider wihtParseResourceVersions(
boolean value) {
this.parseResourceVersions = value;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, client) {
@Override
Expand Down Expand Up @@ -270,6 +284,20 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return defaultNonSSAResource != null ? defaultNonSSAResource
: super.defaultNonSSAResource();
}

@Override
public boolean previousAnnotationForDependentResourcesEventFiltering() {
return previousAnnotationForDependentResources != null
? previousAnnotationForDependentResources
: super.previousAnnotationForDependentResourcesEventFiltering();
}

@Override
public boolean parseResourceVersionsForEventFilteringAndCaching() {
return parseResourceVersions != null
? parseResourceVersions
: super.parseResourceVersionsForEventFilteringAndCaching();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
return secondaryToPrimaryMapper;
}

@Override
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
return Optional.ofNullable(onDeleteFilter);
}
Expand Down Expand Up @@ -95,12 +96,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
*/
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();

@Override
Optional<OnAddFilter<? super R>> onAddFilter();

@Override
Optional<OnUpdateFilter<? super R>> onUpdateFilter();

Optional<OnDeleteFilter<? super R>> onDeleteFilter();

@Override
Optional<GenericFilter<? super R>> genericFilter();

<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public R create(R desired, P primary, Context<P> context) {
desired.getMetadata().setResourceVersion("1");
}
}
addMetadata(false, null, desired, primary);
addMetadata(false, null, desired, primary, context);
sanitizeDesired(desired, null, primary, context);
final var resource = prepare(desired, primary, "Creating");
return useSSA(context)
Expand All @@ -130,7 +130,7 @@ public R update(R actual, R desired, P primary, Context<P> context) {
actual.getMetadata().getResourceVersion());
}
R updatedResource;
addMetadata(false, actual, desired, primary);
addMetadata(false, actual, desired, primary, context);
sanitizeDesired(desired, actual, primary, context);
if (useSSA(context)) {
updatedResource = prepare(desired, primary, "Updating")
Expand Down Expand Up @@ -163,7 +163,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
Context<P> context) {
final boolean matches;
addMetadata(true, actualResource, desired, primary);
addMetadata(true, actualResource, desired, primary, context);
if (useSSA(context)) {
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
.matches(actualResource, desired, context);
Expand All @@ -173,8 +173,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
return Result.computed(matches, desired);
}

protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
if (forMatch) { // keep the current
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
Context<P> context) {
if (forMatch) { // keep the current previous annotation
String actual = actualResource.getMetadata().getAnnotations()
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
Map<String, String> annotations = target.getMetadata().getAnnotations();
Expand All @@ -183,7 +184,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
} else {
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
}
} else { // set a new one
} else if (usePreviousAnnotation(context)) { // set a new one
eventSource().orElseThrow().addPreviousAnnotation(
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
.orElse(null),
Expand All @@ -208,6 +209,11 @@ protected boolean useSSA(Context<P> context) {
.ssaBasedCreateUpdateMatchForDependentResources());
}

private boolean usePreviousAnnotation(Context<P> context) {
return context.getControllerConfiguration().getConfigurationService()
.previousAnnotationForDependentResourcesEventFiltering();
}

@Override
protected void handleDelete(P primary, R secondary, Context<P> context) {
if (secondary != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class ControllerResourceEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getCRClient(), controller.getConfiguration());
super(controller.getCRClient(), controller.getConfiguration(), false);
this.controller = controller;

final var config = controller.getConfiguration();
OnUpdateFilter internalOnUpdateFilter =
(OnUpdateFilter<T>) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
config.getFinalizerName())
.or(onUpdateGenerationAware(config.isGenerationAware()))
.or(onUpdateMarkedForDeletion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,18 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>

public InformerEventSource(
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
this(configuration, context.getClient());
this(configuration, context.getClient(),
context.getControllerConfiguration().getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
super(client.resources(configuration.getResourceClass()), configuration);
this(configuration, client, false);
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client,
boolean parseResourceVersions) {
super(client.resources(configuration.getResourceClass()), configuration, parseResourceVersions);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (primaryToSecondaryMapper == null) {
Expand Down Expand Up @@ -169,6 +175,9 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
}

private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
if (temporaryResourceCache.isKnownResourceVersion(newObject)) {
return true;
}
var res = temporaryResourceCache.getResourceFromCache(resourceID);
if (res.isEmpty()) {
return isEventKnownFromAnnotation(newObject, oldObject);
Expand Down Expand Up @@ -275,7 +284,6 @@ public boolean allowsNamespaceChanges() {
return configuration().followControllerNamespaceChanges();
}


private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
if (genericFilter != null && !genericFilter.accept(newObject)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,27 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
protected MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client;

protected ManagedInformerEventSource(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration,
boolean parseResourceVersions) {
super(configuration.getResourceClass());
this.client = client;
temporaryResourceCache = new TemporaryResourceCache<>(this);
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
this.cache = new InformerManager<>(client, configuration, this);
}

@Override
public void onAdd(R resource) {
temporaryResourceCache.removeResourceFromCache(resource);
temporaryResourceCache.onEvent(resource, false);
}

@Override
public void onUpdate(R oldObj, R newObj) {
temporaryResourceCache.removeResourceFromCache(newObj);
temporaryResourceCache.onEvent(newObj, false);
}

@Override
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
temporaryResourceCache.removeResourceFromCache(obj);
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
}

protected InformerManager<R, C> manager() {
Expand Down Expand Up @@ -127,6 +128,7 @@ void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache)
this.temporaryResourceCache = temporaryResourceCache;
}

@Override
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
cache.addIndexers(indexers);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand All @@ -33,16 +37,33 @@
public class TemporaryResourceCache<T extends HasMetadata> {

private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
private static final int MAX_RESOURCE_VERSIONS = 256;

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;
private final Set<String> knownResourceVersions;

public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
boolean parseResourceVersions) {
this.managedInformerEventSource = managedInformerEventSource;
this.parseResourceVersions = parseResourceVersions;
if (parseResourceVersions) {
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
return size() >= MAX_RESOURCE_VERSIONS;
}
});
} else {
knownResourceVersions = null;
}
}

public synchronized Optional<T> removeResourceFromCache(T resource) {
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
public synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(ResourceID.fromResource(resource),
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
: cached);
}

public synchronized void putAddedResource(T newResource) {
Expand All @@ -56,23 +77,50 @@ public synchronized void putAddedResource(T newResource) {
* @param previousResourceVersion null indicates an add
*/
public synchronized void putResource(T newResource, String previousResourceVersion) {
if (knownResourceVersions != null) {
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
}
var resourceId = ResourceID.fromResource(newResource);
var cachedResource = getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));

if ((previousResourceVersion == null && cachedResource == null)
|| (cachedResource != null && previousResourceVersion != null
&& cachedResource.getMetadata().getResourceVersion()
.equals(previousResourceVersion))) {
|| (cachedResource != null
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(), resourceId);
putToCache(newResource, resourceId);
} else {
if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
}
}

public boolean isKnownResourceVersion(T resource) {
return knownResourceVersions != null
&& knownResourceVersions.contains(resource.getMetadata().getResourceVersion());
}

/**
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the
* resourceVersion of newResource is numerically greater than cachedResource, otherwise
* false
*/
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
try {
if (parseResourceVersions
&& Long.parseLong(newResource.getMetadata().getResourceVersion()) > Long
.parseLong(cachedResource.getMetadata().getResourceVersion())) {
return true;
}
} catch (NumberFormatException e) {
log.debug(
"Could not compare resourceVersions {} and {} for {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be this a warning? even if it could pollute cache? (no strong opinion)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't pollute the cache, it would exhibit the same behavior as it does now. It can of course be at whatever level you want.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean pollute logs :)

newResource.getMetadata().getResourceVersion(),
cachedResource.getMetadata().getResourceVersion(), resourceId);
}
return false;
}

private void putToCache(T resource, ResourceID resourceID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
informerEventSource.onUpdate(cachedDeployment, testDeployment());

verify(eventHandlerMock, times(1)).handleEvent(any());
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
}

@Test
Expand Down
Loading