Skip to content

refactor: rename internal package to source, moving LifecycleAware #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 26, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Version;
import io.javaoperatorsdk.operator.api.LifecycleAware;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;

@SuppressWarnings("rawtypes")
public class Operator implements AutoCloseable, LifecycleAware {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.ControllerUtils;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilters;

public interface ControllerConfiguration<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;

public class ControllerConfigurationOverrider<R extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;

public class DefaultControllerConfiguration<R extends HasMetadata>
implements ControllerConfiguration<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.javaoperatorsdk.operator.processing.event.internal.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.javaoperatorsdk.operator.api.reconciler;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.EventSourceRegistry;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;

public interface EventSourceInitializer<T extends HasMetadata> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.javaoperatorsdk.operator.CustomResourceUtils;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.LifecycleAware;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution;
import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -20,15 +19,14 @@
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.EventSourceRegistry;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;

public class Controller<R extends HasMetadata> implements Reconciler<R>,
LifecycleAware, EventSourceInitializer<R> {
private final Reconciler<R> reconciler;
private final ControllerConfiguration<R> configuration;
private final KubernetesClient kubernetesClient;
private EventSourceManager<R> eventSourceManager;
private EventProcessor<R> eventProcessor;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea as desribed here is that the controller maneges the lifecycle of every other component top level component:
#655

So it is the highest level agregate, therefore will be no multiple layers for managing LifecycleAware

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that it doesn't really do anything with it, it's just needed by the event manager so I don't think we should expose it since the lifecycle of the EventProcessor is tied to the EventManager (you cannot have one without the other). Making EventManager handle the EventProcessor life cycle also allows to avoid the rather ugly setters and prevent forgetting to start/stop one or the other.
If the Controller was using the EventProcessor directly, it would make sense to keep there. At the very least, if we wanted to keep the top-level aggregate concept, the EventManager should retrieve the EventProcessor from the Controller (or vice-versa) and that's actually what I tried doing yesterday without complicating things even more…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in general the Controller does not do anything just instantiates and starts/stops the major components. The circular dependency is ugly, but the EventManager is just a class to encapsulate a list of EventSources. For me that it creates a central component, probably the most complex part is quite a smell.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except that EventSourceManager and EventProcessor are not really separate components. Again, one cannot exist without the other. If we want to keep them separate, we should make it as cleanly as possible and I think the current solution is cleaner because it avoids the mess of having incomplete objects for no good reason. Maybe we should combine EventSourceManager and EventProcessor, renaming them to EventManager?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with that is even now the EventProcessor is too big, with bunch of logic, what little bugs me. Actually wanted to somehow break it down to more components.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed quite complex, but some things are 😄
It doesn't make sense to split things that are not coherent if we end up with more complexity to manage the different pieces. I could see the TimerEventSource moving back to EventSourceManager but I really think that we should tie the lifecycle of EventSourceManager and EventProcessor by making EventSourceManager handle the processor's lifecycle because, again, the manager cannot work without the processor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the event source manager will handle the lifecycle of event processor?
ok that is fine by me, we can evaluate further later.


public Controller(Reconciler<R> reconciler,
ControllerConfiguration<R> configuration,
Expand Down Expand Up @@ -170,10 +168,6 @@ public void start() throws OperatorException {
}

eventSourceManager = new EventSourceManager<>(this);
eventProcessor =
new EventProcessor<>(this, eventSourceManager.getControllerResourceEventSource());
eventProcessor.setEventSourceManager(eventSourceManager);
eventSourceManager.setEventProcessor(eventProcessor);
if (reconciler instanceof EventSourceInitializer) {
((EventSourceInitializer<R>) reconciler).prepareEventSources(eventSourceManager);
}
Expand All @@ -183,7 +177,6 @@ public void start() throws OperatorException {
+ controllerName
+ "' is configured to watch the current namespace but it couldn't be inferred from the current configuration.");
}
eventProcessor.start();
eventSourceManager.start();
} catch (MissingCRDException e) {
throwMissingCRDException(crdName, specVersion, controllerName);
Expand Down Expand Up @@ -223,8 +216,5 @@ public void stop() {
if (eventSourceManager != null) {
eventSourceManager.stop();
}
if (eventProcessor != null) {
eventProcessor.stop();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.javaoperatorsdk.operator.api;
package io.javaoperatorsdk.operator.processing;

import io.javaoperatorsdk.operator.OperatorException;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.javaoperatorsdk.operator.processing;
package io.javaoperatorsdk.operator.processing.event;

import java.util.HashMap;

import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
* Manages the state of received events. Basically there can be only three distinct states relevant
* for event processing. Either an event is received, so we eventually process or no event for
Expand All @@ -13,7 +10,7 @@
* events are irrelevant for us from this point. Note that the dependant resources are either
* cleaned up by K8S garbage collection or by the controller implementation for cleanup.
*/
public class EventMarker {
class EventMarker {

public enum EventingState {
/** Event but NOT Delete event present */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.javaoperatorsdk.operator.processing;
package io.javaoperatorsdk.operator.processing.event;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -14,17 +14,16 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.LifecycleAware;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEvent;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
Expand All @@ -36,8 +35,7 @@
* Event handler that makes sure that events are processed in a "single threaded" way per resource
* UID, while buffering events which are received during an execution.
*/
public class EventProcessor<R extends HasMetadata>
implements EventHandler, LifecycleAware {
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {

private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);

Expand All @@ -51,32 +49,34 @@ public class EventProcessor<R extends HasMetadata>
private final Metrics metrics;
private volatile boolean running;
private final ResourceCache<R> resourceCache;
private EventSourceManager<R> eventSourceManager;
private final EventSourceManager<R> eventSourceManager;
private final EventMarker eventMarker;

public EventProcessor(Controller<R> controller, ResourceCache<R> resourceCache) {
EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
resourceCache,
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
controller.getConfiguration().getName(),
new ReconciliationDispatcher<>(controller),
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()),
controller.getConfiguration().getConfigurationService().getMetrics(),
new EventMarker());
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
GenericRetry.fromConfiguration(
eventSourceManager.getController().getConfiguration().getRetryConfiguration()),
eventSourceManager.getController().getConfiguration().getConfigurationService()
.getMetrics(),
eventSourceManager);
}

EventProcessor(ReconciliationDispatcher<R> reconciliationDispatcher,
ResourceCache<R> resourceCache,
EventSourceManager<R> eventSourceManager,
String relatedControllerName,
Retry retry, EventMarker eventMarker) {
this(resourceCache, null, relatedControllerName, reconciliationDispatcher, retry, null,
eventMarker);
Retry retry) {
this(eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName,
reconciliationDispatcher, retry, null, eventSourceManager);
}

private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor,
String relatedControllerName,
ReconciliationDispatcher<R> reconciliationDispatcher, Retry retry, Metrics metrics,
EventMarker eventMarker) {
EventSourceManager<R> eventSourceManager) {
this.running = true;
this.executor =
executor == null
Expand All @@ -88,11 +88,12 @@ private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor,
this.retry = retry;
this.resourceCache = resourceCache;
this.metrics = metrics != null ? metrics : Metrics.NOOP;
this.eventMarker = eventMarker;
this.eventMarker = new EventMarker();
this.eventSourceManager = eventSourceManager;
}

public void setEventSourceManager(EventSourceManager<R> eventSourceManager) {
this.eventSourceManager = eventSourceManager;
EventMarker getEventMarker() {
return eventMarker;
}

@Override
Expand Down Expand Up @@ -243,9 +244,12 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution

private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
R customResource) {
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager
.getRetryAndRescheduleTimerEventSource()
.scheduleOnce(customResource, delay));
postExecutionControl.getReScheduleDelay()
.ifPresent(delay -> retryEventSource().scheduleOnce(customResource, delay));
}

TimerEventSource<R> retryEventSource() {
return eventSourceManager.retryEventSource();
}

/**
Expand Down Expand Up @@ -275,9 +279,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope,
delay,
customResourceID);
metrics.failedReconciliation(customResourceID, exception);
eventSourceManager
.getRetryAndRescheduleTimerEventSource()
.scheduleOnce(executionScope.getResource(), delay);
retryEventSource().scheduleOnce(executionScope.getResource(), delay);
},
() -> log.error("Exhausted retries for {}", executionScope));
}
Expand All @@ -289,9 +291,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
if (isRetryConfigured()) {
retryState.remove(executionScope.getCustomResourceID());
}
eventSourceManager
.getRetryAndRescheduleTimerEventSource()
.cancelOnceSchedule(executionScope.getCustomResourceID());
retryEventSource().cancelOnceSchedule(executionScope.getCustomResourceID());
}

private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.LifecycleAware;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.EventProcessor;
import io.javaoperatorsdk.operator.processing.event.internal.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;

public class EventSourceManager<R extends HasMetadata>
implements EventSourceRegistry<R>, LifecycleAware {
Expand All @@ -25,38 +26,33 @@ public class EventSourceManager<R extends HasMetadata>

private final ReentrantLock lock = new ReentrantLock();
private final Set<EventSource> eventSources = Collections.synchronizedSet(new HashSet<>());
private EventProcessor<R> eventProcessor;
private final EventProcessor<R> eventProcessor;
private TimerEventSource<R> retryAndRescheduleTimerEventSource;
private ControllerResourceEventSource<R> controllerResourceEventSource;
private final Controller<R> controller;

EventSourceManager() {
init();
EventSourceManager(EventProcessor<R> eventProcessor) {
this.eventProcessor = eventProcessor;
controller = null;
initRetryEventSource();
}

public EventSourceManager(Controller<R> controller) {
init();
this.controller = controller;
controllerResourceEventSource = new ControllerResourceEventSource<>(controller);
this.eventProcessor = new EventProcessor<>(this);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event source manager should not instantiate the processor. I that the controller manages the lifecycle is much cleaner. This is what was moved out as part of the design changes.

Copy link
Collaborator

@metacosm metacosm Nov 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make things cleaner: look at how ugly the setEventHandler / setEventManager mess is. You cannot use the EventProcessor without the EventManager and vice-versa. They should be created complete and not half way having to use a setter to inject the missing part after they're built, imo.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except for those :D

registerEventSource(controllerResourceEventSource);
initRetryEventSource();
}

private void init() {
this.retryAndRescheduleTimerEventSource = new TimerEventSource<>();
private void initRetryEventSource() {
retryAndRescheduleTimerEventSource = new TimerEventSource<>();
registerEventSource(retryAndRescheduleTimerEventSource);
}

public EventSourceManager<R> setEventProcessor(EventProcessor<R> eventProcessor) {
this.eventProcessor = eventProcessor;
if (controllerResourceEventSource != null) {
controllerResourceEventSource.setEventHandler(eventProcessor);
}
if (retryAndRescheduleTimerEventSource != null) {
retryAndRescheduleTimerEventSource.setEventHandler(eventProcessor);
}
return this;
}

@Override
public void start() throws OperatorException {
eventProcessor.start();
lock.lock();
try {
log.debug("Starting event sources.");
Expand Down Expand Up @@ -88,6 +84,7 @@ public void stop() {
} finally {
lock.unlock();
}
eventProcessor.stop();
}

@Override
Expand Down Expand Up @@ -121,10 +118,6 @@ public void cleanupForCustomResource(ResourceID customResourceUid) {
}
}

public TimerEventSource<R> getRetryAndRescheduleTimerEventSource() {
return retryAndRescheduleTimerEventSource;
}

@Override
public Set<EventSource> getRegisteredEventSources() {
return Collections.unmodifiableSet(eventSources);
Expand All @@ -135,4 +128,11 @@ public ControllerResourceEventSource<R> getControllerResourceEventSource() {
return controllerResourceEventSource;
}

TimerEventSource<R> retryEventSource() {
return retryAndRescheduleTimerEventSource;
}

Controller<R> getController() {
return controller;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.javaoperatorsdk.operator.processing;
package io.javaoperatorsdk.operator.processing.event;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ExecutionScope<R extends HasMetadata> {
class ExecutionScope<R extends HasMetadata> {

// the latest custom resource from cache
private final R resource;
Expand Down
Loading