Mixed task runner for migration to mm-less ingestion (#14918)

* save work

* Working

* Fix runner constructor

* Working runner

* extra log lines

* try using lifecycle for everything

* clean up configs

* cleanup /workers call

* Use a single config

* Allow selecting runner

* debug changes

* Work on composite task runner

* Unit tests running

* Add documentation

* Add some javadocs

* Fix spelling

* Use standard libraries

* code review

* fix

* fix

* use taskRunner as string

* checkstyl

---------

Co-authored-by: Suneet Saldanha <suneet@apache.org>
This commit is contained in:
George Shiqi Wu 2023-09-11 21:09:46 -04:00 committed by GitHub
parent 3a453f7a3c
commit f773d83914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 911 additions and 9 deletions

View File

@ -272,4 +272,18 @@ roleRef:
kind: Role
name: druid-k8s-task-scheduler
apiGroup: rbac.authorization.k8s.io
```
```
## Migration/Kubernetes and Worker Task Runner
If you are running a cluster with tasks running on middle managers or indexers and want to do a zero downtime migration to mm-less ingestion, the mm-less ingestion system is capable of running in migration mode by reading tasks from middle managers/indexers and Kubernetes and writing tasks to either middle managers or to Kubernetes.
To do this, set the following property.
`druid.indexer.runner.type: k8sAndWorker` (instead of `druid.indexer.runner.type: k8s`)
### Additional Configurations
|Property| Possible Values |Description|Default|required|
|--------|-----------------|-----------|-------|--------|
|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No|
|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No|

View File

@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.tasklogs.TaskLogStreamer;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
/**
* Mixed mode task runner that can run tasks on either Kubernetes or workers based on KubernetesAndWorkerTaskRunnerConfig.
* This task runner is always aware of task runner running on either system.
*/
public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTaskRunner
{
private final KubernetesTaskRunner kubernetesTaskRunner;
private final WorkerTaskRunner workerTaskRunner;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
public KubernetesAndWorkerTaskRunner(
KubernetesTaskRunner kubernetesTaskRunner,
WorkerTaskRunner workerTaskRunner,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
)
{
this.kubernetesTaskRunner = kubernetesTaskRunner;
this.workerTaskRunner = workerTaskRunner;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.restore(), workerTaskRunner.restore()));
}
@Override
@LifecycleStart
public void start()
{
kubernetesTaskRunner.start();
workerTaskRunner.start();
}
@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
kubernetesTaskRunner.registerListener(listener, executor);
workerTaskRunner.registerListener(listener, executor);
}
@Override
public void unregisterListener(String listenerId)
{
kubernetesTaskRunner.unregisterListener(listenerId);
workerTaskRunner.unregisterListener(listenerId);
}
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
return workerTaskRunner.run(task);
} else {
return kubernetesTaskRunner.run(task);
}
}
@Override
public void shutdown(String taskid, String reason)
{
// Technically this is a no-op for tasks a runner does't know about.
workerTaskRunner.shutdown(taskid, reason);
kubernetesTaskRunner.shutdown(taskid, reason);
}
@Override
@LifecycleStop
public void stop()
{
kubernetesTaskRunner.stop();
workerTaskRunner.stop();
}
@Override
public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
{
return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getRunningTasks(), workerTaskRunner.getRunningTasks()));
}
@Override
public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
{
return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getPendingTasks(), workerTaskRunner.getPendingTasks()));
}
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
return Lists.newArrayList(Iterables.concat(kubernetesTaskRunner.getKnownTasks(), workerTaskRunner.getKnownTasks()));
}
@Override
public Optional<ScalingStats> getScalingStats()
{
return workerTaskRunner.getScalingStats();
}
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
Map<String, Long> taskSlotCounts = new HashMap<>();
taskSlotCounts.putAll(kubernetesTaskRunner.getTotalTaskSlotCount());
taskSlotCounts.putAll(workerTaskRunner.getTotalTaskSlotCount());
return taskSlotCounts;
}
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
Map<String, Long> taskSlotCounts = new HashMap<>(workerTaskRunner.getIdleTaskSlotCount());
kubernetesTaskRunner.getIdleTaskSlotCount().forEach((tier, count) -> taskSlotCounts.merge(tier, count, Long::sum));
return taskSlotCounts;
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
Map<String, Long> taskSlotCounts = new HashMap<>(workerTaskRunner.getUsedTaskSlotCount());
kubernetesTaskRunner.getUsedTaskSlotCount().forEach((tier, count) -> taskSlotCounts.merge(tier, count, Long::sum));
return taskSlotCounts;
}
@Override
public Map<String, Long> getLazyTaskSlotCount()
{
Map<String, Long> taskSlotCounts = new HashMap<>(workerTaskRunner.getLazyTaskSlotCount());
kubernetesTaskRunner.getLazyTaskSlotCount().forEach((tier, count) -> taskSlotCounts.merge(tier, count, Long::sum));
return taskSlotCounts;
}
@Override
public Map<String, Long> getBlacklistedTaskSlotCount()
{
Map<String, Long> taskSlotCounts = new HashMap<>(workerTaskRunner.getBlacklistedTaskSlotCount());
kubernetesTaskRunner.getBlacklistedTaskSlotCount().forEach((tier, count) -> taskSlotCounts.merge(tier, count, Long::sum));
return taskSlotCounts;
}
@Override
public Collection<ImmutableWorkerInfo> getWorkers()
{
return workerTaskRunner.getWorkers();
}
@Override
public Collection<Worker> getLazyWorkers()
{
return workerTaskRunner.getLazyWorkers();
}
@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
return workerTaskRunner.markWorkersLazy(isLazyWorker, maxWorkers);
}
@Override
public WorkerTaskRunnerConfig getConfig()
{
return workerTaskRunner.getConfig();
}
@Override
public Collection<Task> getPendingTaskPayloads()
{
return workerTaskRunner.getPendingTaskPayloads();
}
@Override
public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
{
Optional<InputStream> kubernetesTaskLog = kubernetesTaskRunner.streamTaskLog(taskid, offset);
if (kubernetesTaskLog.isPresent()) {
return kubernetesTaskLog;
} else if (workerTaskRunner instanceof TaskLogStreamer) {
return ((TaskLogStreamer) workerTaskRunner).streamTaskLog(taskid, offset);
}
return Optional.absent();
}
@Nullable
@Override
public RunnerTaskState getRunnerTaskState(String taskId)
{
RunnerTaskState runnerTaskState = kubernetesTaskRunner.getRunnerTaskState(taskId);
if (runnerTaskState == null) {
return workerTaskRunner.getRunnerTaskState(taskId);
}
return runnerTaskState;
}
@Override
public int getTotalCapacity()
{
int k8sCapacity = kubernetesTaskRunner.getTotalCapacity();
int workerCapacity = workerTaskRunner.getTotalCapacity();
if (k8sCapacity == -1 && workerCapacity == -1) {
return -1;
}
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
@Override
public int getUsedCapacity()
{
int k8sCapacity = kubernetesTaskRunner.getUsedCapacity();
int workerCapacity = workerTaskRunner.getUsedCapacity();
if (k8sCapacity == -1 && workerCapacity == -1) {
return -1;
}
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import javax.validation.constraints.NotNull;
public class KubernetesAndWorkerTaskRunnerConfig
{
private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE = HttpRemoteTaskRunnerFactory.TYPE_NAME;
/**
* Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote.
*/
@JsonProperty
@NotNull
private final String workerTaskRunnerType;
/**
* Whether or not to send tasks to the worker task runner instead of the Kubernetes runner.
*/
@JsonProperty
@NotNull
private final boolean sendAllTasksToWorkerTaskRunner;
@JsonCreator
public KubernetesAndWorkerTaskRunnerConfig(
@JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
@JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner
)
{
this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
workerTaskRunnerType,
DEFAULT_WORKER_TASK_RUNNER_TYPE
);
Preconditions.checkArgument(
this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
"workerTaskRunnerType must be set to one of (%s, %s)",
HttpRemoteTaskRunnerFactory.TYPE_NAME,
RemoteTaskRunnerFactory.TYPE_NAME
);
this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull(
sendAllTasksToWorkerTaskRunner,
false
);
}
public String getWorkerTaskRunnerType()
{
return workerTaskRunnerType;
}
public boolean isSendAllTasksToWorkerTaskRunner()
{
return sendAllTasksToWorkerTaskRunner;
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
{
public static final String TYPE_NAME = "k8sAndWorker";
private final KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private KubernetesAndWorkerTaskRunner runner;
@Inject
public KubernetesAndWorkerTaskRunnerFactory(
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
RemoteTaskRunnerFactory remoteTaskRunnerFactory,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
}
@Override
public KubernetesAndWorkerTaskRunner build()
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType()) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build(),
kubernetesAndWorkerTaskRunnerConfig
);
return runner;
}
@Override
public KubernetesAndWorkerTaskRunner get()
{
return runner;
}
}

