Use base task dir in kubernetes task runner (#13880)

* Use TaskConfig to get task dir in KubernetesTaskRunner

* Use the first path specified in baseTaskDirPaths instead of deprecated baseTaskDirPath

* Use getBaseTaskDirPaths in generate command
This commit is contained in:
Nicholas Lippis 2023-03-07 17:30:42 -05:00 committed by GitHub
parent 3924f0eff4
commit faac43eabe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 60 deletions

View File

@ -35,7 +35,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
@ -66,6 +66,7 @@ import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@ -109,7 +110,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
private final DruidNode node;
private final TaskStorageDirTracker dirTracker;
private final TaskConfig taskConfig;
public KubernetesTaskRunner(
@ -120,7 +121,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
TaskLogPusher taskLogPusher,
KubernetesPeonClient client,
DruidNode node,
TaskStorageDirTracker dirTracker
TaskConfig taskConfig
)
{
this.startupLoggingConfig = startupLoggingConfig;
@ -138,7 +139,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
"The task queue bounds how many concurrent k8s tasks you can have"
);
this.dirTracker = dirTracker;
this.taskConfig = taskConfig;
}
@ -165,7 +166,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
dirTracker.getTaskDir(task.getId()),
new File(taskConfig.getBaseTaskDirPaths().get(0)),
node.isEnableTlsPort()
);
Job job = adapter.fromTask(task, context);
@ -223,7 +224,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
Files.deleteIfExists(log);
}
client.cleanUpJob(new K8sTaskId(task.getId()));
dirTracker.removeTask(task.getId());
synchronized (tasks) {
tasks.remove(task.getId());
}
@ -267,7 +267,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
public void shutdown(String taskid, String reason)
{
client.cleanUpJob(new K8sTaskId(taskid));
dirTracker.removeTask(taskid);
}
@ -346,7 +345,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
{
final List<String> command = new ArrayList<>();
command.add("/peon.sh");
command.add(dirTracker.getBaseTaskDir(task.getId()).getAbsolutePath());
command.add(taskConfig.getBaseTaskDirPaths().get(0));
command.add(task.getId());
command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.

View File

@ -25,7 +25,7 @@ import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
@ -46,7 +46,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final DruidNode druidNode;
private final TaskStorageDirTracker dirTracker;
private final TaskConfig taskConfig;
private KubernetesTaskRunner runner;
@ -58,7 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
@JacksonInject TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
@Self DruidNode druidNode,
TaskStorageDirTracker dirTracker
TaskConfig taskConfig
)
{
@ -68,7 +68,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.druidNode = druidNode;
this.dirTracker = dirTracker;
this.taskConfig = taskConfig;
}
@Override
@ -99,7 +99,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
taskLogPusher,
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
druidNode,
dirTracker
taskConfig
);
return runner;
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
@ -38,7 +37,6 @@ import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
@ -62,17 +60,10 @@ import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -90,25 +81,18 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class KubernetesTaskRunnerTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private TaskQueueConfig taskQueueConfig;
private StartupLoggingConfig startupLoggingConfig;
private ObjectMapper jsonMapper;
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private TaskStorageDirTracker dirTracker;
private TaskConfig taskConfig;
private TaskLogPusher taskLogPusher;
private DruidNode node;
private final boolean useMultipleBaseTaskDirPaths;
public KubernetesTaskRunnerTest(boolean useMultipleBaseTaskDirPaths)
public KubernetesTaskRunnerTest()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@ -119,31 +103,12 @@ public class KubernetesTaskRunnerTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
this.useMultipleBaseTaskDirPaths = useMultipleBaseTaskDirPaths;
}
@Parameterized.Parameters(name = "useMultipleBaseTaskDirPaths = {0}")
public static Collection<Object[]> getParameters()
{
Object[][] parameters = new Object[][]{
{false},
{true}
};
return Arrays.asList(parameters);
}
@Before
public void setUp() throws IOException
public void setUp()
{
List<String> baseTaskDirPaths = ImmutableList.of("src/test/resources");
if (useMultipleBaseTaskDirPaths) {
baseTaskDirPaths = ImmutableList.of(
temporaryFolder.newFolder().toString(),
temporaryFolder.newFolder().toString()
);
}
final TaskConfig taskConfig = new TaskConfig(
taskConfig = new TaskConfig(
"src/test/resources",
"src/test/resources",
null,
@ -158,9 +123,8 @@ public class KubernetesTaskRunnerTest
null,
null,
false,
baseTaskDirPaths
null
);
dirTracker = new TaskStorageDirTracker(taskConfig);
kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
kubernetesTaskRunnerConfig.namespace = "test";
kubernetesTaskRunnerConfig.javaOptsArray = Collections.singletonList("-Xmx2g");
@ -214,7 +178,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
peonClient,
node,
dirTracker
taskConfig
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -271,7 +235,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
peonClient,
node,
dirTracker
taskConfig
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -339,7 +303,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
peonClient,
node,
dirTracker
taskConfig
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -404,7 +368,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
peonClient,
node,
dirTracker
taskConfig
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -447,7 +411,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
peonClient,
node,
dirTracker
taskConfig
);
RunnerTaskState state = taskRunner.getRunnerTaskState("foo");
@ -477,7 +441,7 @@ public class KubernetesTaskRunnerTest
taskLogPusher,
mock(DruidKubernetesPeonClient.class),
node,
dirTracker
taskConfig
));
}

View File

@ -118,6 +118,7 @@ public class TaskConfig
@JsonProperty
private final boolean encapsulatedTask;
@Deprecated
@JsonProperty("baseTaskDir")
private final String baseTaskDirPath;
@ -201,6 +202,7 @@ public class TaskConfig
return baseDir;
}
@Deprecated
@JsonProperty("baseTaskDir")
public String getBaseTaskDirPath()
{