Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions extensions-core/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<properties>
<!-- fabric8 6.13.2+ is incompatible with jackson 2.12
~ due to fix for https://github.com/fabric8io/kubernetes-client/issues/6110.
~ GoIntegerDeserializer uses a _parseInteger method that does not exist in 2.12 (introduced in 2.14) -->
<fabric8.version>6.13.1</fabric8.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- snakeyaml explicitly pinned to version 1.33 as it is
Expand Down Expand Up @@ -111,7 +118,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>6.7.2</version>
<version>${fabric8.version}</version>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
Expand All @@ -121,18 +128,34 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-batch</artifactId>
<version>6.7.2</version>
<version>${fabric8.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<version>6.7.2</version>
<version>${fabric8.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>6.7.2</version>
<version>${fabric8.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-okhttp</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-vertx</artifactId>
<version>${fabric8.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.5.8</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
Expand Down Expand Up @@ -169,7 +192,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>6.4.1</version>
<version>${fabric8.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesHttpClientConfig;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskExecutionConfigResource;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class KubernetesOverlordModule implements DruidModule
+ ".k8sAndWorker";
private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING = K8SANDWORKER_PROPERTIES_PREFIX
+ ".runnerStrategy.%s";
private static final String HTTPCLIENT_PROPERITES_PREFIX = K8SANDWORKER_PROPERTIES_PREFIX + ".http";

@Override
public void configure(Binder binder)
Expand Down Expand Up @@ -119,22 +121,28 @@ public void configure(Binder binder)
configureTaskLogs(binder);

Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);

JsonConfigProvider.bind(binder, HTTPCLIENT_PROPERITES_PREFIX, DruidKubernetesHttpClientConfig.class);
}

@Provides
@LazySingleton
public DruidKubernetesClient makeKubernetesClient(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Lifecycle lifecycle)
public DruidKubernetesClient makeKubernetesClient(
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
DruidKubernetesHttpClientConfig httpClientConfig,
Lifecycle lifecycle
)
{
DruidKubernetesClient client;
final DruidKubernetesClient client;
final Config config = new ConfigBuilder().build();

if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
} else {
client = new DruidKubernetesClient();
}

client = new DruidKubernetesClient(httpClientConfig, config);

lifecycle.addHandler(
new Lifecycle.Handler()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,19 @@
package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class DruidKubernetesClient implements KubernetesClientApi
{

private final Config config;
private final KubernetesClient kubernetesClient;

public DruidKubernetesClient()
{
this(new ConfigBuilder().build());
}

public DruidKubernetesClient(Config config)
public DruidKubernetesClient(DruidKubernetesHttpClientConfig httpClientConfig, Config kubernetesClientConfig)
{
this.config = config;
this.kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
this.kubernetesClient = new KubernetesClientBuilder()
.withHttpClientFactory(new DruidKubernetesHttpClientFactory(httpClientConfig))
.withConfig(kubernetesClientConfig)
.build();
}

@Override
Expand All @@ -47,8 +41,10 @@ public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesRes
return executor.executeRequest(kubernetesClient);
}

/** This client automatically gets closed by the druid lifecycle, it should not be closed when used as it is
/**
* This client automatically gets closed by the druid lifecycle, it should not be closed when used as it is
* meant to be reused.
*
* @return re-useable KubernetesClient
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.common;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.VertxOptions;

public class DruidKubernetesHttpClientConfig
{
@JsonProperty
private int workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE;

@JsonProperty
private int eventLoopPoolSize = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE;

@JsonProperty
private int internalBlockingPoolSize = VertxOptions.DEFAULT_INTERNAL_BLOCKING_POOL_SIZE;

public int getWorkerPoolSize()
{
return workerPoolSize;
}

public int getEventLoopPoolSize()
{
return eventLoopPoolSize;
}

public int getInternalBlockingPoolSize()
{
return internalBlockingPoolSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.vertx.VertxHttpClientBuilder;
import io.fabric8.kubernetes.client.vertx.VertxHttpClientFactory;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.file.FileSystemOptions;
import io.vertx.core.spi.resolver.ResolverProvider;

/**
* Similar to {@link VertxHttpClientFactory} but allows us to override thread pool configurations.
*/
public class DruidKubernetesHttpClientFactory implements HttpClient.Factory
{
private final Vertx vertx;

public DruidKubernetesHttpClientFactory(final DruidKubernetesHttpClientConfig httpClientConfig)
{
this.vertx = createVertxInstance(httpClientConfig);
}

@Override
public VertxHttpClientBuilder<DruidKubernetesHttpClientFactory> newBuilder()
{
return new VertxHttpClientBuilder<>(this, vertx);
}

/**
* Adapted from fabric8 kubernetes-client 7.1.0. We bring this here so we can customize thread pool sizes
* and force usage of daemon threads.
*/
private static Vertx createVertxInstance(final DruidKubernetesHttpClientConfig httpClientConfig)
{
// fabric8 disables the async DNS resolver while creating Vertx.
// I'm not sure if we really need to do this, but I'm keeping it to align behavior with upstream.
final String originalDnsResolverProperty = System.getProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME);
Vertx vertx;
try {
System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, "true");
vertx = Vertx.vertx(
new VertxOptions()
.setFileSystemOptions(
new FileSystemOptions().setFileCachingEnabled(false)
.setClassPathResolvingEnabled(false)
)
.setWorkerPoolSize(httpClientConfig.getWorkerPoolSize())
.setEventLoopPoolSize(httpClientConfig.getEventLoopPoolSize())
.setInternalBlockingPoolSize(httpClientConfig.getInternalBlockingPoolSize())
.setUseDaemonThread(true)
);
}
finally {
if (originalDnsResolverProperty == null) {
System.clearProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME);
} else {
System.setProperty(ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME, originalDnsResolverProperty);
}
}

return vertx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesHttpClientConfig;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.NoopTaskLogs;
Expand Down Expand Up @@ -54,7 +56,8 @@ public void setup()
.withCapacity(1)
.build();
taskLogs = new NoopTaskLogs();
druidKubernetesClient = new DruidKubernetesClient();
druidKubernetesClient =
new DruidKubernetesClient(new DruidKubernetesHttpClientConfig(), new ConfigBuilder().build());
taskAdapter = new TestTaskAdapter();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.TestUtils;
Expand All @@ -34,6 +35,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesHttpClientConfig;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
Expand Down Expand Up @@ -85,7 +87,7 @@ public void setup()
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
k8sClient = new DruidKubernetesClient();
k8sClient = new DruidKubernetesClient(new DruidKubernetesHttpClientConfig(), new ConfigBuilder().build());
peonClient = new KubernetesPeonClient(k8sClient, "default", false, new NoopServiceEmitter());
druidNode = new DruidNode(
"test",
Expand Down
17 changes: 15 additions & 2 deletions licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ name: kubernetes fabric java client
license_category: binary
module: extensions-core/kubernetes-overlord-extensions
license_name: Apache License version 2.0
version: 6.7.2
version: 6.13.1
libraries:
- io.fabric8: kubernetes-client-api
- io.fabric8: kubernetes-model-batch
Expand All @@ -880,6 +880,19 @@ libraries:
- io.fabric8: kubernetes-model-resource
- io.fabric8: kubernetes-model-scheduling
- io.fabric8: kubernetes-model-storageclass
- io.fabric8: kubernetes-httpclient-vertx
---

name: vertx
license_category: binary
module: extensions-core/kubernetes-overlord-extensions
license_name: Apache License version 2.0
version: 4.5.8
libraries:
- io.vertx: vertx-auth-common
- io.vertx: vertx-core
- io.vertx: vertx-web-client
- io.vertx: vertx-web-common
---

name: kubernetes official java client
Expand Down Expand Up @@ -1051,7 +1064,7 @@ name: org.snakeyaml snakeyaml-engine
license_category: binary
module: extensions-core/druid-kubernetes-overlord-extensions
license_name: Apache License version 2.0
version: 2.6
version: 2.7
libraries:
- org.snakeyaml: snakeyaml-engine

Expand Down
Loading