Fix k8sAndWorker mode in a zookeeper-less environment (#15445)

* Fix k8sAndWorker mode in a zookeeper-less environment

* add unit test

* code reformat

* minor refine

* change to inject Provider

* correct style

* bind HttpRemoteTaskRunnerFactory as provider

* change to bind on TaskRunnerFactory

* fix styling
This commit is contained in:
YongGang 2024-01-12 09:30:01 -08:00 committed by GitHub
parent cccf13ea82
commit 0457c71d03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 166 additions and 68 deletions

View File

@ -20,10 +20,9 @@
package org.apache.druid.k8s.overlord;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import com.google.inject.name.Named;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
@ -32,9 +31,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
public static final String TYPE_NAME = "k8sAndWorker";
private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private final TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory;
private final RunnerStrategy runnerStrategy;
private KubernetesAndWorkerTaskRunner runner;
@ -42,16 +39,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
@Inject
public KubernetesAndWorkerTaskRunnerFactory(
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
RemoteTaskRunnerFactory remoteTaskRunnerFactory,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
@Named("taskRunnerFactory") TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory,
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
this.taskRunnerFactory = taskRunnerFactory;
this.runnerStrategy = runnerStrategy;
}
@ -60,19 +53,12 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
getWorkerTaskRunner(),
taskRunnerFactory.build(),
runnerStrategy
);
return runner;
}
private WorkerTaskRunner getWorkerTaskRunner()
{
String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build();
}
@Override
public KubernetesAndWorkerTaskRunner get()
{

View File

@ -21,10 +21,12 @@ package org.apache.druid.k8s.overlord;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.discovery.NodeRole;
@ -37,8 +39,11 @@ import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@ -130,6 +135,26 @@ public class KubernetesOverlordModule implements DruidModule
return client;
}
/**
* Provides a TaskRunnerFactory instance suitable for environments without Zookeeper.
* In such environments, the standard RemoteTaskRunnerFactory may not be operational.
* Depending on the workerType defined in KubernetesAndWorkerTaskRunnerConfig,
* this method selects and returns an appropriate TaskRunnerFactory implementation.
*/
@Provides
@LazySingleton
@Named("taskRunnerFactory")
TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
KubernetesAndWorkerTaskRunnerConfig runnerConfig,
Injector injector
)
{
String workerType = runnerConfig.getWorkerType();
return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType)
? injector.getInstance(HttpRemoteTaskRunnerFactory.class)
: injector.getInstance(RemoteTaskRunnerFactory.class);
}
private static class RunnerStrategyProvider implements Provider<RunnerStrategy>
{
private KubernetesAndWorkerTaskRunnerConfig runnerConfig;

View File

@ -20,9 +20,8 @@
package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@ -37,59 +36,17 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
{
@Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
@Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
@Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory;
@Mock TaskRunnerFactory<? extends WorkerTaskRunner> taskRunnerFactory;
@Test
public void test_useHttpTaskRunner_asDefault()
public void test_buildKubernetesTaskRunnerSuccessfully()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, null),
taskRunnerFactory,
new WorkerRunnerStrategy()
);
EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
factory.build();
verifyAll();
}
@Test
public void test_specifyRemoteTaskRunner()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
new WorkerRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
factory.build();
verifyAll();
}
@Test(expected = IllegalArgumentException.class)
public void test_specifyIncorrectTaskRunner_shouldThrowException()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
new KubernetesRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(taskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.ProvisionException;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Properties;
@RunWith(EasyMockRunner.class)
public class KubernetesOverlordModuleTest
{
@Mock
private ServiceEmitter serviceEmitter;
@Mock
private TaskConfig taskConfig;
@Mock
private HttpClient httpClient;
@Mock
private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
@Mock
private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private Injector injector;
@Test
public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully()
{
injector = makeInjectorWithProperties(initializePropertes(false), false, true);
KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance(
KubernetesAndWorkerTaskRunnerFactory.class);
Assert.assertNotNull(taskRunnerFactory);
Assert.assertNotNull(taskRunnerFactory.build());
}
@Test
public void testRemoteTaskRunnerFactoryBindSuccessfully()
{
injector = makeInjectorWithProperties(initializePropertes(true), true, false);
KubernetesAndWorkerTaskRunnerFactory taskRunnerFactory = injector.getInstance(
KubernetesAndWorkerTaskRunnerFactory.class);
Assert.assertNotNull(taskRunnerFactory);
Assert.assertNotNull(taskRunnerFactory.build());
}
@Test(expected = ProvisionException.class)
public void testExceptionThrownIfNoTaskRunnerFactoryBind()
{
injector = makeInjectorWithProperties(initializePropertes(false), false, false);
injector.getInstance(KubernetesAndWorkerTaskRunnerFactory.class);
}
private Injector makeInjectorWithProperties(
final Properties props,
boolean isWorkerTypeRemote,
boolean isWorkerTypeHttpRemote
)
{
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new JacksonModule(),
binder -> {
binder.bind(Properties.class).toInstance(props);
binder.bind(ServiceEmitter.class).toInstance(serviceEmitter);
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
binder.bind(TaskConfig.class).toInstance(taskConfig);
binder.bind(DruidNode.class)
.annotatedWith(Self.class)
.toInstance(new DruidNode("test-inject", null, false, null, null, true, false));
if (isWorkerTypeRemote) {
binder.bind(RemoteTaskRunnerFactory.class).toInstance(remoteTaskRunnerFactory);
}
if (isWorkerTypeHttpRemote) {
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
}
},
new ConfigModule(),
new KubernetesOverlordModule()
));
}
private static Properties initializePropertes(boolean isWorkerTypeRemote)
{
final Properties props = new Properties();
props.put("druid.indexer.runner.namespace", "NAMESPACE");
props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.type", "k8s");
if (isWorkerTypeRemote) {
props.put("druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType", "remote");
}
return props;
}
}