View File

@ -57,6 +57,7 @@ public class KubernetesOverlordModule implements DruidModule
{
// druid.indexer.runner.type=k8s
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker", KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
PolyBind.createChoice(
binder,
@ -72,7 +73,11 @@ public class KubernetesOverlordModule implements DruidModule
biddy.addBinding(KubernetesTaskRunnerFactory.TYPE_NAME)
.to(KubernetesTaskRunnerFactory.class)
.in(LazySingleton.class);
biddy.addBinding(KubernetesAndWorkerTaskRunnerFactory.TYPE_NAME)
.to(KubernetesAndWorkerTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
configureTaskLogs(binder);
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class KubernetesAndWorkerTaskRunnerConfigTest
{
@Test
public void test_deserializable() throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
KubernetesAndWorkerTaskRunnerConfig config = mapper.readValue(
this.getClass().getClassLoader().getResource("kubernetesAndWorkerTaskRunnerConfig.json"),
KubernetesAndWorkerTaskRunnerConfig.class
);
Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
}
@Test
public void test_withDefaults()
{
KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null);
Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(EasyMockRunner.class)
public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
{
@Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
@Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
@Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory;
@Test
public void test_useHttpTaskRunner_asDefault()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
);
EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
factory.build();
verifyAll();
}
@Test
public void test_specifyRemoteTaskRunner()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("remote", null)
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
factory.build();
verifyAll();
}
@Test(expected = IllegalArgumentException.class)
public void test_specifyIncorrectTaskRunner_shouldThrowException()
{
KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("noop", null)
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
replayAll();
factory.build();
verifyAll();
}
}

