diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java index de6db915c8a..9b482e75dc9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java @@ -20,10 +20,9 @@ package org.apache.druid.k8s.overlord; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; +import com.google.inject.name.Named; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.WorkerTaskRunner; -import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy; @@ -32,9 +31,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory taskRunnerFactory; private final RunnerStrategy runnerStrategy; private KubernetesAndWorkerTaskRunner runner; @@ -42,16 +39,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory taskRunnerFactory, RunnerStrategy runnerStrategy ) { this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory; - this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory; - this.remoteTaskRunnerFactory = remoteTaskRunnerFactory; - this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig; + this.taskRunnerFactory = taskRunnerFactory; this.runnerStrategy = runnerStrategy; } @@ -60,19 +53,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory provideWorkerTaskRunner( + KubernetesAndWorkerTaskRunnerConfig runnerConfig, + Injector injector + ) + { + String workerType = runnerConfig.getWorkerType(); + return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) + ? injector.getInstance(HttpRemoteTaskRunnerFactory.class) + : injector.getInstance(RemoteTaskRunnerFactory.class); + } + private static class RunnerStrategyProvider implements Provider { private KubernetesAndWorkerTaskRunnerConfig runnerConfig; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java index 88696017d05..b309d4ef04b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java @@ -20,9 +20,8 @@ package org.apache.druid.k8s.overlord; -import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; -import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; -import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy; +import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -37,59 +36,17 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport { @Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory; - @Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; - @Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory; + @Mock TaskRunnerFactory taskRunnerFactory; @Test - public void test_useHttpTaskRunner_asDefault() + public void test_buildKubernetesTaskRunnerSuccessfully() { KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, null), + taskRunnerFactory, new WorkerRunnerStrategy() ); - - EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null); - EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); - - replayAll(); - factory.build(); - verifyAll(); - } - - @Test - public void test_specifyRemoteTaskRunner() - { - KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( - kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, "remote"), - new WorkerRunnerStrategy() - ); - - EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); - EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); - - replayAll(); - factory.build(); - verifyAll(); - } - - @Test(expected = IllegalArgumentException.class) - public void test_specifyIncorrectTaskRunner_shouldThrowException() - { - KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory( - kubernetesTaskRunnerFactory, - httpRemoteTaskRunnerFactory, - remoteTaskRunnerFactory, - new KubernetesAndWorkerTaskRunnerConfig(null, "noop"), - new KubernetesRunnerStrategy() - ); - - EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null); + EasyMock.expect(taskRunnerFactory.build()).andReturn(null); EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java new file mode 100644 index 00000000000..b83ec562bda --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.ProvisionException; +import org.apache.druid.guice.ConfigModule; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; +import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; +import org.apache.druid.jackson.JacksonModule; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.server.DruidNode; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Properties; + +@RunWith(EasyMockRunner.class) +public class KubernetesOverlordModuleTest +{ + @Mock + private ServiceEmitter serviceEmitter; + @Mock + private TaskConfig taskConfig; + @Mock + private HttpClient httpClient; + @Mock + private RemoteTaskRunnerFactory remoteTaskRunnerFactory; + @Mock + private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory; + private Injector injector; + + @Test + public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully() + { + injector = makeInjectorWithProperties(initializePropertes(false), false, true); + KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance( + KubernetesAndWorkerTaskRunnerFactory.class); + Assert.assertNotNull(taskRunnerFactory); + + Assert.assertNotNull(taskRunnerFactory.build()); + } + + @Test + public void testRemoteTaskRunnerFactoryBindSuccessfully() + { + injector = makeInjectorWithProperties(initializePropertes(true), true, false); + KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance( + KubernetesAndWorkerTaskRunnerFactory.class); + Assert.assertNotNull(taskRunnerFactory); + + Assert.assertNotNull(taskRunnerFactory.build()); + } + + @Test(expected = ProvisionException.class) + public void testExceptionThrownIfNoTaskRunnerFactoryBind() + { + injector = makeInjectorWithProperties(initializePropertes(false), false, false); + injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class); + } + + private Injector makeInjectorWithProperties( + final Properties props, + boolean isWorkerTypeRemote, + boolean isWorkerTypeHttpRemote + ) + { + return Guice.createInjector( + ImmutableList.of( + new DruidGuiceExtensions(), + new JacksonModule(), + + binder -> { + binder.bind(Properties.class).toInstance(props); + binder.bind(ServiceEmitter.class).toInstance(serviceEmitter); + binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient); + binder.bind(TaskConfig.class).toInstance(taskConfig); + binder.bind(DruidNode.class) + .annotatedWith(Self.class) + .toInstance(new DruidNode("test-inject", null, false, null, null, true, false)); + if (isWorkerTypeRemote) { + binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory); + } + if (isWorkerTypeHttpRemote) { + binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory); + } + }, + new ConfigModule(), + new KubernetesOverlordModule() + )); + } + + private static Properties initializePropertes(boolean isWorkerTypeRemote) + { + final Properties props = new Properties(); + props.put("druid.indexer.runner.namespace", "NAMESPACE"); + props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s"); + if (isWorkerTypeRemote) { + props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType", "remote"); + } + return props; + } +}