Skip to content

feat: non static reference of services #1865

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
import java.util.HashSet;
import java.util.Set;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -23,26 +20,21 @@

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractMicrometerMetricsTestFixture {
@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder().withReconciler(new MetricsCleaningTestReconciler())
.build();

protected final TestSimpleMeterRegistry registry = new TestSimpleMeterRegistry();
protected final MicrometerMetrics metrics = getMetrics();
protected static final String testResourceName = "micrometer-metrics-cr";

protected abstract MicrometerMetrics getMetrics();

@BeforeAll
void setup() {
ConfigurationServiceProvider.overrideCurrent(overrider -> overrider.withMetrics(metrics));
}
@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withConfigurationService(overrider -> overrider.withMetrics(metrics))
.withReconciler(new MetricsCleaningTestReconciler())
.build();

@AfterAll
void reset() {
ConfigurationServiceProvider.reset();
}

protected abstract MicrometerMetrics getMetrics();

@Test
void properlyHandlesResourceDeletion() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ class ControllerManager {
@SuppressWarnings("rawtypes")
private final Map<String, Controller> controllers = new HashMap<>();
private boolean started = false;
private final ExecutorServiceManager executorServiceManager;

public ControllerManager(ExecutorServiceManager executorServiceManager) {
this.executorServiceManager = executorServiceManager;
}


public synchronized void shouldStart() {
if (started) {
Expand All @@ -33,15 +39,15 @@ public synchronized void shouldStart() {
}

public synchronized void start(boolean startEventProcessor) {
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
executorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
c.start(startEventProcessor);
return null;
}, c -> "Controller Starter for: " + c.getConfiguration().getName());
started = true;
}

public synchronized void stop() {
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
executorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
log.debug("closing {}", c);
c.stop();
return null;
Expand All @@ -50,7 +56,7 @@ public synchronized void stop() {
}

public synchronized void startEventProcessing() {
ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
executorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> {
c.startEventProcessing();
return null;
}, c -> "Event processor starter for: " + c.getConfiguration().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;

public class LeaderElectionManager {
Expand All @@ -24,41 +23,19 @@ public class LeaderElectionManager {
private final ControllerManager controllerManager;
private String identity;
private CompletableFuture<?> leaderElectionFuture;
private final ConfigurationService configurationService;
private final KubernetesClient kubernetesClient;

public LeaderElectionManager(ControllerManager controllerManager) {
public LeaderElectionManager(KubernetesClient kubernetesClient,
ControllerManager controllerManager,
ConfigurationService configurationService) {
this.kubernetesClient = kubernetesClient;
this.controllerManager = controllerManager;
}

public void init(LeaderElectionConfiguration config, KubernetesClient client) {
this.identity = identity(config);
final var leaseNamespace =
config.getLeaseNamespace().orElseGet(
() -> ConfigurationServiceProvider.instance().getClientConfiguration().getNamespace());
if (leaseNamespace == null) {
final var message =
"Lease namespace is not set and cannot be inferred. Leader election cannot continue.";
log.error(message);
throw new IllegalArgumentException(message);
}
final var lock = new LeaseLock(leaseNamespace, config.getLeaseName(), identity);
// releaseOnCancel is not used in the underlying implementation
leaderElector =
new LeaderElectorBuilder(
client, ExecutorServiceManager.instance().executorService())
.withConfig(
new LeaderElectionConfig(
lock,
config.getLeaseDuration(),
config.getRenewDeadline(),
config.getRetryPeriod(),
leaderCallbacks(),
true,
config.getLeaseName()))
.build();
this.configurationService = configurationService;
}

public boolean isLeaderElectionEnabled() {
return leaderElector != null;
return configurationService.getLeaderElectionConfiguration().isPresent();
}

private LeaderCallbacks leaderCallbacks() {
Expand Down Expand Up @@ -90,6 +67,7 @@ private String identity(LeaderElectionConfiguration config) {

public void start() {
if (isLeaderElectionEnabled()) {
init(configurationService.getLeaderElectionConfiguration().orElseThrow());
leaderElectionFuture = leaderElector.start();
}
}
Expand All @@ -99,4 +77,31 @@ public void stop() {
leaderElectionFuture.cancel(false);
}
}

private void init(LeaderElectionConfiguration config) {
this.identity = identity(config);
final var leaseNamespace =
config.getLeaseNamespace().orElseGet(
() -> configurationService.getClientConfiguration().getNamespace());
if (leaseNamespace == null) {
final var message =
"Lease namespace is not set and cannot be inferred. Leader election cannot continue.";
log.error(message);
throw new IllegalArgumentException(message);
}
final var lock = new LeaseLock(leaseNamespace, config.getLeaseName(), identity);
// releaseOnCancel is not used in the underlying implementation
leaderElector = new LeaderElectorBuilder(
kubernetesClient, configurationService.getExecutorServiceManager().cachingExecutorService())
.withConfig(
new LeaderElectionConfig(
lock,
config.getLeaseDuration(),
config.getRenewDeadline(),
config.getRetryPeriod(),
leaderCallbacks(),
true,
config.getLeaseName()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Version;
import io.javaoperatorsdk.operator.api.config.BaseConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
Expand All @@ -29,17 +28,18 @@ public class Operator implements LifecycleAware {
private static final Logger log = LoggerFactory.getLogger(Operator.class);
private static final int DEFAULT_MAX_CONCURRENT_REQUEST = 512;
private final KubernetesClient kubernetesClient;
private final ControllerManager controllerManager = new ControllerManager();
private final LeaderElectionManager leaderElectionManager =
new LeaderElectionManager(controllerManager);
private final ControllerManager controllerManager;
private final LeaderElectionManager leaderElectionManager;
private final ConfigurationService configurationService;
private volatile boolean started = false;


public Operator() {
this((KubernetesClient) null);
}

public Operator(KubernetesClient kubernetesClient) {
this(kubernetesClient, ConfigurationServiceProvider.instance());
this(kubernetesClient, new BaseConfigurationService());
}

/**
Expand All @@ -56,7 +56,8 @@ public Operator(Consumer<ConfigurationServiceOverrider> overrider) {
}

public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider> overrider) {
this(client, ConfigurationServiceProvider.overrideCurrent(overrider));
this(client, ConfigurationService
.newOverriddenConfigurationService(new BaseConfigurationService(), overrider));
}

/**
Expand All @@ -67,15 +68,19 @@ public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider>
* @param configurationService provides configuration
*/
public Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService) {
this.configurationService = configurationService;
final var executorServiceManager = configurationService.getExecutorServiceManager();
controllerManager = new ControllerManager(executorServiceManager);
this.kubernetesClient =
kubernetesClient != null ? kubernetesClient
: new KubernetesClientBuilder()
.withConfig(new ConfigBuilder()
.withMaxConcurrentRequests(DEFAULT_MAX_CONCURRENT_REQUEST).build())
.build();
ConfigurationServiceProvider.set(configurationService);
configurationService.getLeaderElectionConfiguration()
.ifPresent(c -> leaderElectionManager.init(c, this.kubernetesClient));


leaderElectionManager =
new LeaderElectionManager(kubernetesClient, controllerManager, configurationService);
}

/**
Expand All @@ -86,8 +91,7 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur
*/
@Deprecated(forRemoval = true)
public void installShutdownHook() {
installShutdownHook(
Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds()));
installShutdownHook(Duration.ofSeconds(configurationService.getTerminationTimeoutSeconds()));
}

/**
Expand Down Expand Up @@ -123,9 +127,8 @@ public synchronized void start() {
if (started) {
return;
}
ExecutorServiceManager.init();
controllerManager.shouldStart();
final var version = ConfigurationServiceProvider.instance().getVersion();
final var version = configurationService.getVersion();
log.info(
"Operator SDK {} (commit: {}) built on {} starting...",
version.getSdkVersion(),
Expand All @@ -149,12 +152,11 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
if (!started) {
return;
}
final var configurationService = ConfigurationServiceProvider.instance();
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();

ExecutorServiceManager.stop(gracefulShutdownTimeout);
configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout);
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
Expand All @@ -179,8 +181,7 @@ public void stop() throws OperatorException {
*/
public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> reconciler)
throws OperatorException {
final var controllerConfiguration =
ConfigurationServiceProvider.instance().getConfigurationFor(reconciler);
final var controllerConfiguration = configurationService.getConfigurationFor(reconciler);
return register(reconciler, controllerConfiguration);
}

Expand Down Expand Up @@ -210,7 +211,7 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
" reconciler named " + ReconcilerUtils.getNameFor(reconciler)
+ " because its configuration cannot be found.\n" +
" Known reconcilers are: "
+ ConfigurationServiceProvider.instance().getKnownReconcilerNames());
+ configurationService.getKnownReconcilerNames());
}

final var controller = new Controller<>(reconciler, configuration, kubernetesClient);
Expand Down Expand Up @@ -239,7 +240,7 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> reconciler,
Consumer<ControllerConfigurationOverrider<P>> configOverrider) {
final var controllerConfiguration =
ConfigurationServiceProvider.instance().getConfigurationFor(reconciler);
configurationService.getConfigurationFor(reconciler);
var configToOverride = ControllerConfigurationOverrider.override(controllerConfiguration);
configOverrider.accept(configToOverride);
return register(reconciler, configToOverride.build());
Expand All @@ -264,4 +265,8 @@ public RuntimeInfo getRuntimeInfo() {
boolean isStarted() {
return started;
}

public ConfigurationService getConfigurationService() {
return configurationService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ protected <P extends HasMetadata> ControllerConfiguration<P> configFor(Reconcile
" annotation for reconciler: " + reconciler);
}
Class<Reconciler<P>> reconcilerClass = (Class<Reconciler<P>>) reconciler.getClass();
final var resourceClass = ConfigurationServiceProvider.instance().getResourceClassResolver()
.getResourceClass(reconcilerClass);
final var resourceClass = getResourceClassResolver().getResourceClass(reconcilerClass);

final var name = ReconcilerUtils.getNameFor(reconciler);
final var generationAware = valueOrDefault(
Expand Down Expand Up @@ -152,7 +151,7 @@ protected <P extends HasMetadata> ControllerConfiguration<P> configFor(Reconcile
io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration::labelSelector,
Constants.NO_VALUE_SET),
null,
Utils.instantiate(annotation.itemStore(), ItemStore.class, context));
Utils.instantiate(annotation.itemStore(), ItemStore.class, context), this);

ResourceEventFilter<P> answer = deprecatedEventFilter(annotation);
config.setEventFilter(answer != null ? answer : ResourceEventFilters.passthrough());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -248,4 +252,19 @@ default ManagedWorkflowFactory getWorkflowFactory() {
default ResourceClassResolver getResourceClassResolver() {
return new DefaultResourceClassResolver();
}

static ConfigurationService newOverriddenConfigurationService(
ConfigurationService baseConfiguration,
Consumer<ConfigurationServiceOverrider> overrider) {
if (overrider != null) {
final var toOverride = new ConfigurationServiceOverrider(baseConfiguration);
overrider.accept(toOverride);
return toOverride.build();
}
return baseConfiguration;
}

default ExecutorServiceManager getExecutorServiceManager() {
return new ExecutorServiceManager(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,10 @@ public ResourceClassResolver getResourceClassResolver() {
}

/**
* @deprecated Use {@link ConfigurationServiceProvider#overrideCurrent(Consumer)} instead
* @param original that will be overriding
* @deprecated Use
* {@link ConfigurationService#newOverriddenConfigurationService(ConfigurationService, Consumer)}
* instead
* @param original that will be overridden
* @return current overrider
*/
@Deprecated(since = "2.2.0")
Expand Down
Loading