View File

@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@RunWith(EasyMockRunner.class)
public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
{
private static final String ID = "id";
private static final String DATA_SOURCE = "dataSource";
@Mock KubernetesTaskRunner kubernetesTaskRunner;
@Mock HttpRemoteTaskRunner workerTaskRunner;
KubernetesAndWorkerTaskRunner runner;
private Task task;
@Before
public void setup()
{
task = NoopTask.create();
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
);
}
@Test
public void test_runOnKubernetes() throws ExecutionException, InterruptedException
{
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, false)
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
replayAll();
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(task).get());
verifyAll();
}
@Test
public void test_runOnWorker() throws ExecutionException, InterruptedException
{
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, true)
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
replayAll();
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(task).get());
verifyAll();
}
@Test
public void test_getUsedCapacity()
{
EasyMock.expect(kubernetesTaskRunner.getUsedCapacity()).andReturn(1);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(1);
replayAll();
Assert.assertEquals(2, runner.getUsedCapacity());
verifyAll();
}
@Test
public void test_getTotalCapacity()
{
EasyMock.expect(kubernetesTaskRunner.getTotalCapacity()).andReturn(1);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(1);
replayAll();
Assert.assertEquals(2, runner.getTotalCapacity());
verifyAll();
}
@Test
public void test_getRunnerTaskState_kubernetes()
{
RunnerTaskState runnerTaskState = RunnerTaskState.RUNNING;
EasyMock.expect(kubernetesTaskRunner.getRunnerTaskState(ID)).andReturn(runnerTaskState);
replayAll();
Assert.assertEquals(runnerTaskState, runner.getRunnerTaskState(ID));
verifyAll();
}
@Test
public void test_getRunnerTaskState_worker()
{
RunnerTaskState runnerTaskState = RunnerTaskState.RUNNING;
EasyMock.expect(kubernetesTaskRunner.getRunnerTaskState(ID)).andReturn(null);
EasyMock.expect(workerTaskRunner.getRunnerTaskState(ID)).andReturn(runnerTaskState);
replayAll();
Assert.assertEquals(runnerTaskState, runner.getRunnerTaskState(ID));
verifyAll();
}
@Test
public void test_streamTaskLog_kubernetes() throws IOException
{
InputStream inputStream = IOUtils.toInputStream("inputStream", Charset.defaultCharset());
EasyMock.expect(kubernetesTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.of(inputStream));
replayAll();
Assert.assertEquals(inputStream, runner.streamTaskLog(ID, 0).get());
verifyAll();
}
@Test
public void test_streamTasklog_worker() throws IOException
{
InputStream inputStream = IOUtils.toInputStream("inputStream", Charset.defaultCharset());
EasyMock.expect(kubernetesTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.absent());
EasyMock.expect(workerTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.of(inputStream));
replayAll();
Assert.assertEquals(inputStream, runner.streamTaskLog(ID, 0).get());
verifyAll();
}
@Test
public void test_getBlacklistedTaskSlotCount()
{
Map<String, Long> kubernetesTaskSlots = ImmutableMap.of("category", 1L);
Map<String, Long> workerTaskSlots = ImmutableMap.of("category2", 2L);
EasyMock.expect(kubernetesTaskRunner.getBlacklistedTaskSlotCount()).andReturn(kubernetesTaskSlots);
EasyMock.expect(workerTaskRunner.getBlacklistedTaskSlotCount()).andReturn(workerTaskSlots);
replayAll();
Assert.assertEquals(
ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
runner.getBlacklistedTaskSlotCount()
);
verifyAll();
}
@Test
public void test_getLazyTaskSlotCount()
{
Map<String, Long> kubernetesTaskSlots = ImmutableMap.of("category", 1L);
Map<String, Long> workerTaskSlots = ImmutableMap.of("category2", 2L);
EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
replayAll();
Assert.assertEquals(
ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
runner.getLazyTaskSlotCount()
);
verifyAll();
}
@Test
public void test_getIdleTaskSlotCount()
{
Map<String, Long> kubernetesTaskSlots = ImmutableMap.of("category", 1L);
Map<String, Long> workerTaskSlots = ImmutableMap.of("category2", 2L);
EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
replayAll();
Assert.assertEquals(
ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
runner.getLazyTaskSlotCount()
);
verifyAll();
}
@Test
public void test_getTotalTaskSlotCount()
{
Map<String, Long> kubernetesTaskSlots = ImmutableMap.of("category", 1L);
Map<String, Long> workerTaskSlots = ImmutableMap.of("category2", 2L);
EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
replayAll();
Assert.assertEquals(
ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
runner.getLazyTaskSlotCount()
);
verifyAll();
}
@Test
public void test_getKnownTasks()
{
EasyMock.expect(kubernetesTaskRunner.getKnownTasks()).andReturn(ImmutableList.of());
EasyMock.expect(workerTaskRunner.getKnownTasks()).andReturn(ImmutableList.of());
replayAll();
Assert.assertEquals(
0,
runner.getKnownTasks().size()
);
verifyAll();
}
@Test
public void test_getPendingTasks()
{
EasyMock.expect(kubernetesTaskRunner.getPendingTasks()).andReturn(ImmutableList.of());
EasyMock.expect(workerTaskRunner.getPendingTasks()).andReturn(ImmutableList.of());
replayAll();
Assert.assertEquals(
0,
runner.getPendingTasks().size()
);
verifyAll();
}
@Test
public void test_getRunningTasks()
{
EasyMock.expect(kubernetesTaskRunner.getRunningTasks()).andReturn(ImmutableList.of());
EasyMock.expect(workerTaskRunner.getRunningTasks()).andReturn(ImmutableList.of());
replayAll();
Assert.assertEquals(
0,
runner.getRunningTasks().size()
);
verifyAll();
}
@Test
public void test_shutdown()
{
String reason = "reason";
kubernetesTaskRunner.shutdown(ID, reason);
workerTaskRunner.shutdown(ID, reason);
replayAll();
runner.shutdown(ID, reason);
verifyAll();
}
@Test
public void test_restore()
{
EasyMock.expect(kubernetesTaskRunner.restore()).andReturn(ImmutableList.of());
EasyMock.expect(workerTaskRunner.restore()).andReturn(ImmutableList.of());
replayAll();
Assert.assertEquals(0, runner.restore().size());
verifyAll();
}
}

