diff --git a/micrometer-support/pom.xml b/micrometer-support/pom.xml index 9ca97c7186..73c8e7ed7d 100644 --- a/micrometer-support/pom.xml +++ b/micrometer-support/pom.xml @@ -5,7 +5,7 @@ java-operator-sdk io.javaoperatorsdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT 4.0.0 diff --git a/operator-framework-core/pom.xml b/operator-framework-core/pom.xml index a615428790..a1804086d6 100644 --- a/operator-framework-core/pom.xml +++ b/operator-framework-core/pom.xml @@ -6,7 +6,7 @@ io.javaoperatorsdk java-operator-sdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT ../pom.xml diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index b85df49eba..d4739f8267 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -1,103 +1,36 @@ package io.javaoperatorsdk.operator.api.config; -import java.lang.reflect.ParameterizedType; import java.util.Collections; -import java.util.Set; +import java.util.List; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.reconciler.Constants; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceControllerFactory; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; -public interface ControllerConfiguration { +@SuppressWarnings("rawtypes") +public interface ControllerConfiguration extends ResourceConfiguration { default String getName() { return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName()); } - default String getResourceTypeName() { - return ReconcilerUtils.getResourceTypeName(getResourceClass()); - } - default String getFinalizer() { return ReconcilerUtils.getDefaultFinalizerName(getResourceClass()); } - /** - * Retrieves the label selector that is used to filter which custom resources are actually watched - * by the associated controller. See - * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ for more details on - * syntax. - * - * @return the label selector filtering watched custom resources - */ - default String getLabelSelector() { - return null; - } - default boolean isGenerationAware() { return true; } - default Class getResourceClass() { - ParameterizedType type = (ParameterizedType) getClass().getGenericInterfaces()[0]; - return (Class) type.getActualTypeArguments()[0]; - } - String getAssociatedReconcilerClassName(); - default Set getNamespaces() { - return Collections.emptySet(); - } - - default boolean watchAllNamespaces() { - return allNamespacesWatched(getNamespaces()); - } - - static boolean allNamespacesWatched(Set namespaces) { - return namespaces == null || namespaces.isEmpty(); - } - - default boolean watchCurrentNamespace() { - return currentNamespaceWatched(getNamespaces()); - } - - static boolean currentNamespaceWatched(Set namespaces) { - return namespaces != null - && namespaces.size() == 1 - && namespaces.contains( - Constants.WATCH_CURRENT_NAMESPACE); - } - - /** - * Computes the effective namespaces based on the set specified by the user, in particular - * retrieves the current namespace from the client when the user specified that they wanted to - * watch the current namespace only. - * - * @return a Set of namespace names the associated controller will watch - */ - default Set getEffectiveNamespaces() { - var targetNamespaces = getNamespaces(); - if (watchCurrentNamespace()) { - final var parent = getConfigurationService(); - if (parent == null) { - throw new IllegalStateException( - "Parent ConfigurationService must be set before calling this method"); - } - targetNamespaces = Collections.singleton(parent.getClientConfiguration().getNamespace()); - } - return targetNamespaces; - } - default RetryConfiguration getRetryConfiguration() { return RetryConfiguration.DEFAULT; } - ConfigurationService getConfigurationService(); - - default void setConfigurationService(ConfigurationService service) {} - default boolean useFinalizer() { return !Constants.NO_FINALIZER .equals(getFinalizer()); @@ -114,4 +47,12 @@ default boolean useFinalizer() { default ResourceEventFilter getEventFilter() { return ResourceEventFilters.passthrough(); } + + default List getDependentResources() { + return Collections.emptyList(); + } + + default DependentResourceControllerFactory dependentFactory() { + return new DependentResourceControllerFactory<>() {}; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index e8e2ef1162..fee1212deb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -86,7 +86,8 @@ public ControllerConfiguration build() { labelSelector, customResourcePredicate, original.getResourceClass(), - original.getConfigurationService()); + original.getConfigurationService(), + original.getDependentResources()); } public static ControllerConfigurationOverrider override( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java index 860152745b..bc10002ebd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.api.config; import java.util.Collections; +import java.util.List; import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -20,8 +21,10 @@ public class DefaultControllerConfiguration private final String labelSelector; private final ResourceEventFilter resourceEventFilter; private final Class resourceClass; + private final List dependents; private ConfigurationService service; + // NOSONAR constructor is meant to provide all information public DefaultControllerConfiguration( String associatedControllerClassName, String name, @@ -33,7 +36,8 @@ public DefaultControllerConfiguration( String labelSelector, ResourceEventFilter resourceEventFilter, Class resourceClass, - ConfigurationService service) { + ConfigurationService service, + List dependents) { this.associatedControllerClassName = associatedControllerClassName; this.name = name; this.crdName = crdName; @@ -52,6 +56,7 @@ public DefaultControllerConfiguration( resourceClass == null ? ControllerConfiguration.super.getResourceClass() : resourceClass; setConfigurationService(service); + this.dependents = dependents != null ? dependents : Collections.emptyList(); } @Override @@ -102,7 +107,7 @@ public ConfigurationService getConfigurationService() { @Override public void setConfigurationService(ConfigurationService service) { if (this.service != null) { - throw new RuntimeException("A ConfigurationService is already associated with '" + name + throw new IllegalStateException("A ConfigurationService is already associated with '" + name + "' ControllerConfiguration. Cannot change it once set!"); } this.service = service; @@ -122,4 +127,9 @@ public Class getResourceClass() { public ResourceEventFilter getEventFilter() { return resourceEventFilter; } + + @Override + public List getDependentResources() { + return dependents; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java new file mode 100644 index 0000000000..d7b5f76815 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.util.Collections; +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class DefaultResourceConfiguration + implements ResourceConfiguration { + + private final String labelSelector; + private final Set namespaces; + private final boolean watchAllNamespaces; + private final Class resourceClass; + private ConfigurationService service; + + public DefaultResourceConfiguration(String labelSelector, Class resourceClass, + String... namespaces) { + this(labelSelector, resourceClass, + namespaces != null ? Set.of(namespaces) : Collections.emptySet()); + } + + public DefaultResourceConfiguration(String labelSelector, Class resourceClass, + Set namespaces) { + this.labelSelector = labelSelector; + this.resourceClass = resourceClass; + this.namespaces = namespaces != null ? namespaces : Collections.emptySet(); + this.watchAllNamespaces = this.namespaces.isEmpty(); + } + + @Override + public String getResourceTypeName() { + return ResourceConfiguration.super.getResourceTypeName(); + } + + @Override + public String getLabelSelector() { + return labelSelector; + } + + @Override + public Set getNamespaces() { + return namespaces; + } + + @Override + public boolean watchAllNamespaces() { + return watchAllNamespaces; + } + + @Override + public ConfigurationService getConfigurationService() { + return service; + } + + @Override + public Class getResourceClass() { + return resourceClass; + } + + @Override + public void setConfigurationService(ConfigurationService service) { + this.service = service; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Dependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Dependent.java new file mode 100644 index 0000000000..ed013fafb0 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Dependent.java @@ -0,0 +1,8 @@ +package io.javaoperatorsdk.operator.api.config; + +public @interface Dependent { + + Class resourceType(); + + Class type(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DependentResource.java new file mode 100644 index 0000000000..f4d5df89fb --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DependentResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.api.config; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +public interface DependentResource { + default EventSource initEventSource(EventSourceContext

context) { + throw new IllegalStateException("Must be implemented if not automatically provided by the SDK"); + }; + + default Class resourceType() { + return (Class) Utils.getFirstTypeArgumentFromInterface(getClass()); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index dfe64d97d0..254a539ac3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -36,6 +36,9 @@ public static void init(ConfigurationService configuration) { log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}", configuration.getExecutorService().getClass(), configuration.getTerminationTimeoutSeconds()); + log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}", + configuration.getExecutorService().getClass(), + configuration.getTerminationTimeoutSeconds()); } else { log.debug("Already started, reusing already setup instance!"); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/KubernetesDependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/KubernetesDependent.java new file mode 100644 index 0000000000..587fd58f85 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/KubernetesDependent.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +import static io.javaoperatorsdk.operator.api.reconciler.Constants.EMPTY_STRING; + +@Target({ElementType.TYPE}) +public @interface KubernetesDependent { + + boolean OWNED_DEFAULT = true; + boolean SKIP_UPDATE_DEFAULT = true; + + boolean owned() default OWNED_DEFAULT; + + boolean skipUpdateIfUnchanged() default SKIP_UPDATE_DEFAULT; + + /** + * Specified which namespaces this Controller monitors for custom resources events. If no + * namespace is specified then the controller will monitor all namespaces by default. + * + * @return the list of namespaces this controller monitors + */ + String[] namespaces() default {}; + + /** + * Optional label selector used to identify the set of custom resources the controller will acc + * upon. The label selector can be made of multiple comma separated requirements that acts as a + * logical AND operator. + * + * @return the label selector + */ + String labelSelector() default EMPTY_STRING; +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java new file mode 100644 index 0000000000..1e1d58b917 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java @@ -0,0 +1,78 @@ +package io.javaoperatorsdk.operator.api.config; + +import java.util.Collections; +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.ReconcilerUtils; +import io.javaoperatorsdk.operator.api.reconciler.Constants; + +public interface ResourceConfiguration { + + default String getResourceTypeName() { + return ReconcilerUtils.getResourceTypeName(getResourceClass()); + } + + /** + * Retrieves the label selector that is used to filter which resources are actually watched by the + * associated event source. See + * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ for more details on + * syntax. + * + * @return the label selector filtering watched resources + */ + default String getLabelSelector() { + return null; + } + + @SuppressWarnings("unchecked") + default Class getResourceClass() { + return (Class) Utils.getFirstTypeArgumentFromInterface(getClass()); + } + + default Set getNamespaces() { + return Collections.emptySet(); + } + + default boolean watchAllNamespaces() { + return allNamespacesWatched(getNamespaces()); + } + + static boolean allNamespacesWatched(Set namespaces) { + return namespaces == null || namespaces.isEmpty(); + } + + default boolean watchCurrentNamespace() { + return currentNamespaceWatched(getNamespaces()); + } + + static boolean currentNamespaceWatched(Set namespaces) { + return namespaces != null + && namespaces.size() == 1 + && namespaces.contains(Constants.WATCH_CURRENT_NAMESPACE); + } + + /** + * Computes the effective namespaces based on the set specified by the user, in particular + * retrieves the current namespace from the client when the user specified that they wanted to + * watch the current namespace only. + * + * @return a Set of namespace names the associated controller will watch + */ + default Set getEffectiveNamespaces() { + var targetNamespaces = getNamespaces(); + if (watchCurrentNamespace()) { + final var parent = getConfigurationService(); + if (parent == null) { + throw new IllegalStateException( + "Parent ConfigurationService must be set before calling this method"); + } + targetNamespaces = Collections.singleton(parent.getClientConfiguration().getNamespace()); + } + return targetNamespaces; + } + + ConfigurationService getConfigurationService(); + + void setConfigurationService(ConfigurationService service); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index b36c0468cd..3944cd3ecc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.api.config; import java.io.IOException; +import java.lang.reflect.ParameterizedType; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.Date; @@ -67,4 +68,9 @@ public static boolean shouldCheckCRDAndValidateLocalModel() { public static boolean debugThreadPool() { return Boolean.getBoolean(System.getProperty(DEBUG_THREAD_POOL_ENV_KEY, "false")); } + + public static Class getFirstTypeArgumentFromInterface(Class clazz) { + ParameterizedType type = (ParameterizedType) clazz.getGenericInterfaces()[0]; + return (Class) type.getActualTypeArguments()[0]; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/AttributeHolder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/AttributeHolder.java new file mode 100644 index 0000000000..8fee3c2557 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/AttributeHolder.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.Optional; + +public interface AttributeHolder { + + Optional get(Object key, Class expectedType); + + T getMandatory(Object key, Class expectedType); + + Optional put(Object key, Object value); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java index d697325219..6870fef80e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java @@ -2,7 +2,7 @@ import java.util.Optional; -public interface Context { +public interface Context extends AttributeHolder { Optional getRetryInfo(); @@ -11,4 +11,11 @@ default Optional getSecondaryResource(Class expectedType) { } Optional getSecondaryResource(Class expectedType, String eventSourceName); + + @Override + default T getMandatory(Object key, Class expectedType) { + return get(key, expectedType).orElseThrow(() -> new IllegalStateException( + "Mandatory attribute (key: " + key + ", type: " + expectedType.getName() + + ") is missing or not of the expected type")); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ContextInitializer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ContextInitializer.java new file mode 100644 index 0000000000..e076723d0c --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ContextInitializer.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public interface ContextInitializer

{ + void initContext(P primary, Context context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index fad4bc8573..5870820f0f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -5,6 +5,8 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import io.javaoperatorsdk.operator.api.config.Dependent; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceController; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; @Retention(RetentionPolicy.RUNTIME) @@ -56,4 +58,14 @@ */ @SuppressWarnings("rawtypes") Class[] eventFilters() default {}; + + /** + * Optional list of classes providing {@link DependentResourceController} implementations + * encapsulating logic to handle the associated + * {@link io.javaoperatorsdk.operator.processing.Controller}'s reconciliation of dependent + * resources + * + * @return the list of {@link DependentResourceController} implementations + */ + Dependent[] dependents() default {}; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index 3d924c2753..c2553a5d57 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -5,7 +5,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.Controller; -public class DefaultContext

implements Context { +public class DefaultContext

extends MapAttributeHolder implements Context { private final RetryInfo retryInfo; private final Controller

controller; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContext.java index 0a93d33d40..026af88923 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContext.java @@ -11,7 +11,7 @@ * * @param

the type associated with the primary resource that is handled by your reconciler */ -public class EventSourceContext

{ +public class EventSourceContext

extends MapAttributeHolder { private final ResourceCache

primaryCache; private final ConfigurationService configurationService; @@ -49,7 +49,7 @@ public ConfigurationService getConfigurationService() { /** * Provides access to the {@link KubernetesClient} used by the current * {@link io.javaoperatorsdk.operator.Operator} instance. - * + * * @return the {@link KubernetesClient} used by the current * {@link io.javaoperatorsdk.operator.Operator} instance. */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContextInjector.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContextInjector.java new file mode 100644 index 0000000000..af57fe1b32 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceContextInjector.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +public interface EventSourceContextInjector { + void injectInto(EventSourceContext context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/MapAttributeHolder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/MapAttributeHolder.java new file mode 100644 index 0000000000..aee5ee7e54 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/MapAttributeHolder.java @@ -0,0 +1,28 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class MapAttributeHolder { + + private final ConcurrentHashMap attributes = new ConcurrentHashMap(); + + public Optional get(Object key, Class expectedType) { + return Optional.ofNullable(attributes.get(key)) + .filter(expectedType::isInstance) + .map(expectedType::cast); + } + + public Optional put(Object key, Object value) { + if (value == null) { + return Optional.ofNullable(attributes.remove(key)); + } + return Optional.ofNullable(attributes.put(key, value)); + } + + public T getMandatory(Object key, Class expectedType) { + return get(key, expectedType).orElseThrow(() -> new IllegalStateException( + "Mandatory attribute (key: " + key + ", type: " + expectedType.getName() + + ") is missing or not of the expected type")); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Builder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Builder.java new file mode 100644 index 0000000000..4dfaf91e64 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Builder.java @@ -0,0 +1,9 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +@FunctionalInterface +public interface Builder { + R buildFor(P primary, Context context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Cleaner.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Cleaner.java new file mode 100644 index 0000000000..a6a3d691be --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Cleaner.java @@ -0,0 +1,9 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +public interface Cleaner { + + void delete(R fetched, P primary, Context context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceController.java new file mode 100644 index 0000000000..043aec2f53 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceController.java @@ -0,0 +1,87 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +public class DependentResourceController + implements DependentResource, Builder, Updater, Persister, + Cleaner { + + private final Builder builder; + private final Updater updater; + private final Cleaner cleaner; + private final Persister persister; + private final DependentResource delegate; + + @SuppressWarnings("unchecked") + public DependentResourceController(DependentResource delegate) { + this.delegate = delegate; + builder = (delegate instanceof Builder) ? (Builder) delegate : null; + updater = (delegate instanceof Updater) ? (Updater) delegate : null; + cleaner = (delegate instanceof Cleaner) ? (Cleaner) delegate : null; + persister = initPersister(delegate); + } + + @SuppressWarnings("unchecked") + protected Persister initPersister(DependentResource delegate) { + if (delegate instanceof Persister) { + return (Persister) delegate; + } else { + throw new IllegalArgumentException( + "DependentResource '" + delegate.getClass().getName() + "' must implement Persister"); + } + } + + public String descriptionFor(R resource) { + return resource.toString(); + } + + @Override + public R buildFor(P primary, Context context) { + return builder.buildFor(primary, context); + } + + @Override + public R update(R fetched, P primary, Context context) { + return updater.update(fetched, primary, context); + } + + @Override + public void delete(R fetched, P primary, Context context) { + cleaner.delete(fetched, primary, context); + } + + public Class getResourceType() { + return delegate.resourceType(); + } + + @Override + public EventSource initEventSource(EventSourceContext

context) { + return delegate.initEventSource(context); + } + + public boolean creatable() { + return builder != null; + } + + public boolean updatable() { + return updater != null; + } + + public boolean deletable() { + return cleaner != null; + } + + @Override + public void createOrReplace(R dependentResource, Context context) { + persister.createOrReplace(dependentResource, context); + } + + @Override + public R getFor(P primary, Context context) { + return persister.getFor(primary, context); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceControllerFactory.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceControllerFactory.java new file mode 100644 index 0000000000..f814cd7a1a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResourceControllerFactory.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.DependentResource; + +public interface DependentResourceControllerFactory

{ + + default DependentResourceController from(DependentResource dependent) { + // todo: this needs to be cleaned-up / redesigned + return dependent instanceof DependentResourceController + ? (DependentResourceController) dependent + : new DependentResourceController<>(dependent); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/KubernetesDependentResourceController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/KubernetesDependentResourceController.java new file mode 100644 index 0000000000..da11c4f938 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/KubernetesDependentResourceController.java @@ -0,0 +1,74 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +public class KubernetesDependentResourceController + extends DependentResourceController { + private final InformerConfiguration configuration; + private final boolean owned; + private KubernetesClient client; + private InformerEventSource informer; + + + public KubernetesDependentResourceController(DependentResource delegate, + InformerConfiguration configuration, boolean owned) { + super(delegate); + // todo: check if we can validate that types actually match properly + final var associatedPrimaries = + (delegate instanceof PrimaryResourcesRetriever) + ? (PrimaryResourcesRetriever) delegate + : configuration.getPrimaryResourcesRetriever(); + final var associatedSecondary = + (delegate instanceof AssociatedSecondaryResourceIdentifier) + ? (AssociatedSecondaryResourceIdentifier

) delegate + : configuration.getAssociatedResourceIdentifier(); + + this.configuration = InformerConfiguration.from(configuration) + .withPrimaryResourcesRetriever(associatedPrimaries) + .withAssociatedSecondaryResourceIdentifier(associatedSecondary) + .build(); + this.owned = owned; + } + + @Override + protected Persister initPersister(DependentResource delegate) { + return (delegate instanceof Persister) ? (Persister) delegate : this; + } + + @Override + public String descriptionFor(R resource) { + return String.format("'%s' %s dependent in namespace %s", resource.getMetadata().getName(), + resource.getFullResourceName(), + resource.getMetadata().getNamespace()); + } + + @Override + public EventSource initEventSource(EventSourceContext

context) { + this.client = context.getClient(); + informer = new InformerEventSource<>(configuration, context); + return informer; + } + + @Override + public void createOrReplace(R dependentResource, Context context) { + client.resource(dependentResource).createOrReplace(); + } + + @Override + public R getFor(P primary, Context context) { + return informer.getAssociated(primary).orElse(null); + } + + public boolean owned() { + return owned; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Persister.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Persister.java new file mode 100644 index 0000000000..ab34372917 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Persister.java @@ -0,0 +1,11 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +public interface Persister { + + void createOrReplace(R dependentResource, Context context); + + R getFor(P primary, Context context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Updater.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Updater.java new file mode 100644 index 0000000000..c92f3c5ced --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Updater.java @@ -0,0 +1,10 @@ +package io.javaoperatorsdk.operator.api.reconciler.dependent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +@FunctionalInterface +public interface Updater { + + R update(R fetched, P primary, Context context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 5e93b0d7af..e0b9efae77 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -1,8 +1,12 @@ package io.javaoperatorsdk.operator.processing; +import java.util.LinkedList; import java.util.List; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; @@ -25,16 +29,23 @@ import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.dependent.DependentResourceManager; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +@SuppressWarnings({"rawtypes", "unchecked"}) public class Controller implements Reconciler, LifecycleAware, EventSourceInitializer { + + private static final Logger log = LoggerFactory.getLogger(Controller.class); + private final Reconciler reconciler; private final ControllerConfiguration configuration; private final KubernetesClient kubernetesClient; - private EventSourceManager eventSourceManager; - private volatile ConfigurationService configurationService; + private final EventSourceManager eventSourceManager; + private final DependentResourceManager dependents; + + private ConfigurationService configurationService; public Controller(Reconciler reconciler, ControllerConfiguration configuration, @@ -42,10 +53,15 @@ public Controller(Reconciler reconciler, this.reconciler = reconciler; this.configuration = configuration; this.kubernetesClient = kubernetesClient; + + eventSourceManager = new EventSourceManager<>(this); + dependents = new DependentResourceManager<>(this); } @Override public DeleteControl cleanup(R resource, Context context) { + dependents.cleanup(resource, context); + return metrics().timeControllerExecution( new ControllerExecution<>() { @Override @@ -72,6 +88,8 @@ public DeleteControl execute() { @Override public UpdateControl reconcile(R resource, Context context) { + dependents.reconcile(resource, context); + return metrics().timeControllerExecution( new ControllerExecution<>() { @Override @@ -103,6 +121,7 @@ public UpdateControl execute() { }); } + private Metrics metrics() { final var metrics = configurationService().getMetrics(); return metrics != null ? metrics : Metrics.NOOP; @@ -110,7 +129,14 @@ private Metrics metrics() { @Override public List prepareEventSources(EventSourceContext context) { - throw new UnsupportedOperationException("This method should never be called directly"); + final var dependentSources = dependents.prepareEventSources(context); + List sources = new LinkedList<>(dependentSources); + + // add manually defined event sources + if (reconciler instanceof EventSourceInitializer) { + sources.addAll(((EventSourceInitializer) reconciler).prepareEventSources(context)); + } + return sources; } @Override @@ -165,6 +191,8 @@ public void start() throws OperatorException { final String controllerName = configuration.getName(); final var crdName = configuration.getResourceTypeName(); final var specVersion = "v1"; + log.info("Starting '{}' controller for reconciler: {}, resource: {}", controllerName, + reconciler.getClass().getCanonicalName(), resClass.getCanonicalName()); try { // check that the custom resource is known by the cluster if configured that way final CustomResourceDefinition crd; // todo: check proper CRD spec version based on config @@ -180,21 +208,23 @@ public void start() throws OperatorException { CustomResourceUtils.assertCustomResource(resClass, crd); } - eventSourceManager = new EventSourceManager<>(this); - if (reconciler instanceof EventSourceInitializer) { - ((EventSourceInitializer) reconciler) - .prepareEventSources(new EventSourceContext<>( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), - configurationService(), kubernetesClient)) - .forEach(eventSourceManager::registerEventSource); - } if (failOnMissingCurrentNS()) { throw new OperatorException( "Controller '" + controllerName + "' is configured to watch the current namespace but it couldn't be inferred from the current configuration."); } + + final var context = new EventSourceContext<>( + eventSourceManager.getControllerResourceEventSource().getResourceCache(), + configurationService(), kubernetesClient); + + dependents.injectInto(context); + prepareEventSources(context).forEach(eventSourceManager::registerEventSource); + eventSourceManager.start(); + + log.info("'{}' controller started, pending event sources initialization", controllerName); } catch (MissingCRDException e) { throwMissingCRDException(crdName, specVersion, controllerName); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DependentResourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DependentResourceManager.java new file mode 100644 index 0000000000..655113d251 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DependentResourceManager.java @@ -0,0 +1,140 @@ +package io.javaoperatorsdk.operator.processing.dependent; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContextInjector; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceController; +import io.javaoperatorsdk.operator.api.reconciler.dependent.KubernetesDependentResourceController; +import io.javaoperatorsdk.operator.processing.Controller; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class DependentResourceManager implements EventSourceInitializer, + EventSourceContextInjector, Reconciler { + + private static final Logger log = LoggerFactory.getLogger(DependentResourceManager.class); + + private final Reconciler reconciler; + private final ControllerConfiguration configuration; + private List dependents; + + + public DependentResourceManager(Controller controller) { + this.reconciler = controller.getReconciler(); + this.configuration = controller.getConfiguration(); + } + + @Override + public List prepareEventSources(EventSourceContext context) { + final List configured = configuration.getDependentResources(); + dependents = new ArrayList<>(configured.size()); + + List sources = new ArrayList<>(configured.size() + 5); + configured.forEach(dependent -> { + dependents.add(configuration.dependentFactory().from(dependent)); + sources.add(dependent.initEventSource(context)); + }); + + return sources; + } + + @Override + public void injectInto(EventSourceContext context) { + if (reconciler instanceof EventSourceContextInjector) { + EventSourceContextInjector injector = (EventSourceContextInjector) reconciler; + injector.injectInto(context); + } + } + + @Override + public UpdateControl reconcile(R resource, Context context) { + initContextIfNeeded(resource, context); + + dependents.stream() + .filter(dependent -> dependent.creatable() || dependent.updatable()) + .forEach(dependent -> { + var dependentResource = dependent.getFor(resource, context); + if (dependent.creatable() && dependentResource == null) { + // we need to create the dependent + dependentResource = dependent.buildFor(resource, context); + createOrReplaceDependent(resource, context, dependent, dependentResource, "Creating"); + } else if (dependent.updatable()) { + dependentResource = dependent.update(dependentResource, resource, context); + createOrReplaceDependent(resource, context, dependent, dependentResource, "Updating"); + } else { + logOperationInfo(resource, dependent, dependentResource, "Ignoring"); + } + }); + + return UpdateControl.noUpdate(); + } + + @Override + public DeleteControl cleanup(R resource, Context context) { + initContextIfNeeded(resource, context); + + dependents.stream() + .filter(DependentResourceController::deletable) + .forEach(dependent -> { + var dependentResource = dependent.getFor(resource, context); + if (dependentResource != null) { + dependent.delete(dependentResource, resource, context); + logOperationInfo(resource, dependent, dependentResource, "Deleting"); + } else { + log.info("Ignoring already deleted {} for '{}' {}", + dependent.getResourceType().getName(), + resource.getMetadata().getName(), + configuration.getResourceTypeName()); + } + }); + + return Reconciler.super.cleanup(resource, context); + } + + private void createOrReplaceDependent(R primaryResource, + Context context, DependentResourceController dependentController, + Object dependentResource, String operationDescription) { + // add owner reference if needed + if (dependentResource instanceof HasMetadata + && ((KubernetesDependentResourceController) dependentController).owned()) { + ((HasMetadata) dependentResource).addOwnerReference(primaryResource); + } + + logOperationInfo(primaryResource, dependentController, dependentResource, operationDescription); + + // commit the changes + // todo: add metrics timing for dependent resource + dependentController.createOrReplace(dependentResource, context); + } + + private void logOperationInfo(R resource, DependentResourceController dependent, + Object dependentResource, String operationDescription) { + if (log.isInfoEnabled()) { + log.info("{} {} for '{}' {}", operationDescription, + dependent.descriptionFor(dependentResource), + resource.getMetadata().getName(), + configuration.getResourceTypeName()); + } + } + + private void initContextIfNeeded(R resource, Context context) { + if (reconciler instanceof ContextInitializer) { + final var initializer = (ContextInitializer) reconciler; + initializer.initContext(resource, context); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 55f42cd438..6e0b218d55 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -48,6 +48,9 @@ public EventSourceManager(Controller controller) { // controller event source needs to be available before we create the event processor final var controllerEventSource = eventSources.initControllerEventSource(controller); this.eventProcessor = new EventProcessor<>(this); + + // sources need to be registered after the event processor is created since it's set on the + // event source registerEventSource(eventSources.retryEventSource()); registerEventSource(controllerEventSource); } @@ -66,12 +69,15 @@ public EventSourceManager(Controller controller) { public void start() { lock.lock(); try { - log.debug("Starting event sources."); for (var eventSource : eventSources) { try { + logEventSourceEvent(eventSource, "Starting"); eventSource.start(); + logEventSourceEvent(eventSource, "Started"); + } catch (MissingCRDException e) { + throw e; // leave untouched } catch (Exception e) { - log.warn("Error starting {} -> {}", eventSource, e); + throw new OperatorException("Couldn't start source " + eventSource.name(), e); } } eventProcessor.start(); @@ -80,16 +86,30 @@ public void start() { } } + @SuppressWarnings("rawtypes") + private void logEventSourceEvent(EventSource eventSource, String event) { + if (log.isDebugEnabled()) { + if (eventSource instanceof ResourceEventSource) { + ResourceEventSource source = (ResourceEventSource) eventSource; + log.debug("{} event source {} for {}", event, eventSource.name(), + source.getResourceClass()); + } else { + log.debug("{} event source {}", event, eventSource.name()); + } + } + } + @Override public void stop() { lock.lock(); try { - log.debug("Closing event sources."); for (var eventSource : eventSources) { try { + logEventSourceEvent(eventSource, "Stopping"); eventSource.stop(); + logEventSourceEvent(eventSource, "Stopped"); } catch (Exception e) { - log.warn("Error closing {} -> {}", eventSource, e); + log.warn("Error closing {} -> {}", eventSource.name(), e); } } eventSources.clear(); @@ -106,13 +126,10 @@ public final void registerEventSource(EventSource eventSource) try { eventSources.add(eventSource); eventSource.setEventHandler(eventProcessor); - } catch (Throwable e) { - if (e instanceof IllegalStateException || e instanceof MissingCRDException) { - // leave untouched - throw e; - } - throw new OperatorException( - "Couldn't register event source: " + eventSource.getClass().getName(), e); + } catch (IllegalStateException | MissingCRDException e) { + throw e; // leave untouched + } catch (Exception e) { + throw new OperatorException("Couldn't register event source: " + eventSource.name(), e); } finally { lock.unlock(); } @@ -219,7 +236,7 @@ public void add(EventSource eventSource) { sources.computeIfAbsent(keyFor(eventSource), k -> new ArrayList<>()).add(eventSource); } - private Class getDependentType(EventSource source) { + private Class getDependentType(EventSource source) { return source instanceof ResourceEventSource ? ((ResourceEventSource) source).getResourceClass() : source.getClass(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java index 5fa45e0a25..6767e974e2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java @@ -5,7 +5,7 @@ public abstract class AbstractEventSource implements EventSource { - private volatile EventHandler handler; + private EventHandler handler; private volatile boolean running = false; protected EventHandler getEventHandler() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index 486ffb81a6..aac04115a3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -7,7 +7,6 @@ import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -28,7 +27,7 @@ public abstract class CachingEventSource protected UpdatableCache cache; - public CachingEventSource(Class resourceClass) { + protected CachingEventSource(Class resourceClass) { super(resourceClass); cache = initCache(); } @@ -84,11 +83,6 @@ public Optional getCachedValue(ResourceID resourceID) { return cache.get(resourceID); } - @Override - public void stop() throws OperatorException { - super.stop(); - } - @Override public Optional getAssociated(P primary) { return cache.get(ResourceID.fromResource(primary)); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceContextAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceContextAware.java new file mode 100644 index 0000000000..95042d0c41 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSourceContextAware.java @@ -0,0 +1,8 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; + +public interface EventSourceContextAware

{ + void initWith(EventSourceContext

context); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 9feabf40bf..2a224a87df 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -1,34 +1,27 @@ package io.javaoperatorsdk.operator.processing.event.source.controller; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.MissingCRDException; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; +import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; public class ControllerResourceEventSource - extends AbstractResourceEventSource + extends ManagedInformerEventSource> implements ResourceEventHandler { public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace"; @@ -36,20 +29,12 @@ public class ControllerResourceEventSource private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class); private final Controller controller; - private final Map> sharedIndexInformers = - new ConcurrentHashMap<>(); - private final ResourceEventFilter filter; private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; - private final ControllerResourceCache cache; public ControllerResourceEventSource(Controller controller) { - super(controller.getConfiguration().getResourceClass()); + super(controller.getCRClient(), controller.getConfiguration()); this.controller = controller; - final var configurationService = controller.getConfiguration().getConfigurationService(); - var cloner = configurationService != null ? configurationService.getResourceCloner() - : ConfigurationService.DEFAULT_CLONER; - this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner); var filters = new ResourceEventFilter[] { ResourceEventFilters.finalizerNeededAndApplied(), @@ -73,70 +58,27 @@ public ControllerResourceEventSource(Controller controller) { @Override public void start() { - final var configuration = controller.getConfiguration(); - final var targetNamespaces = configuration.getEffectiveNamespaces(); - final var client = controller.getCRClient(); - final var labelSelector = configuration.getLabelSelector(); - try { - if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) { - final var informer = - createAndRunInformerFor(client.inAnyNamespace() - .withLabelSelector(labelSelector), ANY_NAMESPACE_MAP_KEY); - log.debug("Registered {} -> {} for any namespace", controller, informer); - } else { - targetNamespaces.forEach(ns -> { - final var informer = createAndRunInformerFor( - client.inNamespace(ns).withLabelSelector(labelSelector), ns); - log.debug("Registered {} -> {} for namespace: {}", controller, informer, ns); - }); - } + super.start(); } catch (Exception e) { if (e instanceof KubernetesClientException) { handleKubernetesClientException(e); } throw e; } - super.start(); - } - - private SharedIndexInformer createAndRunInformerFor( - FilterWatchListDeletable> filteredBySelectorClient, String key) { - var informer = filteredBySelectorClient.runnableInformer(0); - informer.addEventHandler(this); - sharedIndexInformers.put(key, informer); - informer.run(); - return informer; } - @Override - public void stop() { - for (SharedIndexInformer informer : sharedIndexInformers.values()) { - try { - log.info("Stopping informer {} -> {}", controller, informer); - informer.stop(); - } catch (Exception e) { - log.warn("Error stopping informer {} -> {}", controller, informer, e); - } - } - super.stop(); - } - - public void eventReceived(ResourceAction action, T customResource, T oldResource) { + public void eventReceived(ResourceAction action, T resource, T oldResource) { try { - log.debug( - "Event received for resource: {}", getName(customResource)); - MDCUtils.addResourceInfo(customResource); - controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource, - oldResource); - if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) { + log.debug("Event received for resource: {}", getName(resource)); + MDCUtils.addResourceInfo(resource); + controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource); + if (filter.acceptChange(controller.getConfiguration(), oldResource, resource)) { getEventHandler().handleEvent( - new ResourceEvent(action, ResourceID.fromResource(customResource))); + new ResourceEvent(action, ResourceID.fromResource(resource))); } else { - log.debug( - "Skipping event handling resource {} with version: {}", - getUID(customResource), - getVersion(customResource)); + log.debug("Skipping event handling resource {} with version: {}", getUID(resource), + getVersion(resource)); } } finally { MDCUtils.removeResourceInfo(); @@ -158,24 +100,8 @@ public void onDelete(T resource, boolean b) { eventReceived(ResourceAction.DELETED, resource, null); } - public Optional get(ResourceID resourceID) { - return cache.get(resourceID); - } - - public ControllerResourceCache getResourceCache() { - return cache; - } - - /** - * @return shared informers by namespace. If custom resource is not namespace scoped use - * CustomResourceEventSource.ANY_NAMESPACE_MAP_KEY - */ - public Map> getInformers() { - return Collections.unmodifiableMap(sharedIndexInformers); - } - - public SharedIndexInformer getInformer(String namespace) { - return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY)); + public ResourceCache getResourceCache() { + return manager(); } /** @@ -204,6 +130,6 @@ private void handleKubernetesClientException(Exception e) { @Override public Optional getAssociated(T primary) { - return cache.get(ResourceID.fromResource(primary)); + return manager().get(ResourceID.fromResource(primary)); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerConfiguration.java new file mode 100644 index 0000000000..10bfdcdcb8 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerConfiguration.java @@ -0,0 +1,132 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.DefaultResourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; + +public class InformerConfiguration + extends DefaultResourceConfiguration { + + private final PrimaryResourcesRetriever secondaryToPrimaryResourcesIdSet; + private final AssociatedSecondaryResourceIdentifier

associatedWith; + private final boolean skipUpdateEventPropagationIfNoChange; + + private InformerConfiguration(ConfigurationService service, String labelSelector, + Class resourceClass, + PrimaryResourcesRetriever secondaryToPrimaryResourcesIdSet, + AssociatedSecondaryResourceIdentifier

associatedWith, + boolean skipUpdateEventPropagationIfNoChange, Set namespaces) { + super(labelSelector, resourceClass, namespaces); + setConfigurationService(service); + this.secondaryToPrimaryResourcesIdSet = + Objects.requireNonNullElse(secondaryToPrimaryResourcesIdSet, Mappers.fromOwnerReference()); + this.associatedWith = + Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource); + this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange; + } + + public PrimaryResourcesRetriever getPrimaryResourcesRetriever() { + return secondaryToPrimaryResourcesIdSet; + } + + public AssociatedSecondaryResourceIdentifier

getAssociatedResourceIdentifier() { + return associatedWith; + } + + public boolean isSkipUpdateEventPropagationIfNoChange() { + return skipUpdateEventPropagationIfNoChange; + } + + public static class InformerConfigurationBuilder { + + private PrimaryResourcesRetriever secondaryToPrimaryResourcesIdSet; + private AssociatedSecondaryResourceIdentifier

associatedWith; + private boolean skipUpdateEventPropagationIfNoChange = true; + private Set namespaces; + private String labelSelector; + private final Class resourceClass; + private final ConfigurationService configurationService; + + private InformerConfigurationBuilder(Class resourceClass, + ConfigurationService configurationService) { + this.resourceClass = resourceClass; + this.configurationService = configurationService; + } + + public InformerConfigurationBuilder withPrimaryResourcesRetriever( + PrimaryResourcesRetriever primaryResourcesRetriever) { + this.secondaryToPrimaryResourcesIdSet = primaryResourcesRetriever; + return this; + } + + public InformerConfigurationBuilder withAssociatedSecondaryResourceIdentifier( + AssociatedSecondaryResourceIdentifier

associatedWith) { + this.associatedWith = associatedWith; + return this; + } + + public InformerConfigurationBuilder withoutSkippingEventPropagationIfUnchanged() { + this.skipUpdateEventPropagationIfNoChange = false; + return this; + } + + public InformerConfigurationBuilder skippingEventPropagationIfUnchanged( + boolean skipIfUnchanged) { + this.skipUpdateEventPropagationIfNoChange = skipIfUnchanged; + return this; + } + + public InformerConfigurationBuilder withNamespaces(String... namespaces) { + this.namespaces = namespaces != null ? Set.of(namespaces) : Collections.emptySet(); + return this; + } + + public InformerConfigurationBuilder withNamespaces(Set namespaces) { + this.namespaces = namespaces != null ? namespaces : Collections.emptySet(); + return this; + } + + + public InformerConfigurationBuilder withLabelSelector(String labelSelector) { + this.labelSelector = labelSelector; + return this; + } + + public InformerConfiguration build() { + return new InformerConfiguration<>(configurationService, labelSelector, resourceClass, + secondaryToPrimaryResourcesIdSet, associatedWith, skipUpdateEventPropagationIfNoChange, + namespaces); + } + } + + public static InformerConfigurationBuilder from( + EventSourceContext

context, Class resourceClass) { + return new InformerConfigurationBuilder<>(resourceClass, context.getConfigurationService()); + } + + public static InformerConfigurationBuilder from(ConfigurationService configurationService, + Class resourceClass) { + return new InformerConfigurationBuilder<>(resourceClass, configurationService); + } + + public static InformerConfigurationBuilder from( + InformerConfiguration configuration) { + return new InformerConfigurationBuilder(configuration.getResourceClass(), + configuration.getConfigurationService()) + .withNamespaces(configuration.getNamespaces()) + .withLabelSelector(configuration.getLabelSelector()) + .skippingEventPropagationIfUnchanged( + configuration.isSkipUpdateEventPropagationIfNoChange()) + .withAssociatedSecondaryResourceIdentifier( + configuration.getAssociatedResourceIdentifier()) + .withPrimaryResourcesRetriever(configuration.getPrimaryResourcesRetriever()); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index eada096713..fa9e516585 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,112 +1,70 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedInformer; -import io.fabric8.kubernetes.client.informers.cache.Store; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource; -import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier; -import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceContextAware; import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; public class InformerEventSource - extends AbstractResourceEventSource - implements ResourceCache { - - private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); + extends ManagedInformerEventSource> + implements ResourceCache, ResourceEventHandler { - private final SharedInformer sharedInformer; - private final PrimaryResourcesRetriever secondaryToPrimaryResourcesIdSet; - private final AssociatedSecondaryResourceIdentifier

associatedWith; - private final boolean skipUpdateEventPropagationIfNoChange; + private final InformerConfiguration configuration; - public InformerEventSource(SharedInformer sharedInformer, - PrimaryResourcesRetriever resourceToTargetResourceIDSet) { - this(sharedInformer, resourceToTargetResourceIDSet, null, true); - } + public InformerEventSource(InformerConfiguration configuration, + EventSourceContext

context) { + super(context.getClient().resources(configuration.getResourceClass()), configuration); + this.configuration = configuration; - public InformerEventSource(KubernetesClient client, Class type, - PrimaryResourcesRetriever resourceToTargetResourceIDSet) { - this(client, type, resourceToTargetResourceIDSet, false); - } + // init mappers with context if needed + final var primaryResourcesRetriever = configuration.getPrimaryResourcesRetriever(); + if (primaryResourcesRetriever instanceof EventSourceContextAware) { + ((EventSourceContextAware) primaryResourcesRetriever).initWith(context); + } - public InformerEventSource(KubernetesClient client, Class type, - PrimaryResourcesRetriever resourceToTargetResourceIDSet, - AssociatedSecondaryResourceIdentifier

associatedWith, - boolean skipUpdateEventPropagationIfNoChange) { - this(client.informers().sharedIndexInformerFor(type, 0), resourceToTargetResourceIDSet, - associatedWith, - skipUpdateEventPropagationIfNoChange); + final var associatedResourceIdentifier = configuration.getAssociatedResourceIdentifier(); + if (associatedResourceIdentifier instanceof EventSourceContextAware) { + ((EventSourceContextAware) associatedResourceIdentifier).initWith(context); + } } - InformerEventSource(KubernetesClient client, Class type, - PrimaryResourcesRetriever resourceToTargetResourceIDSet, - boolean skipUpdateEventPropagationIfNoChange) { - this(client.informers().sharedIndexInformerFor(type, 0), resourceToTargetResourceIDSet, null, - skipUpdateEventPropagationIfNoChange); + @Override + public void onAdd(T t) { + propagateEvent(t); } - public InformerEventSource(SharedInformer sharedInformer, - PrimaryResourcesRetriever resourceToTargetResourceIDSet, - AssociatedSecondaryResourceIdentifier

associatedWith, - boolean skipUpdateEventPropagationIfNoChange) { - super(sharedInformer.getApiTypeClass()); - this.sharedInformer = sharedInformer; - this.secondaryToPrimaryResourcesIdSet = resourceToTargetResourceIDSet; - this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange; - if (sharedInformer.isRunning()) { - log.warn( - "Informer is already running on event source creation, this is not desirable and may " + - "lead to non deterministic behavior."); + @Override + public void onUpdate(T oldObject, T newObject) { + if (newObject == null) { + // this is a fix for this potential issue with informer: + // https://github.com/java-operator-sdk/java-operator-sdk/issues/830 + propagateEvent(oldObject); + return; } - this.associatedWith = - Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource); - - sharedInformer.addEventHandler(new ResourceEventHandler<>() { - @Override - public void onAdd(T t) { - propagateEvent(t); - } - - @Override - public void onUpdate(T oldObject, T newObject) { - if (newObject == null) { - // this is a fix for this potential issue with informer: - // https://github.com/java-operator-sdk/java-operator-sdk/issues/830 - propagateEvent(oldObject); - return; - } - - if (InformerEventSource.this.skipUpdateEventPropagationIfNoChange && - oldObject.getMetadata().getResourceVersion() - .equals(newObject.getMetadata().getResourceVersion())) { - return; - } - propagateEvent(newObject); - } + if (configuration.isSkipUpdateEventPropagationIfNoChange() && + oldObject.getMetadata().getResourceVersion() + .equals(newObject.getMetadata().getResourceVersion())) { + return; + } + propagateEvent(newObject); + } - @Override - public void onDelete(T t, boolean b) { - propagateEvent(t); - } - }); + @Override + public void onDelete(T t, boolean b) { + propagateEvent(t); } private void propagateEvent(T object) { - var primaryResourceIdSet = secondaryToPrimaryResourcesIdSet.associatedPrimaryResources(object); + var primaryResourceIdSet = + configuration.getPrimaryResourcesRetriever().associatedPrimaryResources(object); if (primaryResourceIdSet.isEmpty()) { return; } @@ -126,16 +84,12 @@ private void propagateEvent(T object) { @Override public void start() { - sharedInformer.run(); + manager().start(); } @Override public void stop() { - sharedInformer.close(); - } - - private Store getStore() { - return sharedInformer.getStore(); + manager().stop(); } /** @@ -145,37 +99,14 @@ private Store getStore() { * @param resource the primary resource we want to retrieve the associated resource for * @return the informed resource associated with the specified primary resource */ + @Override public Optional getAssociated(P resource) { - final var id = associatedWith.associatedSecondaryID(resource); + final var id = configuration.getAssociatedResourceIdentifier().associatedSecondaryID(resource); return get(id); } - - public SharedInformer getSharedInformer() { - return sharedInformer; - } - - @Override - public Optional get(ResourceID resourceID) { - return Optional.ofNullable(sharedInformer.getStore() - .getByKey(io.fabric8.kubernetes.client.informers.cache.Cache.namespaceKeyFunc( - resourceID.getNamespace().orElse(null), - resourceID.getName()))); - } - - @Override - public Stream list(Predicate predicate) { - return getStore().list().stream().filter(predicate); - } - @Override public Stream list(String namespace, Predicate predicate) { - return getStore().list().stream() - .filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v)); - } - - @Override - public Stream keys() { - return getStore().listKeys().stream().map(Mappers::fromString); + return manager().list(namespace, predicate); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java new file mode 100644 index 0000000000..e65f34df48 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -0,0 +1,144 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.config.Cloner; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.processing.LifecycleAware; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.Cache; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; +import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; + +public class InformerManager> + implements LifecycleAware, ResourceCache, UpdatableCache { + + private static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace"; + private static final Logger log = LoggerFactory.getLogger(InformerManager.class); + + private final Map> sources = new ConcurrentHashMap<>(); + private Cloner cloner; + + @Override + public void start() throws OperatorException { + sources.values().parallelStream().forEach(LifecycleAware::start); + } + + void initSources(MixedOperation, Resource> client, + C configuration, ResourceEventHandler eventHandler) { + final var service = configuration.getConfigurationService(); + cloner = service == null ? ConfigurationService.DEFAULT_CLONER : service.getResourceCloner(); + + final var targetNamespaces = configuration.getEffectiveNamespaces(); + final var labelSelector = configuration.getLabelSelector(); + + if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) { + final var filteredBySelectorClient = + client.inAnyNamespace().withLabelSelector(labelSelector); + final var source = + createEventSource(filteredBySelectorClient, eventHandler, ANY_NAMESPACE_MAP_KEY); + log.debug("Registered {} -> {} for any namespace", this, source); + } else { + targetNamespaces.forEach( + ns -> { + final var source = + createEventSource(client.inNamespace(ns).withLabelSelector(labelSelector), + eventHandler, ns); + log.debug("Registered {} -> {} for namespace: {}", this, source, + ns); + }); + } + } + + + private InformerWrapper createEventSource( + FilterWatchListDeletable> filteredBySelectorClient, + ResourceEventHandler eventHandler, String key) { + var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0)); + source.addEventHandler(eventHandler); + sources.put(key, source); + return source; + } + + @Override + public void stop() { + for (InformerWrapper source : sources.values()) { + try { + log.info("Stopping informer {} -> {}", this, source); + source.stop(); + } catch (Exception e) { + log.warn("Error stopping informer {} -> {}", this, source, e); + } + } + } + + @Override + public Stream list(Predicate predicate) { + if (predicate == null) { + return sources.values().stream().flatMap(ResourceCache::list); + } + return sources.values().stream().flatMap(i -> i.list(predicate)); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + if (isWatchingAllNamespaces()) { + return getSource(ANY_NAMESPACE_MAP_KEY) + .map(source -> source.list(namespace, predicate)) + .orElse(Stream.empty()); + } else { + return getSource(namespace) + .map(source -> source.list(predicate)) + .orElse(Stream.empty()); + } + } + + @Override + public Optional get(ResourceID resourceID) { + return getSource(resourceID.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY)) + .flatMap(source -> source.get(resourceID)) + .map(cloner::clone); + } + + @Override + public Stream keys() { + return sources.values().stream().flatMap(Cache::keys); + } + + private boolean isWatchingAllNamespaces() { + return sources.containsKey(ANY_NAMESPACE_MAP_KEY); + } + + private Optional> getSource(String namespace) { + namespace = isWatchingAllNamespaces() || namespace == null ? ANY_NAMESPACE_MAP_KEY : namespace; + return Optional.ofNullable(sources.get(namespace)); + } + + @Override + public T remove(ResourceID key) { + return getSource(key.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY)) + .map(c -> c.remove(key)) + .orElse(null); + } + + @Override + public void put(ResourceID key, T resource) { + getSource(key.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY)) + .ifPresent(c -> c.put(key, resource)); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java new file mode 100644 index 0000000000..5199b4fb0d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerResourceCache.java @@ -0,0 +1,64 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.SharedInformer; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; +import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; + +class InformerResourceCache implements ResourceCache, UpdatableCache { + + private final Cache cache; + + public InformerResourceCache(SharedInformer informer) { + this.cache = (Cache) informer.getStore(); + } + + @Override + public Optional get(ResourceID resourceID) { + return Optional.ofNullable(cache.getByKey(getKey(resourceID))); + } + + private String getKey(ResourceID resourceID) { + return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName()); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list().stream().filter(predicate); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + final var stream = cache.list().stream() + .filter(r -> namespace.equals(r.getMetadata().getNamespace())); + return predicate != null ? stream.filter(predicate) : stream; + } + + @Override + public Stream keys() { + return cache.listKeys().stream().map(Mappers::fromString); + } + + @Override + public T remove(ResourceID key) { + return cache.remove(cache.getByKey(getKey(key))); + } + + @Override + public void put(ResourceID key, T resource) { + // check that key matches the resource + final var fromResource = ResourceID.fromResource(resource); + if (!Objects.equals(key, fromResource)) { + throw new IllegalArgumentException( + "Key and resource don't match. Key: " + key + ", resource: " + fromResource); + } + cache.put(resource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java new file mode 100644 index 0000000000..c7de41f331 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -0,0 +1,84 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.processing.LifecycleAware; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; +import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; + +class InformerWrapper + implements LifecycleAware, ResourceCache, UpdatableCache { + private final SharedIndexInformer informer; + private final InformerResourceCache cache; + + public InformerWrapper(SharedIndexInformer informer) { + this.informer = informer; + this.cache = new InformerResourceCache<>(informer); + } + + @Override + public void start() throws OperatorException { + informer.run(); + } + + @Override + public void stop() throws OperatorException { + informer.stop(); + } + + @Override + public Optional get(ResourceID resourceID) { + return cache.get(resourceID); + } + + @Override + public boolean contains(ResourceID resourceID) { + return cache.contains(resourceID); + } + + @Override + public Stream keys() { + return cache.keys(); + } + + @Override + public Stream list() { + return cache.list(); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); + } + + @Override + public Stream list(String namespace) { + return cache.list(namespace); + } + + @Override + public Stream list(String namespace, Predicate predicate) { + return cache.list(namespace, predicate); + } + + public void addEventHandler(ResourceEventHandler eventHandler) { + informer.addEventHandler(eventHandler); + } + + @Override + public T remove(ResourceID key) { + return cache.remove(key); + } + + @Override + public void put(ResourceID key, T resource) { + cache.put(key, resource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java new file mode 100644 index 0000000000..f0a1a1185d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; + +public abstract class ManagedInformerEventSource> + extends CachingEventSource + implements ResourceEventHandler { + + protected ManagedInformerEventSource( + MixedOperation, Resource> client, C configuration) { + super(configuration.getResourceClass()); + manager().initSources(client, configuration, this); + } + + @Override + protected UpdatableCache initCache() { + return new InformerManager<>(); + } + + protected InformerManager manager() { + return (InformerManager) cache; + } + + @Override + public void start() { + manager().start(); + super.start(); + } + + + @Override + public void stop() { + super.stop(); + manager().stop(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java index 053528e0ff..21cd3b162a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/Mappers.java @@ -9,6 +9,8 @@ public class Mappers { + private Mappers() {} + public static PrimaryResourcesRetriever fromAnnotation( String nameKey) { return fromMetadata(nameKey, null, false); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index a1cfa3e82d..73d3382cc4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -44,8 +44,10 @@ private void checkException( TestControllerConfiguration duplicated) { final var exception = assertThrows(OperatorException.class, () -> { final var controllerManager = new ControllerManager(); - controllerManager.add(new Controller<>(registered.controller, registered, null)); - controllerManager.add(new Controller<>(duplicated.controller, duplicated, null)); + controllerManager.add(new Controller<>(registered.controller, registered, + MockKubernetesClient.client(registered.getResourceClass()))); + controllerManager.add(new Controller<>(duplicated.controller, duplicated, + MockKubernetesClient.client(duplicated.getResourceClass()))); }); final var msg = exception.getMessage(); assertTrue( @@ -60,7 +62,8 @@ private static class TestControllerConfiguration public TestControllerConfiguration(Reconciler controller, Class crClass) { super(null, getControllerName(controller), - CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, null); + CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, null, + null); this.controller = controller; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java new file mode 100644 index 0000000000..0c2db0f6b1 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.ApiextensionsAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MockKubernetesClient { + + public static KubernetesClient client(Class clazz) { + final var client = mock(KubernetesClient.class); + MixedOperation, Resource> resources = + mock(MixedOperation.class); + NonNamespaceOperation, Resource> nonNamespaceOperation = + mock(NonNamespaceOperation.class); + FilterWatchListMultiDeletable> inAnyNamespace = mock( + FilterWatchListMultiDeletable.class); + FilterWatchListDeletable> filterable = + mock(FilterWatchListDeletable.class); + when(resources.inNamespace(anyString())).thenReturn(nonNamespaceOperation); + when(nonNamespaceOperation.withLabelSelector(nullable(String.class))).thenReturn(filterable); + when(resources.inAnyNamespace()).thenReturn(inAnyNamespace); + when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable); + SharedIndexInformer informer = mock(SharedIndexInformer.class); + when(filterable.runnableInformer(anyLong())).thenReturn(informer); + when(client.resources(clazz)).thenReturn(resources); + + final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class); + when(client.apiextensions()).thenReturn(apiGroupDSL); + final var v1 = mock(V1ApiextensionAPIGroupDSL.class); + when(apiGroupDSL.v1()).thenReturn(v1); + final var operation = mock(NonNamespaceOperation.class); + when(v1.customResourceDefinitions()).thenReturn(operation); + when(operation.withName(any())).thenReturn(mock(Resource.class)); + + return client; + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/OperatorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/OperatorTest.java index c234a0b9b2..f532756f58 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/OperatorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/OperatorTest.java @@ -19,7 +19,8 @@ class OperatorTest { - private final KubernetesClient kubernetesClient = mock(KubernetesClient.class); + private final KubernetesClient kubernetesClient = + MockKubernetesClient.client(FooCustomResource.class); private final ConfigurationService configurationService = mock(ConfigurationService.class); private final ControllerConfiguration configuration = mock(ControllerConfiguration.class); private final Operator operator = new Operator(kubernetesClient, configurationService); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationTest.java index 3bc72d81f2..65c73b6b31 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationTest.java @@ -20,6 +20,9 @@ public String getAssociatedReconcilerClassName() { public ConfigurationService getConfigurationService() { return null; } + + @Override + public void setConfigurationService(ConfigurationService service) {} }; assertEquals(TestCustomResource.class, conf.getResourceClass()); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java index d61b50d583..2954af5aa4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java @@ -3,19 +3,14 @@ import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL; -import io.fabric8.kubernetes.client.dsl.ApiextensionsAPIGroupDSL; -import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; -import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.MissingCRDException; +import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -26,7 +21,7 @@ class ControllerTest { @Test void crdShouldNotBeCheckedForNativeResources() { - final var client = mock(KubernetesClient.class); + final var client = MockKubernetesClient.client(Secret.class); final var configurationService = mock(ConfigurationService.class); final var reconciler = mock(Reconciler.class); final var configuration = mock(ControllerConfiguration.class); @@ -40,7 +35,7 @@ void crdShouldNotBeCheckedForNativeResources() { @Test void crdShouldNotBeCheckedForCustomResourcesIfDisabled() { - final var client = mock(KubernetesClient.class); + final var client = MockKubernetesClient.client(TestCustomResource.class); final var configurationService = mock(ConfigurationService.class); when(configurationService.checkCRDAndValidateLocalModel()).thenReturn(false); final var reconciler = mock(Reconciler.class); @@ -55,20 +50,13 @@ void crdShouldNotBeCheckedForCustomResourcesIfDisabled() { @Test void crdShouldBeCheckedForCustomResourcesByDefault() { - final var client = mock(KubernetesClient.class); + final var client = MockKubernetesClient.client(TestCustomResource.class); final var configurationService = mock(ConfigurationService.class); when(configurationService.checkCRDAndValidateLocalModel()).thenCallRealMethod(); final var reconciler = mock(Reconciler.class); final var configuration = mock(ControllerConfiguration.class); when(configuration.getResourceClass()).thenReturn(TestCustomResource.class); when(configuration.getConfigurationService()).thenReturn(configurationService); - final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class); - when(client.apiextensions()).thenReturn(apiGroupDSL); - final var v1 = mock(V1ApiextensionAPIGroupDSL.class); - when(apiGroupDSL.v1()).thenReturn(v1); - final var operation = mock(NonNamespaceOperation.class); - when(v1.customResourceDefinitions()).thenReturn(operation); - when(operation.withName(any())).thenReturn(mock(Resource.class)); final var controller = new Controller(reconciler, configuration, client); // since we're not really connected to a cluster and the CRD wouldn't be deployed anyway, we diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index a330b4130c..64e742d381 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -8,8 +8,10 @@ import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -169,10 +171,10 @@ void timerAndControllerEventSourcesShouldBeListedFirst() { } private EventSourceManager initManager() { - final Controller controller = mock(Controller.class); final ControllerConfiguration configuration = mock(ControllerConfiguration.class); when(configuration.getResourceClass()).thenReturn(HasMetadata.class); - when(controller.getConfiguration()).thenReturn(configuration); + final Controller controller = new Controller(mock(Reconciler.class), configuration, + MockKubernetesClient.client(HasMetadata.class)); return new EventSourceManager(controller); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 9da51337d3..9cd7d9481b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -12,6 +12,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.Cloner; import io.javaoperatorsdk.operator.api.config.ConfigurationService; @@ -90,7 +91,9 @@ public T clone(T object) { }); when(reconciler.cleanup(eq(customResource), any())) .thenReturn(DeleteControl.defaultDelete()); - Controller controller = new Controller<>(reconciler, configuration, null); + Controller controller = new Controller<>(reconciler, configuration, + MockKubernetesClient.client(customResource.getClass())); + controller.start(); return new ReconciliationDispatcher<>(controller, customResourceFacade); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index ce865b7888..446e654501 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -158,6 +158,9 @@ public Set getNamespaces() { public ConfigurationService getConfigurationService() { return service; } + + @Override + public void setConfigurationService(ConfigurationService service) {} } @ControllerConfiguration(namespaces = NAMESPACE) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java index 17168876a5..6c5e306263 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java @@ -7,10 +7,8 @@ import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; @@ -171,6 +169,7 @@ public ControllerConfig(String finalizer, boolean generationAware, null, eventFilter, customResourceClass, + null, null); } } @@ -178,18 +177,13 @@ public ControllerConfig(String finalizer, boolean generationAware, private static class TestController extends Controller { public TestController(ControllerConfiguration configuration) { - super(null, configuration, null); + super(null, configuration, MockKubernetesClient.client(TestCustomResource.class)); } @Override public EventSourceManager getEventSourceManager() { return mock(EventSourceManager.class); } - - @Override - public MixedOperation, Resource> getCRClient() { - return mock(MixedOperation.class); - } } private static class ObservedGenController @@ -197,17 +191,12 @@ private static class ObservedGenController public ObservedGenController( ControllerConfiguration configuration) { - super(null, configuration, null); + super(null, configuration, MockKubernetesClient.client(ObservedGenCustomResource.class)); } @Override public EventSourceManager getEventSourceManager() { return mock(EventSourceManager.class); } - - @Override - public MixedOperation, Resource> getCRClient() { - return mock(MixedOperation.class); - } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 5cdc85c553..f67ea44a5d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -6,9 +6,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.processing.Controller; @@ -28,8 +26,6 @@ class ControllerResourceEventSourceTest extends AbstractEventSourceTestBase, EventHandler> { public static final String FINALIZER = "finalizer"; - private static final MixedOperation, Resource> client = - mock(MixedOperation.class); private TestController testController = new TestController(true); @@ -141,18 +137,14 @@ private static class TestController extends Controller { mock(EventSourceManager.class); public TestController(boolean generationAware) { - super(null, new TestConfiguration(generationAware), null); + super(null, new TestConfiguration(generationAware), + MockKubernetesClient.client(TestCustomResource.class)); } @Override public EventSourceManager getEventSourceManager() { return eventSourceManager; } - - @Override - public MixedOperation, Resource> getCRClient() { - return client; - } } private static class TestConfiguration extends @@ -170,6 +162,7 @@ public TestConfiguration(boolean generationAware) { null, null, TestCustomResource.class, + null, null); } } diff --git a/operator-framework-junit5/pom.xml b/operator-framework-junit5/pom.xml index 69a163b7d7..6f66b97810 100644 --- a/operator-framework-junit5/pom.xml +++ b/operator-framework-junit5/pom.xml @@ -5,7 +5,7 @@ java-operator-sdk io.javaoperatorsdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT 4.0.0 diff --git a/operator-framework/pom.xml b/operator-framework/pom.xml index fd124ea222..8e7ff3fe8f 100644 --- a/operator-framework/pom.xml +++ b/operator-framework/pom.xml @@ -5,7 +5,7 @@ java-operator-sdk io.javaoperatorsdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT 4.0.0 diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/runtime/AnnotationConfiguration.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/runtime/AnnotationConfiguration.java index 1000cb61c4..89a100741f 100644 --- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/runtime/AnnotationConfiguration.java +++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/runtime/AnnotationConfiguration.java @@ -1,22 +1,33 @@ package io.javaoperatorsdk.operator.config.runtime; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.function.Function; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.Dependent; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.config.KubernetesDependent; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.dependent.KubernetesDependentResourceController; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerConfiguration; +@SuppressWarnings("rawtypes") public class AnnotationConfiguration implements io.javaoperatorsdk.operator.api.config.ControllerConfiguration { private final Reconciler reconciler; private final ControllerConfiguration annotation; private ConfigurationService service; + private List dependents; public AnnotationConfiguration(Reconciler reconciler) { this.reconciler = reconciler; @@ -108,14 +119,54 @@ public ResourceEventFilter getEventFilter() { : ResourceEventFilters.passthrough(); } - public static T valueOrDefault(ControllerConfiguration controllerConfiguration, - Function mapper, - T defaultValue) { - if (controllerConfiguration == null) { - return defaultValue; - } else { - return mapper.apply(controllerConfiguration); + @Override + public List getDependentResources() { + if (dependents == null) { + final var dependentConfigs = valueOrDefault(annotation, + ControllerConfiguration::dependents, new Dependent[] {}); + if (dependentConfigs.length > 0) { + dependents = new ArrayList<>(dependentConfigs.length); + for (Dependent dependentConfig : dependentConfigs) { + final Class dependentType = dependentConfig.type(); + DependentResource dependent; + try { + dependent = dependentType.getConstructor().newInstance(); + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException + | IllegalAccessException e) { + throw new IllegalArgumentException(e); + } + + final var resourceType = dependentConfig.resourceType(); + if (HasMetadata.class.isAssignableFrom(resourceType)) { + final var kubeDependent = dependentType.getAnnotation(KubernetesDependent.class); + final var namespaces = + valueOrDefault(kubeDependent, KubernetesDependent::namespaces, new String[] {}); + final var labelSelector = + valueOrDefault(kubeDependent, KubernetesDependent::labelSelector, null); + final var owned = valueOrDefault(kubeDependent, KubernetesDependent::owned, + KubernetesDependent.OWNED_DEFAULT); + final var skipIfUnchanged = + valueOrDefault(kubeDependent, KubernetesDependent::skipUpdateIfUnchanged, + KubernetesDependent.SKIP_UPDATE_DEFAULT); + final var configuration = InformerConfiguration.from(service, resourceType) + .withLabelSelector(labelSelector) + .skippingEventPropagationIfUnchanged(skipIfUnchanged) + .withNamespaces(namespaces) + .build(); + dependent = new KubernetesDependentResourceController(dependent, configuration, owned); + } + + dependents.add(dependent); + } + } else { + dependents = Collections.emptyList(); + } } + return dependents; + } + + private static T valueOrDefault(C annotation, Function mapper, T defaultValue) { + return annotation == null ? defaultValue : mapper.apply(annotation); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomReconciler.java index b7ceb4c11c..2a04b85d19 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informereventsource/InformerEventSourceTestCustomReconciler.java @@ -1,24 +1,23 @@ package io.javaoperatorsdk.operator.sample.informereventsource; -import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.Dependent; +import io.javaoperatorsdk.operator.api.config.DependentResource; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.junit.KubernetesClientAware; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomReconciler.ConfigMapDR; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_FINALIZER; @@ -26,10 +25,10 @@ * Copies the config map value from spec into status. The main purpose is to test and demonstrate * sample usage of InformerEventSource */ -@ControllerConfiguration(finalizerName = NO_FINALIZER) -public class InformerEventSourceTestCustomReconciler implements - Reconciler, KubernetesClientAware, - EventSourceInitializer { +@ControllerConfiguration(finalizerName = NO_FINALIZER, + dependents = @Dependent(resourceType = ConfigMap.class, type = ConfigMapDR.class)) +public class InformerEventSourceTestCustomReconciler + implements Reconciler { private static final Logger LOGGER = LoggerFactory.getLogger(InformerEventSourceTestCustomReconciler.class); @@ -38,14 +37,18 @@ public class InformerEventSourceTestCustomReconciler implements public static final String TARGET_CONFIG_MAP_KEY = "targetStatus"; public static final String MISSING_CONFIG_MAP = "Missing Config Map"; - private KubernetesClient kubernetesClient; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public List prepareEventSources( - EventSourceContext context) { - return List.of(new InformerEventSource<>(kubernetesClient, ConfigMap.class, - Mappers.fromAnnotation(RELATED_RESOURCE_NAME))); + public static class ConfigMapDR + implements DependentResource, + PrimaryResourcesRetriever { + private final PrimaryResourcesRetriever retriever = Mappers.fromAnnotation( + RELATED_RESOURCE_NAME); + + @Override + public Set associatedPrimaryResources(ConfigMap dependentResource) { + return retriever.associatedPrimaryResources(dependentResource); + } } @Override @@ -69,16 +72,6 @@ public UpdateControl reconcile( return UpdateControl.updateStatus(resource); } - @Override - public KubernetesClient getKubernetesClient() { - return kubernetesClient; - } - - @Override - public void setKubernetesClient(KubernetesClient kubernetesClient) { - this.kubernetesClient = kubernetesClient; - } - public int getNumberOfExecutions() { return numberOfExecutions.get(); } diff --git a/pom.xml b/pom.xml index 22bc63c36b..3fc23d67a9 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ io.javaoperatorsdk java-operator-sdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT Operator SDK for Java Java SDK for implementing Kubernetes operators pom diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml index 58d1f3155a..64be615e03 100644 --- a/sample-operators/mysql-schema/pom.xml +++ b/sample-operators/mysql-schema/pom.xml @@ -7,7 +7,7 @@ io.javaoperatorsdk sample-operators - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT sample-mysql-schema-operator diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperator.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperator.java index ebccbd4fe5..d9012637cf 100644 --- a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperator.java +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperator.java @@ -32,11 +32,10 @@ public static void main(String[] args) throws IOException { new ConfigurationServiceOverrider(DefaultConfigurationService.instance()) .withMetrics(new MicrometerMetrics(new LoggingMeterRegistry())) .build()); - operator.register(new MySQLSchemaReconciler(client, MySQLDbConfig.loadFromEnvironmentVars())); + operator.register(new MySQLSchemaReconciler(MySQLDbConfig.loadFromEnvironmentVars())); operator.installShutdownHook(); operator.start(); - new FtBasic(new TkFork(new FkRegex("/health", "ALL GOOD!")), 8080).start(Exit.NEVER); } } diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java index af77b6ce97..ecc2aa9be7 100644 --- a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaReconciler.java @@ -1,101 +1,107 @@ package io.javaoperatorsdk.operator.sample; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.Base64; -import java.util.List; import java.util.Optional; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.reconciler.*; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; +import io.javaoperatorsdk.operator.api.config.Dependent; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContextInjector; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Builder; +import io.javaoperatorsdk.operator.sample.MySQLSchemaReconciler.SecretDependentResource; import io.javaoperatorsdk.operator.sample.schema.Schema; -import io.javaoperatorsdk.operator.sample.schema.SchemaService; import static java.lang.String.format; -@ControllerConfiguration +@ControllerConfiguration( + dependents = { + @Dependent(resourceType = Secret.class, type = SecretDependentResource.class), + @Dependent(resourceType = Schema.class, type = SchemaDependentResource.class) + }) public class MySQLSchemaReconciler implements Reconciler, ErrorStatusHandler, - EventSourceInitializer { - public static final String SECRET_FORMAT = "%s-secret"; - public static final String USERNAME_FORMAT = "%s-user"; - public static final int POLL_PERIOD = 500; - private final Logger log = LoggerFactory.getLogger(getClass()); + ContextInitializer, EventSourceContextInjector { + + private static final String SECRET_FORMAT = "%s-secret"; + private static final String USERNAME_FORMAT = "%s-user"; + + protected static final String MYSQL_SECRET_NAME = "mysql.secret.name"; + protected static final String MYSQL_SECRET_USERNAME = "mysql.secret.user.name"; + protected static final String MYSQL_SECRET_PASSWORD = "mysql.secret.user.password"; + protected static final String MYSQL_DB_CONFIG = "mysql.db.config"; + protected static final String BUILT_SCHEMA = "built schema"; + static final Logger log = LoggerFactory.getLogger(MySQLSchemaReconciler.class); - private final KubernetesClient kubernetesClient; private final MySQLDbConfig mysqlDbConfig; - public MySQLSchemaReconciler(KubernetesClient kubernetesClient, MySQLDbConfig mysqlDbConfig) { - this.kubernetesClient = kubernetesClient; + public MySQLSchemaReconciler(MySQLDbConfig mysqlDbConfig) { this.mysqlDbConfig = mysqlDbConfig; } + public static class SecretDependentResource + implements DependentResource, Builder { + + @Override + public Secret buildFor(MySQLSchema schema, Context context) { + return new SecretBuilder() + .withNewMetadata() + .withName(context.getMandatory(MYSQL_SECRET_NAME, String.class)) + .withNamespace(schema.getMetadata().getNamespace()) + .endMetadata() + .addToData("MYSQL_USERNAME", encode( + context.getMandatory(MYSQL_SECRET_USERNAME, String.class))) + .addToData("MYSQL_PASSWORD", encode( + context.getMandatory(MYSQL_SECRET_PASSWORD, String.class))) + .build(); + } + + private static String encode(String value) { + return Base64.getEncoder().encodeToString(value.getBytes()); + } + } + @Override - public List prepareEventSources( - EventSourceContext context) { - return List.of(new PerResourcePollingEventSource<>( - new SchemaPollingResourceSupplier(mysqlDbConfig), context.getPrimaryCache(), POLL_PERIOD, - Schema.class)); + public void injectInto(EventSourceContext context) { + context.put(MYSQL_DB_CONFIG, mysqlDbConfig); } @Override - public UpdateControl reconcile(MySQLSchema schema, Context context) { - log.info("Reconciling MySQLSchema with name: {}", schema.getMetadata().getName()); - var dbSchema = context.getSecondaryResource(Schema.class); - log.debug("Schema: {} found for: {} ", dbSchema, schema.getMetadata().getName()); - try (Connection connection = getConnection()) { - if (dbSchema.isEmpty()) { - log.debug("Creating Schema and related resources for: {}", schema.getMetadata().getName()); - var schemaName = schema.getMetadata().getName(); - String password = RandomStringUtils.randomAlphanumeric(16); - String secretName = String.format(SECRET_FORMAT, schemaName); - String userName = String.format(USERNAME_FORMAT, schemaName); - - SchemaService.createSchemaAndRelatedUser(connection, schemaName, - schema.getSpec().getEncoding(), userName, password); - createSecret(schema, password, secretName, userName); - updateStatusPojo(schema, secretName, userName); - log.info("Schema {} created - updating CR status", schema.getMetadata().getName()); - return UpdateControl.updateStatus(schema); - } else { - log.debug("No update on MySQLSchema with name: {}", schema.getMetadata().getName()); - return UpdateControl.noUpdate(); - } - } catch (SQLException e) { - log.error("Error while creating Schema", e); - throw new IllegalStateException(e); - } + public void initContext(MySQLSchema primary, Context context) { + final var name = primary.getMetadata().getName(); + // NOSONAR we don't need cryptographically-strong randomness here + final var password = RandomStringUtils.randomAlphanumeric(16); + final var secretName = String.format(SECRET_FORMAT, name); + final var userName = String.format(USERNAME_FORMAT, name); + + // put information in context for other dependents and reconciler to use + context.put(MYSQL_SECRET_PASSWORD, password); + context.put(MYSQL_SECRET_NAME, secretName); + context.put(MYSQL_SECRET_USERNAME, userName); } @Override - public DeleteControl cleanup(MySQLSchema schema, Context context) { - log.info("Cleaning up for: {}", schema.getMetadata().getName()); - try (Connection connection = getConnection()) { - var dbSchema = SchemaService.getSchema(connection, schema.getMetadata().getName()); - if (dbSchema.isPresent()) { - var userName = schema.getStatus() != null ? schema.getStatus().getUserName() : null; - SchemaService.deleteSchemaAndRelatedUser(connection, schema.getMetadata().getName(), - userName); - } else { - log.info( - "Delete event ignored for schema '{}', real schema doesn't exist", - schema.getMetadata().getName()); - } - return DeleteControl.defaultDelete(); - } catch (SQLException e) { - log.error("Error while trying to delete Schema", e); - return DeleteControl.noFinalizerRemoval(); - } + public UpdateControl reconcile(MySQLSchema schema, Context context) { + // we only need to update the status if we just built the schema, i.e. when it's present in the + // context + return context.get(BUILT_SCHEMA, Schema.class).map(s -> { + updateStatusPojo(schema, context.getMandatory(MYSQL_SECRET_NAME, String.class), + context.getMandatory(MYSQL_SECRET_USERNAME, String.class)); + log.info("Schema {} created - updating CR status", schema.getMetadata().getName()); + return UpdateControl.updateStatus(schema); + }).orElse(UpdateControl.noUpdate()); } @Override @@ -110,14 +116,6 @@ public Optional updateErrorStatus(MySQLSchema schema, RetryInfo ret return Optional.empty(); } - private Connection getConnection() throws SQLException { - String connectionString = - format("jdbc:mysql://%1$s:%2$s", mysqlDbConfig.getHost(), mysqlDbConfig.getPort()); - - log.debug("Connecting to '{}' with user '{}'", connectionString, mysqlDbConfig.getUser()); - return DriverManager.getConnection(connectionString, mysqlDbConfig.getUser(), - mysqlDbConfig.getPassword()); - } private void updateStatusPojo(MySQLSchema schema, String secretName, String userName) { SchemaStatus status = new SchemaStatus(); @@ -130,33 +128,4 @@ private void updateStatusPojo(MySQLSchema schema, String secretName, String user status.setStatus("CREATED"); schema.setStatus(status); } - - private void createSecret(MySQLSchema schema, String password, String secretName, - String userName) { - - var currentSecret = kubernetesClient.secrets().inNamespace(schema.getMetadata().getNamespace()) - .withName(secretName).get(); - if (currentSecret != null) { - return; - } - Secret credentialsSecret = - new SecretBuilder() - .withNewMetadata() - .withName(secretName) - .withOwnerReferences(new OwnerReference("mysql.sample.javaoperatorsdk/v1", - false, false, "MySQLSchema", - schema.getMetadata().getName(), schema.getMetadata().getUid())) - .endMetadata() - .addToData( - "MYSQL_USERNAME", Base64.getEncoder().encodeToString(userName.getBytes())) - .addToData( - "MYSQL_PASSWORD", Base64.getEncoder().encodeToString(password.getBytes())) - .build(); - this.kubernetesClient - .secrets() - .inNamespace(schema.getMetadata().getNamespace()) - .create(credentialsSecret); - } - - } diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/SchemaDependentResource.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/SchemaDependentResource.java new file mode 100644 index 0000000000..de4b0ece58 --- /dev/null +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/SchemaDependentResource.java @@ -0,0 +1,88 @@ +package io.javaoperatorsdk.operator.sample; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Builder; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Persister; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; +import io.javaoperatorsdk.operator.sample.schema.Schema; +import io.javaoperatorsdk.operator.sample.schema.SchemaService; + +import static java.lang.String.format; + +public class SchemaDependentResource + implements DependentResource, Builder, + Cleaner, Persister { + + private static final int POLL_PERIOD = 500; + private MySQLDbConfig dbConfig; + + @Override + public EventSource initEventSource(EventSourceContext context) { + dbConfig = context.getMandatory(MySQLSchemaReconciler.MYSQL_DB_CONFIG, MySQLDbConfig.class); + return new PerResourcePollingEventSource<>( + new SchemaPollingResourceSupplier(dbConfig), context.getPrimaryCache(), POLL_PERIOD, + Schema.class); + } + + @Override + public Schema buildFor(MySQLSchema primary, Context context) { + try (Connection connection = getConnection()) { + final var schema = SchemaService.createSchemaAndRelatedUser( + connection, + primary.getMetadata().getName(), + primary.getSpec().getEncoding(), + context.getMandatory(MySQLSchemaReconciler.MYSQL_SECRET_USERNAME, String.class), + context.getMandatory(MySQLSchemaReconciler.MYSQL_SECRET_PASSWORD, String.class)); + + // put the newly built schema in the context to let the reconciler know we just built it + context.put(MySQLSchemaReconciler.BUILT_SCHEMA, schema); + return schema; + } catch (SQLException e) { + MySQLSchemaReconciler.log.error("Error while creating Schema", e); + throw new IllegalStateException(e); + } + } + + private Connection getConnection() throws SQLException { + String connectURL = format("jdbc:mysql://%1$s:%2$s", dbConfig.getHost(), dbConfig.getPort()); + + MySQLSchemaReconciler.log.debug("Connecting to '{}' with user '{}'", connectURL, + dbConfig.getUser()); + return DriverManager.getConnection(connectURL, dbConfig.getUser(), dbConfig.getPassword()); + } + + @Override + public void delete(Schema fetched, MySQLSchema primary, Context context) { + try (Connection connection = getConnection()) { + var userName = primary.getStatus() != null ? primary.getStatus().getUserName() : null; + SchemaService.deleteSchemaAndRelatedUser(connection, primary.getMetadata().getName(), + userName); + } catch (SQLException e) { + throw new RuntimeException("Error while trying to delete Schema", e); + } + } + + @Override + public void createOrReplace(Schema dependentResource, Context context) { + // this is actually implemented in buildFor, the cleaner way to do this would be to have all + // the needed information in Schema instead of creating both the schema and user from + // heterogeneous information + } + + @Override + public Schema getFor(MySQLSchema primary, Context context) { + try (Connection connection = getConnection()) { + return SchemaService.getSchema(connection, primary.getMetadata().getName()).orElse(null); + } catch (SQLException e) { + throw new RuntimeException("Error while trying to delete Schema", e); + } + } +} diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/SchemaService.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/SchemaService.java index 8c6cd31b70..6db1c9b03b 100644 --- a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/SchemaService.java +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/schema/SchemaService.java @@ -29,7 +29,7 @@ public Optional getSchema(String name) { } } - public static void createSchemaAndRelatedUser(Connection connection, String schemaName, + public static Schema createSchemaAndRelatedUser(Connection connection, String schemaName, String encoding, String userName, String password) { @@ -49,6 +49,8 @@ public static void createSchemaAndRelatedUser(Connection connection, String sche statement.execute( format("GRANT ALL ON `%1$s`.* TO '%2$s'", schemaName, userName)); } + + return new Schema(schemaName, encoding); } catch (SQLException e) { throw new IllegalStateException(e); } diff --git a/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java b/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java index f4c529426b..31b5304028 100644 --- a/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java +++ b/sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java @@ -37,7 +37,7 @@ public void test() throws IOException { if ("true".equals(System.getenv("RUN_OPERATOR_IN_TEST"))) { Operator operator = new Operator(client, DefaultConfigurationService.instance()); MySQLDbConfig dbConfig = new MySQLDbConfig("mysql", null, "root", "password"); - operator.register(new MySQLSchemaReconciler(client, dbConfig)); + operator.register(new MySQLSchemaReconciler(dbConfig)); operator.start(); } diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml index 9877aaef0d..e5819f32a8 100644 --- a/sample-operators/pom.xml +++ b/sample-operators/pom.xml @@ -7,7 +7,7 @@ io.javaoperatorsdk java-operator-sdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT sample-operators diff --git a/sample-operators/tomcat-operator/pom.xml b/sample-operators/tomcat-operator/pom.xml index ffcd7038e7..5c75031b16 100644 --- a/sample-operators/tomcat-operator/pom.xml +++ b/sample-operators/tomcat-operator/pom.xml @@ -7,7 +7,7 @@ io.javaoperatorsdk sample-operators - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT sample-tomcat-operator diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/DeploymentDependentResource.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/DeploymentDependentResource.java new file mode 100644 index 0000000000..245011a1cf --- /dev/null +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/DeploymentDependentResource.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator.sample; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.config.KubernetesDependent; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Builder; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Updater; + +@KubernetesDependent(labelSelector = "app.kubernetes.io/managed-by=tomcat-operator") +public class DeploymentDependentResource + implements DependentResource, Builder, + Updater { + + @Override + public Deployment buildFor(Tomcat tomcat, Context context) { + Deployment deployment = TomcatReconciler.loadYaml(Deployment.class, "deployment.yaml"); + final ObjectMeta tomcatMetadata = tomcat.getMetadata(); + final String tomcatName = tomcatMetadata.getName(); + deployment = new DeploymentBuilder(deployment) + .editMetadata() + .withName(tomcatName) + .withNamespace(tomcatMetadata.getNamespace()) + .addToLabels("app", tomcatName) + .addToLabels("app.kubernetes.io/part-of", tomcatName) + .addToLabels("app.kubernetes.io/managed-by", "tomcat-operator") + .endMetadata() + .editSpec() + .editSelector().addToMatchLabels("app", tomcatName).endSelector() + .withReplicas(tomcat.getSpec().getReplicas()) + // set tomcat version + .editTemplate() + // make sure label selector matches label (which has to be matched by service selector too) + .editMetadata().addToLabels("app", tomcatName).endMetadata() + .editSpec() + .editFirstContainer().withImage("tomcat:" + tomcat.getSpec().getVersion()).endContainer() + .endSpec() + .endTemplate() + .endSpec() + .build(); + return deployment; + } + + @Override + public Deployment update(Deployment fetched, Tomcat tomcat, Context context) { + return new DeploymentBuilder(fetched).editSpec().editTemplate().editSpec().editFirstContainer() + .withImage("tomcat:" + tomcat.getSpec().getVersion()) + .endContainer().endSpec().endTemplate().endSpec().build(); + } +} diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/ServiceDependentResource.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/ServiceDependentResource.java new file mode 100644 index 0000000000..b90bd0e6e2 --- /dev/null +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/ServiceDependentResource.java @@ -0,0 +1,28 @@ +package io.javaoperatorsdk.operator.sample; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Builder; + +public class ServiceDependentResource + implements DependentResource, Builder { + + @Override + public Service buildFor(Tomcat tomcat, Context context) { + final ObjectMeta tomcatMetadata = tomcat.getMetadata(); + final Service service = + new ServiceBuilder(TomcatReconciler.loadYaml(Service.class, "service.yaml")) + .editMetadata() + .withName(tomcatMetadata.getName()) + .withNamespace(tomcatMetadata.getNamespace()) + .endMetadata() + .editSpec() + .addToSelector("app", tomcatMetadata.getName()) + .endSpec() + .build(); + return service; + } +} diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatDependentResource.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatDependentResource.java new file mode 100644 index 0000000000..9dd49466ab --- /dev/null +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatDependentResource.java @@ -0,0 +1,41 @@ +package io.javaoperatorsdk.operator.sample; + +import java.util.Set; +import java.util.stream.Collectors; + +import io.javaoperatorsdk.operator.api.config.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier; +import io.javaoperatorsdk.operator.processing.event.source.EventSourceContextAware; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; + +public class TomcatDependentResource + implements DependentResource, PrimaryResourcesRetriever, + AssociatedSecondaryResourceIdentifier, EventSourceContextAware { + + private ResourceCache primaryCache; + + @Override + public void initWith(EventSourceContext context) { + this.primaryCache = context.getPrimaryCache(); + } + + @Override + public Set associatedPrimaryResources(Tomcat t) { + // To create an event to a related WebApp resource and trigger the reconciliation + // we need to find which WebApp this Tomcat custom resource is related to. + // To find the related customResourceId of the WebApp resource we traverse the cache to + // and identify it based on naming convention. + return primaryCache + .list(webApp -> webApp.getSpec().getTomcat().equals(t.getMetadata().getName())) + .map(ResourceID::fromResource) + .collect(Collectors.toSet()); + } + + @Override + public ResourceID associatedSecondaryID(Webapp primary) { + return new ResourceID(primary.getSpec().getTomcat(), primary.getMetadata().getNamespace()); + } +} diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatOperator.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatOperator.java index 487183dfe5..09a7394e5b 100644 --- a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatOperator.java +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatOperator.java @@ -25,7 +25,7 @@ public static void main(String[] args) throws IOException { Config config = new ConfigBuilder().withNamespace(null).build(); KubernetesClient client = new DefaultKubernetesClient(config); Operator operator = new Operator(client, DefaultConfigurationService.instance()); - operator.register(new TomcatReconciler(client)); + operator.register(new TomcatReconciler()); operator.register(new WebappReconciler(client)); operator.installShutdownHook(); operator.start(); diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatReconciler.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatReconciler.java index bb67128f9e..32f7927f44 100644 --- a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatReconciler.java +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/TomcatReconciler.java @@ -2,28 +2,20 @@ import java.io.IOException; import java.io.InputStream; -import java.util.List; import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.utils.Serialization; +import io.javaoperatorsdk.operator.api.config.Dependent; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_FINALIZER; @@ -31,45 +23,27 @@ * Runs a specified number of Tomcat app server Pods. It uses a Deployment to create the Pods. Also * creates a Service over which the Pods can be accessed. */ -@ControllerConfiguration(finalizerName = NO_FINALIZER) -public class TomcatReconciler implements Reconciler, EventSourceInitializer { +@ControllerConfiguration( + finalizerName = NO_FINALIZER, + dependents = { + @Dependent(resourceType = Deployment.class, type = DeploymentDependentResource.class), + @Dependent(resourceType = Service.class, type = ServiceDependentResource.class) + }) +public class TomcatReconciler implements Reconciler { private final Logger log = LoggerFactory.getLogger(getClass()); - private final KubernetesClient kubernetesClient; - - public TomcatReconciler(KubernetesClient client) { - this.kubernetesClient = client; - } - - @Override - public List prepareEventSources(EventSourceContext context) { - SharedIndexInformer deploymentInformer = - kubernetesClient.apps().deployments().inAnyNamespace() - .withLabel("app.kubernetes.io/managed-by", "tomcat-operator") - .runnableInformer(0); - - return List.of(new InformerEventSource<>( - deploymentInformer, Mappers.fromOwnerReference())); - } - @Override public UpdateControl reconcile(Tomcat tomcat, Context context) { - createOrUpdateDeployment(tomcat); - createOrUpdateService(tomcat); - - return context.getSecondaryResource(Deployment.class) - .map(deployment -> { - Tomcat updatedTomcat = - updateTomcatStatus(tomcat, deployment); - log.info( - "Updating status of Tomcat {} in namespace {} to {} ready replicas", - tomcat.getMetadata().getName(), - tomcat.getMetadata().getNamespace(), - tomcat.getStatus().getReadyReplicas()); - return UpdateControl.updateStatus(updatedTomcat); - }) - .orElse(UpdateControl.noUpdate()); + return context.getSecondaryResource(Deployment.class).map(deployment -> { + Tomcat updatedTomcat = updateTomcatStatus(tomcat, deployment); + log.info( + "Updating status of Tomcat {} in namespace {} to {} ready replicas", + tomcat.getMetadata().getName(), + tomcat.getMetadata().getNamespace(), + tomcat.getStatus().getReadyReplicas()); + return UpdateControl.updateStatus(updatedTomcat); + }).orElse(UpdateControl.noUpdate()); } private Tomcat updateTomcatStatus(Tomcat tomcat, Deployment deployment) { @@ -82,78 +56,8 @@ private Tomcat updateTomcatStatus(Tomcat tomcat, Deployment deployment) { return tomcat; } - private void createOrUpdateDeployment(Tomcat tomcat) { - String ns = tomcat.getMetadata().getNamespace(); - Deployment existingDeployment = - kubernetesClient - .apps() - .deployments() - .inNamespace(ns) - .withName(tomcat.getMetadata().getName()) - .get(); - if (existingDeployment == null) { - Deployment deployment = loadYaml(Deployment.class, "deployment.yaml"); - deployment.getMetadata().setName(tomcat.getMetadata().getName()); - deployment.getMetadata().setNamespace(ns); - deployment.getMetadata().getLabels().put("app.kubernetes.io/part-of", - tomcat.getMetadata().getName()); - deployment.getMetadata().getLabels().put("app.kubernetes.io/managed-by", "tomcat-operator"); - // set tomcat version - deployment - .getSpec() - .getTemplate() - .getSpec() - .getContainers() - .get(0) - .setImage("tomcat:" + tomcat.getSpec().getVersion()); - deployment.getSpec().setReplicas(tomcat.getSpec().getReplicas()); - - // make sure label selector matches label (which has to be matched by service selector too) - deployment - .getSpec() - .getTemplate() - .getMetadata() - .getLabels() - .put("app", tomcat.getMetadata().getName()); - deployment - .getSpec() - .getSelector() - .getMatchLabels() - .put("app", tomcat.getMetadata().getName()); - - OwnerReference ownerReference = deployment.getMetadata().getOwnerReferences().get(0); - ownerReference.setName(tomcat.getMetadata().getName()); - ownerReference.setUid(tomcat.getMetadata().getUid()); - - log.info("Creating or updating Deployment {} in {}", deployment.getMetadata().getName(), ns); - kubernetesClient.apps().deployments().inNamespace(ns).create(deployment); - } else { - existingDeployment - .getSpec() - .getTemplate() - .getSpec() - .getContainers() - .get(0) - .setImage("tomcat:" + tomcat.getSpec().getVersion()); - existingDeployment.getSpec().setReplicas(tomcat.getSpec().getReplicas()); - kubernetesClient.apps().deployments().inNamespace(ns).createOrReplace(existingDeployment); - } - } - - private void createOrUpdateService(Tomcat tomcat) { - Service service = loadYaml(Service.class, "service.yaml"); - service.getMetadata().setName(tomcat.getMetadata().getName()); - String ns = tomcat.getMetadata().getNamespace(); - service.getMetadata().setNamespace(ns); - service.getMetadata().getOwnerReferences().get(0).setName(tomcat.getMetadata().getName()); - service.getMetadata().getOwnerReferences().get(0).setUid(tomcat.getMetadata().getUid()); - service.getSpec().getSelector().put("app", tomcat.getMetadata().getName()); - log.info("Creating or updating Service {} in {}", service.getMetadata().getName(), ns); - kubernetesClient.services().inNamespace(ns).createOrReplace(service); - } - - private T loadYaml(Class clazz, String yaml) { - try (InputStream is = getClass().getResourceAsStream(yaml)) { + static T loadYaml(Class clazz, String yaml) { + try (InputStream is = TomcatReconciler.class.getResourceAsStream(yaml)) { return Serialization.unmarshal(is, clazz); } catch (IOException ex) { throw new IllegalStateException("Cannot find yaml on classpath: " + yaml); diff --git a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java index 0f785feb02..9e1f6f677f 100644 --- a/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java +++ b/sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java @@ -25,15 +25,18 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryResourceIdentifier; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerConfiguration; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; @ControllerConfiguration public class WebappReconciler implements Reconciler, EventSourceInitializer { - private KubernetesClient kubernetesClient; + private static final Logger log = LoggerFactory.getLogger(WebappReconciler.class); - private final Logger log = LoggerFactory.getLogger(getClass()); + private final KubernetesClient kubernetesClient; public WebappReconciler(KubernetesClient kubernetesClient) { this.kubernetesClient = kubernetesClient; @@ -41,20 +44,32 @@ public WebappReconciler(KubernetesClient kubernetesClient) { @Override public List prepareEventSources(EventSourceContext context) { - return List.of(new InformerEventSource<>( - kubernetesClient, Tomcat.class, t -> { - // To create an event to a related WebApp resource and trigger the reconciliation - // we need to find which WebApp this Tomcat custom resource is related to. - // To find the related customResourceId of the WebApp resource we traverse the cache to - // and identify it based on naming convention. - return context.getPrimaryCache() - .list(webApp -> webApp.getSpec().getTomcat().equals(t.getMetadata().getName())) - .map(ResourceID::fromResource) - .collect(Collectors.toSet()); - }, - (Webapp webapp) -> new ResourceID(webapp.getSpec().getTomcat(), - webapp.getMetadata().getNamespace()), - true)); + /* + * To create an event to a related WebApp resource and trigger the reconciliation we need to + * find which WebApp this Tomcat custom resource is related to. To find the related + * customResourceId of the WebApp resource we traverse the cache and identify it based on naming + * convention. + */ + final PrimaryResourcesRetriever webappsMatchingTomcatName = + (Tomcat t) -> context.getPrimaryCache() + .list(webApp -> webApp.getSpec().getTomcat().equals(t.getMetadata().getName())) + .map(ResourceID::fromResource) + .collect(Collectors.toSet()); + + /* + * We retrieve the Tomcat instance associated with out Webapp from its spec + */ + final AssociatedSecondaryResourceIdentifier tomcatFromWebAppSpec = + (Webapp webapp) -> new ResourceID( + webapp.getSpec().getTomcat(), + webapp.getMetadata().getNamespace()); + + InformerConfiguration configuration = + InformerConfiguration.from(context, Tomcat.class) + .withPrimaryResourcesRetriever(webappsMatchingTomcatName) + .withAssociatedSecondaryResourceIdentifier(tomcatFromWebAppSpec) + .build(); + return List.of(new InformerEventSource<>(configuration, context)); } /** @@ -144,7 +159,7 @@ private String[] executeCommandInAllPods( CompletableFuture data = new CompletableFuture<>(); try (ExecWatch execWatch = execCmd(pod, data, command)) { - status[i] = "" + pod.getMetadata().getName() + ":" + data.get(30, TimeUnit.SECONDS);; + status[i] = "" + pod.getMetadata().getName() + ":" + data.get(30, TimeUnit.SECONDS); } catch (ExecutionException e) { status[i] = "" + pod.getMetadata().getName() + ": ExecutionException - " + e.getMessage(); } catch (InterruptedException e) { @@ -194,7 +209,7 @@ public void onFailure(Throwable t, Response response) { @Override public void onClose(int code, String reason) { - log.debug("Exit with: " + code + " and with reason: " + reason); + log.debug("Exit with: {} and with reason: {}", code, reason); data.complete(baos.toString()); } } diff --git a/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/deployment.yaml b/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/deployment.yaml index 55d1a6be7d..aa38eb3619 100644 --- a/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/deployment.yaml +++ b/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/deployment.yaml @@ -5,11 +5,6 @@ metadata: labels: app.kubernetes.io/part-of: "" app.kubernetes.io/managed-by: "" # used for filtering of Deployments created by the controller - ownerReferences: # used for finding which Tomcat does this Deployment belong to - - apiVersion: apps/v1 - kind: Tomcat - name: "" - uid: "" spec: selector: matchLabels: diff --git a/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/service.yaml b/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/service.yaml index a807d277a7..ab198643ed 100644 --- a/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/service.yaml +++ b/sample-operators/tomcat-operator/src/main/resources/io/javaoperatorsdk/operator/sample/service.yaml @@ -2,11 +2,6 @@ apiVersion: v1 kind: Service metadata: name: "" - ownerReferences: # used for finding which Tomcat does this Deployment belong to - - apiVersion: apps/v1 - kind: Tomcat - name: "" - uid: "" spec: selector: app: "" diff --git a/sample-operators/tomcat-operator/src/test/java/io/javaoperatorsdk/operator/sample/TomcatOperatorE2E.java b/sample-operators/tomcat-operator/src/test/java/io/javaoperatorsdk/operator/sample/TomcatOperatorE2E.java index e803b70aba..9f80f31c5b 100644 --- a/sample-operators/tomcat-operator/src/test/java/io/javaoperatorsdk/operator/sample/TomcatOperatorE2E.java +++ b/sample-operators/tomcat-operator/src/test/java/io/javaoperatorsdk/operator/sample/TomcatOperatorE2E.java @@ -33,7 +33,7 @@ public void test() { // Use this if you want to run the test without deploying the Operator to Kubernetes if ("true".equals(System.getenv("RUN_OPERATOR_IN_TEST"))) { Operator operator = new Operator(client, DefaultConfigurationService.instance()); - operator.register(new TomcatReconciler(client)); + operator.register(new TomcatReconciler()); operator.register(new WebappReconciler(client)); operator.start(); } diff --git a/sample-operators/webpage/pom.xml b/sample-operators/webpage/pom.xml index 90522733af..94e4dbcb83 100644 --- a/sample-operators/webpage/pom.xml +++ b/sample-operators/webpage/pom.xml @@ -7,7 +7,7 @@ io.javaoperatorsdk sample-operators - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT webpage @@ -25,7 +25,6 @@ io.javaoperatorsdk operator-framework - 2.0.3-SNAPSHOT org.apache.logging.log4j diff --git a/smoke-test-samples/common/pom.xml b/smoke-test-samples/common/pom.xml index db2926c74b..96cd79f960 100644 --- a/smoke-test-samples/common/pom.xml +++ b/smoke-test-samples/common/pom.xml @@ -6,7 +6,7 @@ io.javaoperatorsdk java-operator-sdk-smoke-test-samples - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT operator-framework-smoke-test-samples-common diff --git a/smoke-test-samples/pom.xml b/smoke-test-samples/pom.xml index 3dcdb530c7..02aae34d54 100644 --- a/smoke-test-samples/pom.xml +++ b/smoke-test-samples/pom.xml @@ -6,7 +6,7 @@ io.javaoperatorsdk java-operator-sdk - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT java-operator-sdk-smoke-test-samples diff --git a/smoke-test-samples/pure-java/pom.xml b/smoke-test-samples/pure-java/pom.xml index 87ccbbf48d..0b26c7de42 100644 --- a/smoke-test-samples/pure-java/pom.xml +++ b/smoke-test-samples/pure-java/pom.xml @@ -6,7 +6,7 @@ io.javaoperatorsdk java-operator-sdk-smoke-test-samples - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT operator-framework-smoke-test-samples-pure-java diff --git a/smoke-test-samples/spring-boot-plain/pom.xml b/smoke-test-samples/spring-boot-plain/pom.xml index b652916960..c41e3c1cfc 100644 --- a/smoke-test-samples/spring-boot-plain/pom.xml +++ b/smoke-test-samples/spring-boot-plain/pom.xml @@ -6,7 +6,7 @@ io.javaoperatorsdk java-operator-sdk-smoke-test-samples - 2.0.3-SNAPSHOT + 2.1.0-SNAPSHOT operator-framework-smoke-test-samples-spring-boot