View File

@ -0,0 +1,4 @@
{
"workerTaskRunnerType": "remote",
"sendAllTasksToWorkerTaskRunner": false
}

View File

@ -163,5 +163,4 @@ public interface TaskRunner
{
return -1;
}
}

View File

@ -231,7 +231,7 @@ public class CliPeon extends GuiceRunnable
.setTaskFile(Paths.get(taskDirPath, "task.json").toFile())
.setStatusFile(Paths.get(taskDirPath, "attempt", attemptId, "status.json").toFile());
if ("k8s".equals(properties.getProperty("druid.indexer.runner.type", null))) {
if (properties.getProperty("druid.indexer.runner.type", "").contains("k8s")) {
log.info("Running peon in k8s mode");
executorLifecycleConfig.setParentStreamDefined(false);
}

View File

@ -45,7 +45,7 @@ public class CliPeonTest
{
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), true);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), "k8s");
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Assert.assertNotNull(runnable.makeInjector());
@ -56,7 +56,18 @@ public class CliPeonTest
{
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), false);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), "httpRemote");
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Assert.assertNotNull(runnable.makeInjector());
}
@Test
public void testCliPeonK8sANdWorkerIsK8sMode() throws IOException
{
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), "k8sAndWorker");
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Assert.assertNotNull(runnable.makeInjector());
@ -66,7 +77,7 @@ public class CliPeonTest
{
List<String> taskAndStatusFile = new ArrayList<>();
FakeCliPeon(String taskDirectory, boolean runningOnK8s)
FakeCliPeon(String taskDirectory, String runnerType)
{
try {
taskAndStatusFile.add(taskDirectory);
@ -77,9 +88,7 @@ public class CliPeonTest
privateField.setAccessible(true);
privateField.set(this, taskAndStatusFile);
if (runningOnK8s) {
System.setProperty("druid.indexer.runner.type", "k8s");
}
System.setProperty("druid.indexer.runner.type", runnerType);
}
catch (Exception ex) {
// do nothing