Make the tasks run with only a single directory (#14063)

* Make the tasks run with only a single directory

There was a change that tried to get indexing to run on multiple disks.
It made a number of changes to how tasks run, effectively hiding the
"safe" directory for tasks to write files into from the task code itself,
making it extremely difficult to do anything correctly inside of a task.

This change reverts those changes inside of the tasks and makes the
task runners solely responsible for deciding which mount points should
be used for storing task-related files.

It adds the config druid.worker.baseTaskDirs, which the task runners can
use to decide which directories to schedule tasks in. The TaskConfig
remains the authoritative source of configuration for where and how an
individual task should operate.
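
For illustration, a minimal sketch of the round-robin selection the task
runners now perform is below, assuming a configured list of base
directories. The class and method names are illustrative only; the real
implementation is TaskStorageDirTracker, shown in the diff further down.

import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; mirrors the behavior of TaskStorageDirTracker.pickBaseDir.
class BaseDirPicker
{
  private final File[] baseTaskDirs;
  // Start negative so index computation stays valid after the counter rolls over.
  private final AtomicInteger counter = new AtomicInteger(Integer.MIN_VALUE);

  BaseDirPicker(List<File> baseTaskDirs)
  {
    this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
  }

  File pickBaseDir(String taskId)
  {
    // A directory that already holds this task (e.g. after a restart) takes precedence.
    for (File dir : baseTaskDirs) {
      if (new File(dir, taskId).exists()) {
        return dir;
      }
    }
    // Otherwise assign one round-robin; abs() because the counter can be negative.
    final int i = Math.abs(counter.getAndIncrement() % baseTaskDirs.length);
    return baseTaskDirs[i % baseTaskDirs.length];
  }
}

With druid.worker.baseTaskDirs set to two paths (say, hypothetical
/mnt/disk1/task and /mnt/disk2/task), two tasks scheduled back to back
land on different disks, while a restarted task is routed back to
whichever disk already contains its directory.
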
imply-cheddar 2023-04-13 16:45:02 +09:00 committed by GitHub
parent 179e2e8108
commit aaa6cc1883
64 changed files with 822 additions and 1126 deletions


@ -1457,6 +1457,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the MiddleManager. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.indexer.runner.minWorkerVersion`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons.|0|
|`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of CPUs on the machine - 1|
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
#### Peon Processing
@ -1510,8 +1511,7 @@ Additional peon configs include:
|--------|-----------|-------|
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Deprecated. Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.baseTaskDirPaths`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of `${druid.indexer.task.baseTaskDir}`. If a null or empty value is provided, `baseTaskDir` is used. Otherwise, it overrides the value of `baseTaskDir`. Example: `druid.indexer.task.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchProcessingMode`| Batch ingestion tasks have three operating modes to control construction and tracking for intermediary segments: `OPEN_SEGMENTS`, `CLOSED_SEGMENTS`, and `CLOSED_SEGMENTS_SINKS`. `OPEN_SEGMENTS` uses the streaming ingestion code path and performs a `mmap` on intermediary segments to build a timeline to make these segments available to realtime queries. Batch ingestion doesn't require intermediary segments, so the default mode, `CLOSED_SEGMENTS`, eliminates `mmap` of intermediary segments. `CLOSED_SEGMENTS` mode still tracks the entire set of segments in heap. The `CLOSED_SEGMENTS_SINKS` mode is the most aggressive configuration and should have the smallest memory footprint. It eliminates in-memory tracking and `mmap` of intermediary segments produced during segment creation. `CLOSED_SEGMENTS_SINKS` mode isn't as well tested as other modes so it is currently considered experimental. You can use `OPEN_SEGMENTS` mode if problems occur with the two newer modes. |`CLOSED_SEGMENTS`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
@ -1536,7 +1536,7 @@ If the peon is running in remote mode, there must be an Overlord up and running.
When new segments are created, Druid temporarily stores some preprocessed data in some buffers. Currently three types of
*medium* exist for those buffers: *temporary files*, *off-heap memory*, and *on-heap memory*.
*Temporary files* (`tmpFile`) are stored under the task working directory (see `druid.indexer.task.baseTaskDirPaths`
*Temporary files* (`tmpFile`) are stored under the task working directory (see `druid.worker.baseTaskDirs`
configuration above) and thus share its mounting properties, e.g. they could be backed by HDD, SSD or memory (tmpfs).
This type of medium may do unnecessary disk I/O and requires some disk space to be available.
@ -1577,11 +1577,11 @@ then the value from the configuration below is used:
|--------|-----------|-------|
|`druid.worker.version`|Version identifier for the Indexer.|0|
|`druid.worker.capacity`|Maximum number of tasks the Indexer can accept.|Number of available processors - 1|
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available for ingestion processing. This is applied by automatically setting the `maxBytesInMemory` property on tasks.|60% of configured JVM heap|
|`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge operations that can run concurrently across all tasks.|`druid.worker.capacity` / 2, rounded down|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Deprecated. Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.baseTaskDirPaths`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of `${druid.indexer.task.baseTaskDir}`. If a null or empty value is provided, `baseTaskDir` is used. Otherwise, it overrides the value of `baseTaskDir`. Example: `druid.indexer.task.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|


@ -400,7 +400,7 @@ Logs are created by ingestion tasks as they run. You can configure Druid to pus
Once the task has been submitted to the Overlord it remains `WAITING` for locks to be acquired. Worker slot allocation is then `PENDING` until the task can actually start executing.
The task then starts creating logs in a local directory of the middle manager (or indexer) in a `log` directory for the specific `taskId` at [`druid.indexer.task.baseTaskDirPaths`] (../configuration/index.md#additional-peon-configuration).
The task then starts creating logs in a local directory of the middle manager (or indexer) in a `log` directory for the specific `taskId` at [`druid.worker.baseTaskDirs`](../configuration/index.md#middlemanager-configuration).
When the task completes - whether it succeeds or fails - the middle manager (or indexer) will push the task log file into the location specified in [`druid.indexer.logs`](../configuration/index.md#task-logging).


@ -24,7 +24,7 @@ druid.plaintextPort=8091
# determined based on available processor.
# Task launch parameters
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Processing threads and buffers on Indexer
# Determined automatically based on available memory. For details on how to manually set parameters:


@ -26,7 +26,7 @@ druid.plaintextPort=8091
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Processing threads and buffers on Peons
# Determined automatically based on available memory. For details on how to manually set parameters:


@ -24,7 +24,7 @@ druid.plaintextPort=8091
druid.worker.capacity=4
# Task launch parameters
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=60


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=4
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=60


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=8
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=60


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=4
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=60


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=2
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=12


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=2
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms256m","-Xmx256m","-XX:MaxDirectMemorySize=300m","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=6


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=3
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=50


@ -22,11 +22,11 @@ druid.plaintextPort=8091
# Number of tasks per middleManager
druid.worker.capacity=16
druid.worker.baseTaskDirs=[\"var/druid/task\"]
# Task launch parameters
druid.indexer.runner.javaCommand=bin/run-java
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid.indexer.task.baseTaskDirPaths=[\"var/druid/task\"]
# HTTP server threads
druid.server.http.numThreads=60


@ -52,7 +52,6 @@ import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -109,7 +108,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
new File(taskConfig.getBaseTaskDirPaths().get(0)),
taskConfig.getBaseTaskDir(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
@ -371,7 +370,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
{
final List<String> command = new ArrayList<>();
command.add("/peon.sh");
command.add(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath());
command.add(taskConfig.getBaseTaskDir().getAbsolutePath());
command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.
String nodeType = task.getNodeType();


@ -209,7 +209,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
return ImmutableList.of(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath())
.withValue(taskConfig.getBaseDir())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_ID_ENV)


@ -20,9 +20,9 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
@ -72,23 +72,7 @@ public class KubernetesTaskRunnerFactoryTest
true,
false
);
taskConfig = new TaskConfig(
"/tmp",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("/tmp")
);
taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build();
properties = new Properties();
}


@ -22,7 +22,6 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -31,6 +30,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@ -93,23 +93,7 @@ public class DruidPeonClientIntegrationTest
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}
@Disabled


@ -23,13 +23,13 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.api.client.util.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -40,6 +40,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@ -92,29 +93,30 @@ class K8sTaskAdapterTest
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}
@Test
void testAddingLabelsAndAnnotations() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
TestKubernetesClient testClient = new TestKubernetesClient(client)
{
@SuppressWarnings("unchecked")
@Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{
return (T) new Pod()
{
@Override
public PodSpec getSpec()
{
return podSpec;
}
};
}
};
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.annotations.put("annotation_key", "annotation_value");
@ -128,11 +130,8 @@ class K8sTaskAdapterTest
jsonMapper
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
);
Job jobFromSpec = adapter.fromTask(task);
assertTrue(jobFromSpec.getMetadata().getAnnotations().containsKey("annotation_key"));
assertTrue(jobFromSpec.getMetadata().getAnnotations().containsKey(DruidK8sConstants.TASK_ID));
assertFalse(jobFromSpec.getMetadata().getAnnotations().containsKey("label_key"));


@ -22,7 +22,6 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -31,6 +30,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@ -78,23 +78,7 @@ class MultiContainerTaskAdapterTest
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}
@Test


@ -20,12 +20,12 @@
package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
@ -58,23 +58,7 @@ public class PodTemplateTaskAdapterTest
public void setup()
{
taskRunnerConfig = new KubernetesTaskRunnerConfig();
taskConfig = new TaskConfig(
"/tmp",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("/tmp")
);
taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build();
node = new DruidNode(
"test",
"",


@ -22,7 +22,6 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
@ -30,6 +29,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@ -76,23 +76,7 @@ class SingleContainerTaskAdapterTest
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
}
@Test


@ -19,81 +19,96 @@
package org.apache.druid.indexing.common;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import javax.inject.Inject;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class TaskStorageDirTracker
{
private int taskDirIndex = 0;
private final List<File> baseTaskDirs = new ArrayList<>();
private final Map<String, File> taskToTempDirMap = new HashMap<>();
@Inject
public TaskStorageDirTracker(final TaskConfig taskConfig)
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
{
this(taskConfig.getBaseTaskDirPaths());
}
@VisibleForTesting
public TaskStorageDirTracker(final List<String> baseTaskDirPaths)
{
for (String baseTaskDirPath : baseTaskDirPaths) {
baseTaskDirs.add(new File(baseTaskDirPath));
if (workerConfig == null) {
return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
} else {
final List<String> basePaths = workerConfig.getBaseTaskDirs();
if (basePaths == null) {
return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
}
return new TaskStorageDirTracker(
basePaths.stream().map(File::new).collect(Collectors.toList())
);
}
}
public File getTaskDir(String taskId)
private final File[] baseTaskDirs;
// Initialize to a negative number because it ensures that we can handle the overflow-rollover case
private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
public TaskStorageDirTracker(List<File> baseTaskDirs)
{
return new File(getBaseTaskDir(taskId), taskId);
this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
}
public File getTaskWorkDir(String taskId)
@LifecycleStart
public void ensureDirectories()
{
return new File(getTaskDir(taskId), "work");
for (File baseTaskDir : baseTaskDirs) {
if (!baseTaskDir.exists()) {
try {
FileUtils.mkdirp(baseTaskDir);
}
catch (IOException e) {
throw new ISE(
e,
"base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
baseTaskDir
);
}
}
}
}
public File getTaskTempDir(String taskId)
public File pickBaseDir(String taskId)
{
return new File(getTaskDir(taskId), "temp");
}
public List<File> getBaseTaskDirs()
{
return baseTaskDirs;
}
public synchronized File getBaseTaskDir(final String taskId)
{
if (!taskToTempDirMap.containsKey(taskId)) {
addTask(taskId, baseTaskDirs.get(taskDirIndex));
taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
if (baseTaskDirs.length == 1) {
return baseTaskDirs[0];
}
return taskToTempDirMap.get(taskId);
}
public synchronized void addTask(final String taskId, final File taskDir)
{
final File existingTaskDir = taskToTempDirMap.get(taskId);
if (existingTaskDir != null && !existingTaskDir.equals(taskDir)) {
throw new ISE("Task [%s] is already assigned to worker path[%s]", taskId, existingTaskDir.getPath());
// if the task directory already exists, we want to give it precedence, so check.
for (File baseTaskDir : baseTaskDirs) {
if (new File(baseTaskDir, taskId).exists()) {
return baseTaskDir;
}
}
taskToTempDirMap.put(taskId, taskDir);
// if it doesn't exist, pick one round-robin and return. This can be negative, so abs() it
final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
return baseTaskDirs[currIncrement % baseTaskDirs.length];
}
public synchronized void removeTask(final String taskId)
@Nullable
public File findExistingTaskDir(String taskId)
{
taskToTempDirMap.remove(taskId);
if (baseTaskDirs.length == 1) {
return new File(baseTaskDirs[0], taskId);
}
for (File baseTaskDir : baseTaskDirs) {
final File candidateLocation = new File(baseTaskDir, taskId);
if (candidateLocation.exists()) {
return candidateLocation;
}
}
return null;
}
}


@ -128,8 +128,6 @@ public class TaskToolbox
private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final TaskStorageDirTracker dirTracker;
public TaskToolbox(
TaskConfig config,
@ -170,8 +168,7 @@ public class TaskToolbox
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
String attemptId,
TaskStorageDirTracker dirTracker
String attemptId
)
{
this.config = config;
@ -214,7 +211,6 @@ public class TaskToolbox
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.dirTracker = dirTracker;
}
public TaskConfig getConfig()
@ -472,11 +468,6 @@ public class TaskToolbox
return attemptId;
}
public TaskStorageDirTracker getDirTracker()
{
return dirTracker;
}
/**
* Get {@link RuntimeInfo} adjusted for this particular task. When running in a task JVM launched by a MiddleManager,
* this is the same as the baseline {@link RuntimeInfo}. When running in an Indexer, it is adjusted based on
@ -552,7 +543,6 @@ public class TaskToolbox
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private TaskStorageDirTracker dirTracker;
public Builder()
{
@ -597,7 +587,6 @@ public class TaskToolbox
this.intermediaryDataManager = other.intermediaryDataManager;
this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
this.shuffleClient = other.shuffleClient;
this.dirTracker = other.getDirTracker();
}
public Builder config(final TaskConfig config)
@ -834,12 +823,6 @@ public class TaskToolbox
return this;
}
public Builder dirTracker(final TaskStorageDirTracker dirTracker)
{
this.dirTracker = dirTracker;
return this;
}
public TaskToolbox build()
{
return new TaskToolbox(
@ -881,8 +864,7 @@ public class TaskToolbox
supervisorTaskClientProvider,
shuffleClient,
taskLogPusher,
attemptId,
dirTracker
attemptId
);
}
}


@ -110,7 +110,6 @@ public class TaskToolboxFactory
private final ShuffleClient shuffleClient;
private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final TaskStorageDirTracker dirTracker;
@Inject
public TaskToolboxFactory(
@ -151,8 +150,7 @@ public class TaskToolboxFactory
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@AttemptId String attemptId,
TaskStorageDirTracker dirTracker
@AttemptId String attemptId
)
{
this.config = config;
@ -193,12 +191,21 @@ public class TaskToolboxFactory
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.dirTracker = dirTracker;
}
public TaskToolbox build(Task task)
{
final File taskWorkDir = dirTracker.getTaskWorkDir(task.getId());
return build(config, task);
}
public TaskToolbox build(File baseTaskDir, Task task)
{
return build(config.withBaseTaskDir(baseTaskDir), task);
}
public TaskToolbox build(TaskConfig config, Task task)
{
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(taskExecutorNode)
@ -243,7 +250,6 @@ public class TaskToolboxFactory
.shuffleClient(shuffleClient)
.taskLogPusher(taskLogPusher)
.attemptId(attemptId)
.dirTracker(dirTracker)
.build();
}
}


@ -25,10 +25,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Period;
import javax.annotation.Nullable;
@ -40,7 +40,7 @@ import java.util.List;
/**
* Configurations for ingestion tasks. These configurations can be applied per middleManager, indexer, or overlord.
*
* <p>
* See {@link org.apache.druid.indexing.overlord.config.DefaultTaskConfig} if you want to apply the same configuration
* to all tasks submitted to the overlord.
*/
@ -82,6 +82,9 @@ public class TaskConfig
@JsonProperty
private final String baseDir;
@JsonProperty
private final File baseTaskDir;
@JsonProperty
private final String hadoopWorkingPath;
@ -118,18 +121,10 @@ public class TaskConfig
@JsonProperty
private final boolean encapsulatedTask;
@Deprecated
@JsonProperty("baseTaskDir")
private final String baseTaskDirPath;
// Use multiple base files for tasks instead of a single one
@JsonProperty
private final List<String> baseTaskDirPaths;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@Deprecated @JsonProperty("baseTaskDir") String baseTaskDirPath,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@ -138,14 +133,15 @@ public class TaskConfig
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMappedIndex, // deprecated, only set to true to fall back to older behavior
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMappedIndex,
// deprecated, only set to true to fall back to older behavior
@JsonProperty("batchProcessingMode") String batchProcessingMode,
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
@JsonProperty("baseTaskDirPaths") @Nullable List<String> baseTaskDirPaths
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
// This is usually on HDFS or similar, so we can't use java.io.tmpdir
this.hadoopWorkingPath = hadoopWorkingPath == null ? "/tmp/druid-indexing" : hadoopWorkingPath;
this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 75000 : defaultRowFlushBoundary;
@ -187,13 +183,39 @@ public class TaskConfig
}
log.debug("Batch processing mode:[%s]", this.batchProcessingMode);
this.storeEmptyColumns = storeEmptyColumns == null ? DEFAULT_STORE_EMPTY_COLUMNS : storeEmptyColumns;
}
this.baseTaskDirPath = baseTaskDirPath;
if (CollectionUtils.isNullOrEmpty(baseTaskDirPaths)) {
String baseTaskFile = defaultDir(baseTaskDirPath, "persistent/task");
baseTaskDirPaths = Collections.singletonList(baseTaskFile);
}
this.baseTaskDirPaths = ImmutableList.copyOf(baseTaskDirPaths);
private TaskConfig(
String baseDir,
File baseTaskDir,
String hadoopWorkingPath,
int defaultRowFlushBoundary,
List<String> defaultHadoopCoordinates,
boolean restoreTasksOnRestart,
Period gracefulShutdownTimeout,
Period directoryLockTimeout,
List<StorageLocationConfig> shuffleDataLocations,
boolean ignoreTimestampSpecForDruidInputSource,
boolean batchMemoryMappedIndex,
BatchProcessingMode batchProcessingMode,
boolean storeEmptyColumns,
boolean encapsulatedTask
)
{
this.baseDir = baseDir;
this.baseTaskDir = baseTaskDir;
this.hadoopWorkingPath = hadoopWorkingPath;
this.defaultRowFlushBoundary = defaultRowFlushBoundary;
this.defaultHadoopCoordinates = defaultHadoopCoordinates;
this.restoreTasksOnRestart = restoreTasksOnRestart;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
this.directoryLockTimeout = directoryLockTimeout;
this.shuffleDataLocations = shuffleDataLocations;
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
this.batchProcessingMode = batchProcessingMode;
this.storeEmptyColumns = storeEmptyColumns;
this.encapsulatedTask = encapsulatedTask;
}
@JsonProperty
@ -202,17 +224,30 @@ public class TaskConfig
return baseDir;
}
@Deprecated
@JsonProperty("baseTaskDir")
public String getBaseTaskDirPath()
@JsonProperty
public File getBaseTaskDir()
{
return baseTaskDirPath;
return baseTaskDir;
}
@JsonProperty
public List<String> getBaseTaskDirPaths()
public File getTaskDir(String taskId)
{
return baseTaskDirPaths;
return new File(baseTaskDir, IdUtils.validateId("task ID", taskId));
}
public File getTaskWorkDir(String taskId)
{
return new File(getTaskDir(taskId), "work");
}
public File getTaskTempDir(String taskId)
{
return new File(getTaskDir(taskId), "temp");
}
public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
}
@JsonProperty
@ -300,4 +335,23 @@ public class TaskConfig
return configParameter;
}
public TaskConfig withBaseTaskDir(File baseTaskDir)
{
return new TaskConfig(
baseDir,
baseTaskDir,
hadoopWorkingPath,
defaultRowFlushBoundary,
defaultHadoopCoordinates,
restoreTasksOnRestart,
gracefulShutdownTimeout,
directoryLockTimeout,
shuffleDataLocations,
ignoreTimestampSpecForDruidInputSource,
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask
);
}
}


@ -142,7 +142,7 @@ public abstract class AbstractTask implements Task
public String setup(TaskToolbox toolbox) throws Exception
{
if (toolbox.getConfig().isEncapsulatedTask()) {
File taskDir = toolbox.getDirTracker().getTaskDir(getId());
File taskDir = toolbox.getConfig().getTaskDir(getId());
FileUtils.mkdirp(taskDir);
File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
FileUtils.mkdirp(attemptDir);


@ -48,7 +48,6 @@ import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
@ -102,7 +101,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
private static final String TYPE = "index_hadoop";
private TaskConfig taskConfig = null;
private TaskStorageDirTracker dirTracker = null;
private static String getTheDataSource(HadoopIngestionSpec spec)
{
@ -295,7 +293,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
private File getHadoopJobIdFile()
{
return new File(dirTracker.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
}
@Override
@ -303,7 +301,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
try {
taskConfig = toolbox.getConfig();
dirTracker = toolbox.getDirTracker();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this, false);


@ -256,7 +256,7 @@ class PartialDimensionDistributionParallelIndexTaskRunner
private File createDistributionsDir()
{
File taskTempDir = getToolbox().getDirTracker().getTaskTempDir(getTaskId());
File taskTempDir = getToolbox().getConfig().getTaskTempDir(getTaskId());
File distributionsDir = new File(taskTempDir, "dimension_distributions");
try {
FileUtils.mkdirp(distributionsDir);


@ -40,11 +40,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@ -65,62 +62,59 @@ public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWo
protected final ConcurrentHashMap<String, WorkItemType> tasks = new ConcurrentHashMap<>();
protected final ObjectMapper jsonMapper;
protected final TaskConfig taskConfig;
protected final TaskStorageDirTracker dirTracker;
private final TaskStorageDirTracker tracker;
public BaseRestorableTaskRunner(
ObjectMapper jsonMapper,
TaskConfig taskConfig,
TaskStorageDirTracker dirTracker
TaskStorageDirTracker tracker
)
{
this.jsonMapper = jsonMapper;
this.taskConfig = taskConfig;
this.dirTracker = dirTracker;
this.tracker = tracker;
}
protected TaskStorageDirTracker getTracker()
{
return tracker;
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
final Map<File, TaskRestoreInfo> taskRestoreInfos = new HashMap<>();
for (File baseDir : dirTracker.getBaseTaskDirs()) {
File restoreFile = new File(baseDir, TASK_RESTORE_FILENAME);
if (restoreFile.exists()) {
try {
taskRestoreInfos.put(baseDir, jsonMapper.readValue(restoreFile, TaskRestoreInfo.class));
}
catch (Exception e) {
LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
}
final File restoreFile = getRestoreFile();
final TaskRestoreInfo taskRestoreInfo;
if (restoreFile.exists()) {
try {
taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
return ImmutableList.of();
}
} else {
return ImmutableList.of();
}
final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new ArrayList<>();
for (Map.Entry<File, TaskRestoreInfo> entry : taskRestoreInfos.entrySet()) {
final File baseDir = entry.getKey();
final TaskRestoreInfo taskRestoreInfo = entry.getValue();
for (final String taskId : taskRestoreInfo.getRunningTasks()) {
try {
dirTracker.addTask(taskId, baseDir);
final File taskFile = new File(dirTracker.getTaskDir(taskId), "task.json");
final Task task = jsonMapper.readValue(taskFile, Task.class);
for (final String taskId : taskRestoreInfo.getRunningTasks()) {
try {
final File taskFile = new File(tracker.findExistingTaskDir(taskId), "task.json");
final Task task = jsonMapper.readValue(taskFile, Task.class);
if (!task.getId().equals(taskId)) {
throw new ISE("Task[%s] restore file had wrong id[%s]", taskId, task.getId());
}
if (!task.getId().equals(taskId)) {
throw new ISE("Task[%s] restore file had wrong id[%s]", taskId, task.getId());
}
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
LOG.info("Restoring task[%s] from location[%s].", task.getId(), baseDir);
retVal.add(Pair.of(task, run(task)));
} else {
dirTracker.removeTask(taskId);
}
}
catch (Exception e) {
LOG.warn(e, "Failed to restore task[%s] from path[%s]. Trying to restore other tasks.",
taskId, baseDir);
dirTracker.removeTask(taskId);
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
LOG.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));
}
}
catch (Exception e) {
LOG.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
}
}
if (!retVal.isEmpty()) {
@ -188,54 +182,24 @@ public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWo
@GuardedBy("tasks")
protected void saveRunningTasks()
{
final Map<File, List<String>> theTasks = new HashMap<>();
final File restoreFile = getRestoreFile();
final List<String> theTasks = new ArrayList<>();
for (TaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
final String taskId = forkingTaskRunnerWorkItem.getTaskId();
final File restoreFile = getRestoreFile(taskId);
theTasks.computeIfAbsent(restoreFile, k -> new ArrayList<>())
.add(taskId);
theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
}
for (Map.Entry<File, List<String>> entry : theTasks.entrySet()) {
final File restoreFile = entry.getKey();
final TaskRestoreInfo taskRestoreInfo = new TaskRestoreInfo(entry.getValue());
try {
Files.createParentDirs(restoreFile);
jsonMapper.writeValue(restoreFile, taskRestoreInfo);
LOG.debug("Save restore file at [%s] for tasks [%s]",
restoreFile.getAbsoluteFile(), Arrays.toString(theTasks.get(restoreFile).toArray()));
}
catch (Exception e) {
LOG.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
}
try {
Files.createParentDirs(restoreFile);
jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
}
catch (Exception e) {
LOG.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
}
}
@Override
public void stop()
protected File getRestoreFile()
{
if (!taskConfig.isRestoreTasksOnRestart()) {
return;
}
for (File baseDir : dirTracker.getBaseTaskDirs()) {
File restoreFile = new File(baseDir, TASK_RESTORE_FILENAME);
if (restoreFile.exists()) {
try {
TaskRestoreInfo taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
LOG.info("Path[%s] contains restore data for tasks[%s] on restart",
baseDir, taskRestoreInfo.getRunningTasks());
}
catch (Exception e) {
LOG.error(e, "Failed to read task restore info from file[%s].", restoreFile);
}
}
}
}
private File getRestoreFile(String taskId)
{
return new File(dirTracker.getBaseTaskDir(taskId), TASK_RESTORE_FILENAME);
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
}
protected static class TaskRestoreInfo


@ -155,9 +155,20 @@ public class ForkingTaskRunner
public TaskStatus call()
{
final String attemptId = String.valueOf(getNextAttemptID(dirTracker, task.getId()));
final String baseTaskDir = dirTracker.getBaseTaskDir(task.getId()).getAbsolutePath();
final File taskDir = dirTracker.getTaskDir(task.getId());
final File baseDirForTask;
try {
baseDirForTask = getTracker().pickBaseDir(task.getId());
}
catch (RuntimeException e) {
LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
return TaskStatus.failure(
task.getId(),
StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
);
}
final File taskDir = new File(baseDirForTask, task.getId());
final String attemptId = String.valueOf(getNextAttemptID(taskDir));
final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();
final ProcessHolder processHolder;
@ -368,12 +379,12 @@ public class ForkingTaskRunner
// for more information
// command.add("-XX:+UseThreadPriorities");
// command.add("-XX:ThreadPriorityPolicy=42");
command.add(StringUtils.format("-Ddruid.indexer.task.baseTaskDir=%s", baseDirForTask.getAbsolutePath()));
command.add("org.apache.druid.cli.Main");
command.add("internal");
command.add("peon");
command.add(baseTaskDir);
command.add(task.getId());
command.add(taskDir.toString());
command.add(attemptId);
String nodeType = task.getNodeType();
if (nodeType != null) {
@ -578,7 +589,6 @@ public class ForkingTaskRunner
} else {
LOGGER.warn("Ran out of time, not waiting for executor to finish!");
}
super.stop();
}
@Override
@ -891,9 +901,8 @@ public class ForkingTaskRunner
}
@VisibleForTesting
static int getNextAttemptID(TaskStorageDirTracker dirTracker, String taskId)
static int getNextAttemptID(File taskDir)
{
File taskDir = dirTracker.getTaskDir(taskId);
File attemptDir = new File(taskDir, "attempt");
try {
FileUtils.mkdirp(attemptDir);


@ -155,8 +155,21 @@ public class ThreadingTaskRunner
@Override
public TaskStatus call()
{
final File baseDirForTask;
try {
baseDirForTask = getTracker().pickBaseDir(task.getId());
}
catch (RuntimeException e) {
LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
return TaskStatus.failure(
task.getId(),
StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
);
}
final File taskDir = new File(baseDirForTask, task.getId());
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = dirTracker.getTaskDir(task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
final TaskLocation taskLocation = TaskLocation.create(
@ -199,7 +212,7 @@ public class ThreadingTaskRunner
.setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));
TaskStatus taskStatus;
final TaskToolbox toolbox = toolboxFactory.build(task);
final TaskToolbox toolbox = toolboxFactory.build(baseDirForTask, task);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
TaskRunnerUtils.notifyStatusChanged(
listeners,
@ -414,7 +427,6 @@ public class ThreadingTaskRunner
}
appenderatorsManager.shutdown();
super.stop();
}
@Override


@ -35,7 +35,7 @@ import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
@ -78,11 +78,6 @@ import java.util.stream.Collectors;
*/
public class WorkerTaskManager
{
private static final String TEMP_WORKER = "workerTaskManagerTmp";
private static final String ASSIGNED = "assignedTasks";
private static final String COMPLETED = "completedTasks";
private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);
private final ObjectMapper jsonMapper;
@ -108,15 +103,14 @@ public class WorkerTaskManager
private final AtomicBoolean disabled = new AtomicBoolean(false);
private final DruidLeaderClient overlordClient;
private final TaskStorageDirTracker dirTracker;
private final File storageDir;
@Inject
public WorkerTaskManager(
ObjectMapper jsonMapper,
TaskRunner taskRunner,
@IndexingService DruidLeaderClient overlordClient,
TaskStorageDirTracker dirTracker
TaskConfig taskConfig,
@IndexingService DruidLeaderClient overlordClient
)
{
this.jsonMapper = jsonMapper;
@ -124,7 +118,8 @@ public class WorkerTaskManager
this.exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
this.completedTasksCleanupExecutor = Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");
this.overlordClient = overlordClient;
this.dirTracker = dirTracker;
storageDir = taskConfig.getBaseTaskDir();
}
@LifecycleStart
@ -137,7 +132,7 @@ public class WorkerTaskManager
synchronized (lock) {
try {
log.debug("Starting...");
cleanupAndMakeTmpTaskDirs();
cleanupAndMakeTmpTaskDir();
registerLocationListener();
restoreRestorableTasks();
initAssignedTasks();
@ -284,8 +279,8 @@ public class WorkerTaskManager
try {
FileUtils.writeAtomically(
getAssignedTaskFile(task.getId()),
getTmpTaskDir(task.getId()),
new File(getAssignedTaskDir(), task.getId()),
getTmpTaskDir(),
os -> {
jsonMapper.writeValue(os, task);
return null;
@ -312,21 +307,14 @@ public class WorkerTaskManager
submitNoticeToExec(new RunNotice(task));
}
private File getTmpTaskDir(String taskId)
private File getTmpTaskDir()
{
return new File(dirTracker.getBaseTaskDir(taskId), TEMP_WORKER);
return new File(storageDir, "workerTaskManagerTmp");
}
private void cleanupAndMakeTmpTaskDirs() throws IOException
private void cleanupAndMakeTmpTaskDir() throws IOException
{
for (File baseTaskDir : dirTracker.getBaseTaskDirs()) {
cleanupAndMakeTmpTaskDir(baseTaskDir);
}
}
private void cleanupAndMakeTmpTaskDir(File baseTaskDir) throws IOException
{
File tmpDir = new File(baseTaskDir, TEMP_WORKER);
File tmpDir = getTmpTaskDir();
FileUtils.mkdirp(tmpDir);
if (!tmpDir.isDirectory()) {
throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir);
@ -341,29 +329,14 @@ public class WorkerTaskManager
}
}
public File getAssignedTaskFile(String taskId)
public File getAssignedTaskDir()
{
return new File(new File(dirTracker.getBaseTaskDir(taskId), ASSIGNED), taskId);
}
public List<File> getAssignedTaskDirs()
{
return dirTracker.getBaseTaskDirs()
.stream()
.map(location -> new File(location.getPath(), ASSIGNED))
.collect(Collectors.toList());
return new File(storageDir, "assignedTasks");
}
private void initAssignedTasks() throws IOException
{
for (File baseTaskDir : dirTracker.getBaseTaskDirs()) {
initAssignedTasks(baseTaskDir);
}
}
private void initAssignedTasks(File baseTaskDir) throws IOException
{
File assignedTaskDir = new File(baseTaskDir, ASSIGNED);
File assignedTaskDir = getAssignedTaskDir();
log.debug("Looking for any previously assigned tasks on disk[%s].", assignedTaskDir);
@ -401,7 +374,7 @@ public class WorkerTaskManager
private void cleanupAssignedTask(Task task)
{
assignedTasks.remove(task.getId());
File taskFile = getAssignedTaskFile(task.getId());
File taskFile = new File(getAssignedTaskDir(), task.getId());
try {
Files.delete(taskFile.toPath());
}
@ -457,17 +430,9 @@ public class WorkerTaskManager
}
}
public List<File> getCompletedTaskDirs()
public File getCompletedTaskDir()
{
return dirTracker.getBaseTaskDirs()
.stream()
.map(location -> new File(location.getPath(), COMPLETED))
.collect(Collectors.toList());
}
public File getCompletedTaskFile(String taskId)
{
return new File(new File(dirTracker.getBaseTaskDir(taskId), COMPLETED), taskId);
return new File(storageDir, "completedTasks");
}
private void moveFromRunningToCompleted(String taskId, TaskAnnouncement taskAnnouncement)
@ -478,7 +443,7 @@ public class WorkerTaskManager
try {
FileUtils.writeAtomically(
getCompletedTaskFile(taskId), getTmpTaskDir(taskId),
new File(getCompletedTaskDir(), taskId), getTmpTaskDir(),
os -> {
jsonMapper.writeValue(os, taskAnnouncement);
return null;
@ -494,14 +459,7 @@ public class WorkerTaskManager
private void initCompletedTasks() throws IOException
{
for (File baseTaskDir : dirTracker.getBaseTaskDirs()) {
initCompletedTasks(baseTaskDir);
}
}
private void initCompletedTasks(File baseTaskDir) throws IOException
{
File completedTaskDir = new File(baseTaskDir, COMPLETED);
File completedTaskDir = getCompletedTaskDir();
log.debug("Looking for any previously completed tasks on disk[%s].", completedTaskDir);
FileUtils.mkdirp(completedTaskDir);
@ -598,7 +556,7 @@ public class WorkerTaskManager
);
completedTasks.remove(taskId);
File taskFile = getCompletedTaskFile(taskId);
File taskFile = new File(getCompletedTaskDir(), taskId);
try {
Files.deleteIfExists(taskFile.toPath());
changeHistory.addChangeRequest(new WorkerHistoryItem.TaskRemoval(taskId));
@ -606,9 +564,6 @@ public class WorkerTaskManager
catch (IOException ex) {
log.error(ex, "Failed to delete completed task from disk [%s].", taskFile);
}
finally {
dirTracker.removeTask(taskId);
}
}
}


@ -31,7 +31,7 @@ import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -63,13 +63,13 @@ public class WorkerTaskMonitor extends WorkerTaskManager
public WorkerTaskMonitor(
ObjectMapper jsonMapper,
TaskRunner taskRunner,
TaskConfig taskConfig,
CuratorFramework cf,
WorkerCuratorCoordinator workerCuratorCoordinator,
@IndexingService DruidLeaderClient overlordClient,
TaskStorageDirTracker dirTracker
@IndexingService DruidLeaderClient overlordClient
)
{
super(jsonMapper, taskRunner, overlordClient, dirTracker);
super(jsonMapper, taskRunner, taskConfig, overlordClient);
this.jsonMapper = jsonMapper;
this.pathChildrenCache = new PathChildrenCache(


@ -108,7 +108,7 @@ public class ExecutorLifecycle
// pod twice, no need to lock.
if (taskExecutorConfig.isParentStreamDefined()) {
// Avoid running the same task twice on the same machine by locking the task base directory.
final File taskLockFile = Preconditions.checkNotNull(taskExecutorConfig.getLockFile(), "lockfile is null");
final File taskLockFile = taskConfig.getTaskLockFile(task.getId());
try {
synchronized (this) {
if (taskLockChannel == null && taskLockFileLock == null) {


@ -39,10 +39,6 @@ public class ExecutorLifecycleConfig
@NotNull
private File statusFile = null;
@JsonProperty
@NotNull
private File lockFile = null;
@JsonProperty
@Pattern(regexp = "\\{stdin}")
private String parentStreamName = "stdin";
@ -79,17 +75,6 @@ public class ExecutorLifecycleConfig
return this;
}
public File getLockFile()
{
return lockFile;
}
public ExecutorLifecycleConfig setLockFile(File lockFile)
{
this.lockFile = lockFile;
return this;
}
public ExecutorLifecycleConfig setParentStreamDefined(boolean parentStreamDefined)
{
this.parentStreamDefined = parentStreamDefined;

View File

@ -29,7 +29,6 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
@ -91,6 +90,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
private final long intermediaryPartitionDiscoveryPeriodSec;
private final long intermediaryPartitionCleanupPeriodSec;
private final Period intermediaryPartitionTimeout;
private final TaskConfig taskConfig;
private final List<StorageLocation> shuffleDataLocations;
private final OverlordClient overlordClient;
@ -109,26 +109,23 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;
private final TaskStorageDirTracker dirTracker;
@Inject
public LocalIntermediaryDataManager(
WorkerConfig workerConfig,
TaskConfig taskConfig,
OverlordClient overlordClient,
TaskStorageDirTracker dirTracker
OverlordClient overlordClient
)
{
this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
this.taskConfig = taskConfig;
this.shuffleDataLocations = taskConfig
.getShuffleDataLocations()
.stream()
.map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
.collect(Collectors.toList());
this.overlordClient = overlordClient;
this.dirTracker = dirTracker;
}
@Override
@ -295,7 +292,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
);
// Create a zipped segment in a temp directory.
final File taskTempDir = dirTracker.getTaskTempDir(subTaskId);
final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
final Closer closer = Closer.create();
closer.register(() -> {
try {

View File

@ -20,59 +20,92 @@
package org.apache.druid.indexing.common;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class TaskStorageDirTrackerTest
{
private TaskStorageDirTracker dirTracker;
@Before
public void setup()
{
dirTracker = new TaskStorageDirTracker(ImmutableList.of("A", "B", "C"));
}
@ClassRule
public static final TemporaryFolder TMP = new TemporaryFolder();
@Test
public void testGetOrSelectTaskDir()
public void testGetOrSelectTaskDir() throws IOException
{
// Test round-robin allocation
Assert.assertEquals(dirTracker.getBaseTaskDir("task0").getPath(), "A");
Assert.assertEquals(dirTracker.getBaseTaskDir("task1").getPath(), "B");
Assert.assertEquals(dirTracker.getBaseTaskDir("task2").getPath(), "C");
Assert.assertEquals(dirTracker.getBaseTaskDir("task3").getPath(), "A");
Assert.assertEquals(dirTracker.getBaseTaskDir("task4").getPath(), "B");
Assert.assertEquals(dirTracker.getBaseTaskDir("task5").getPath(), "C");
File tmpFolder = TMP.newFolder();
List<File> files = ImmutableList.of(
new File(tmpFolder, "A"),
new File(tmpFolder, "B"),
new File(tmpFolder, "C")
);
final TaskStorageDirTracker tracker = new TaskStorageDirTracker(files);
tracker.ensureDirectories();
validateRoundRobinAllocation(tmpFolder, tracker);
for (File file : Objects.requireNonNull(tmpFolder.listFiles())) {
FileUtils.deleteDirectory(file);
}
final TaskStorageDirTracker otherTracker = TaskStorageDirTracker.fromConfigs(
new WorkerConfig()
{
@Override
public List<String> getBaseTaskDirs()
{
return files.stream().map(File::toString).collect(Collectors.toList());
}
},
null
);
otherTracker.ensureDirectories();
validateRoundRobinAllocation(tmpFolder, otherTracker);
}
private void validateRoundRobinAllocation(File tmpFolder, TaskStorageDirTracker dirTracker) throws IOException
{
// Test round-robin allocation. It starts from "C" and goes "backwards" because the counter is initialized
// to a negative value, which modulos to 2 -> 1 -> 0.
Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task0").getPath());
Assert.assertEquals(new File(tmpFolder, "B").toString(), dirTracker.pickBaseDir("task1").getPath());
Assert.assertEquals(new File(tmpFolder, "A").toString(), dirTracker.pickBaseDir("task2").getPath());
Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task3").getPath());
Assert.assertEquals(new File(tmpFolder, "B").toString(), dirTracker.pickBaseDir("task4").getPath());
Assert.assertEquals(new File(tmpFolder, "A").toString(), dirTracker.pickBaseDir("task5").getPath());
// Test that the result is always the same
FileUtils.mkdirp(new File(new File(tmpFolder, "C"), "task0"));
for (int i = 0; i < 10; i++) {
Assert.assertEquals(dirTracker.getBaseTaskDir("task0").getPath(), "A");
Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task0").getPath());
}
}
@Test
public void testAddTask()
public void testFallBackToTaskConfig() throws IOException
{
// Test add after get. task0 -> "A"
Assert.assertEquals(dirTracker.getBaseTaskDir("task0").getPath(), "A");
dirTracker.addTask("task0", new File("A"));
Assert.assertEquals(dirTracker.getBaseTaskDir("task0").getPath(), "A");
final File baseDir = new File(TMP.newFolder(), "A");
final TaskStorageDirTracker tracker = TaskStorageDirTracker.fromConfigs(
new WorkerConfig(),
new TaskConfigBuilder().setBaseDir(baseDir.toString()).build()
);
tracker.ensureDirectories();
// Assign base path directly
dirTracker.addTask("task1", new File("C"));
Assert.assertEquals(dirTracker.getBaseTaskDir("task1").getPath(), "C");
}
@Test
public void testAddTaskThrowsISE()
{
// Test add after get. task0 -> "A"
Assert.assertEquals(dirTracker.getBaseTaskDir("task0").getPath(), "A");
Assert.assertThrows(ISE.class, () -> dirTracker.addTask("task0", new File("B")));
final String expected = new File(baseDir, "persistent/task").toString();
Assert.assertEquals(expected, tracker.pickBaseDir("task0").getPath());
Assert.assertEquals(expected, tracker.pickBaseDir("task1").getPath());
Assert.assertEquals(expected, tracker.pickBaseDir("task2").getPath());
Assert.assertEquals(expected, tracker.pickBaseDir("task3").getPath());
Assert.assertEquals(expected, tracker.pickBaseDir("task1").getPath());
Assert.assertEquals(expected, tracker.pickBaseDir("task10293721").getPath());
}
}
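Taken together, the test above covers the tracker surface the task runners now rely on. A minimal usage sketch, assuming a workerConfig and taskConfig are in scope; it only restates what the assertions above exercise:

TaskStorageDirTracker tracker = TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
tracker.ensureDirectories();                  // create any missing base directories
File baseDir = tracker.pickBaseDir("task0");  // round-robin across druid.worker.baseTaskDirs,
                                              // falling back to ${druid.indexer.task.baseDir}/persistent/task
File taskDir = new File(baseDir, "task0");    // stable: once this directory exists, pickBaseDir keeps returning its base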

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
@ -105,23 +106,12 @@ public class TaskToolboxTest
EasyMock.expect(mockIndexMergerV9.create(true)).andReturn(indexMergerV9).anyTimes();
EasyMock.replay(task, mockHandoffNotifierFactory, mockIndexMergerV9);
TaskConfig taskConfig = new TaskConfig(
temporaryFolder.newFile().toString(),
null,
null,
50000,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFile().toString())
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
taskToolbox = new TaskToolboxFactory(
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
@ -160,8 +150,7 @@ public class TaskToolboxTest
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.config;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.joda.time.Period;
import java.util.List;
public class TaskConfigBuilder
{
private String baseDir;
private String baseTaskDir;
private String hadoopWorkingPath;
private Integer defaultRowFlushBoundary;
private List<String> defaultHadoopCoordinates;
private boolean restoreTasksOnRestart;
private Period gracefulShutdownTimeout;
private Period directoryLockTimeout;
private List<StorageLocationConfig> shuffleDataLocations;
private boolean ignoreTimestampSpecForDruidInputSource;
private boolean batchMemoryMappedIndex; // deprecated; only set to true to fall back to older behavior
private String batchProcessingMode;
private Boolean storeEmptyColumns;
private boolean enableTaskLevelLogPush;
public TaskConfigBuilder setBaseDir(String baseDir)
{
this.baseDir = baseDir;
return this;
}
public TaskConfigBuilder setBaseTaskDir(String baseTaskDir)
{
this.baseTaskDir = baseTaskDir;
return this;
}
public TaskConfigBuilder setHadoopWorkingPath(String hadoopWorkingPath)
{
this.hadoopWorkingPath = hadoopWorkingPath;
return this;
}
public TaskConfigBuilder setDefaultRowFlushBoundary(Integer defaultRowFlushBoundary)
{
this.defaultRowFlushBoundary = defaultRowFlushBoundary;
return this;
}
public TaskConfigBuilder setDefaultHadoopCoordinates(List<String> defaultHadoopCoordinates)
{
this.defaultHadoopCoordinates = defaultHadoopCoordinates;
return this;
}
public TaskConfigBuilder setRestoreTasksOnRestart(boolean restoreTasksOnRestart)
{
this.restoreTasksOnRestart = restoreTasksOnRestart;
return this;
}
public TaskConfigBuilder setGracefulShutdownTimeout(Period gracefulShutdownTimeout)
{
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
return this;
}
public TaskConfigBuilder setDirectoryLockTimeout(Period directoryLockTimeout)
{
this.directoryLockTimeout = directoryLockTimeout;
return this;
}
public TaskConfigBuilder setShuffleDataLocations(List<StorageLocationConfig> shuffleDataLocations)
{
this.shuffleDataLocations = shuffleDataLocations;
return this;
}
public TaskConfigBuilder setIgnoreTimestampSpecForDruidInputSource(boolean ignoreTimestampSpecForDruidInputSource)
{
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
return this;
}
public TaskConfigBuilder setBatchMemoryMappedIndex(boolean batchMemoryMappedIndex)
{
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
return this;
}
public TaskConfigBuilder setBatchProcessingMode(String batchProcessingMode)
{
this.batchProcessingMode = batchProcessingMode;
return this;
}
public TaskConfigBuilder setStoreEmptyColumns(Boolean storeEmptyColumns)
{
this.storeEmptyColumns = storeEmptyColumns;
return this;
}
public TaskConfigBuilder setEnableTaskLevelLogPush(boolean enableTaskLevelLogPush)
{
this.enableTaskLevelLogPush = enableTaskLevelLogPush;
return this;
}
public TaskConfig build()
{
return new TaskConfig(
baseDir,
baseTaskDir,
hadoopWorkingPath,
defaultRowFlushBoundary,
defaultHadoopCoordinates,
restoreTasksOnRestart,
gracefulShutdownTimeout,
directoryLockTimeout,
shuffleDataLocations,
ignoreTimestampSpecForDruidInputSource,
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
enableTaskLevelLogPush
);
}
}
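A typical usage sketch, matching the test conversions later in this diff: only the fields a test cares about are set, and the remaining builder fields are passed through as null/false to the TaskConfig constructor (the path below is hypothetical):

TaskConfig config = new TaskConfigBuilder()
.setBaseDir("/tmp/druid")
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();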

View File

@ -19,10 +19,8 @@
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
@ -57,6 +55,10 @@ public class AbstractTaskTest
@Test
public void testSetupAndCleanupIsCalledWtihParameter() throws Exception
{
// These tests apparently use Mockito. Mockito is bad: we have seen it rewrite byte code and affect other,
// totally unrelated tests. Mockito needs to be completely eradicated from the codebase. This comment is here
// either to get me to do it in this commit or, failing that, for posterity so that it is clear that it
// should happen in the future.
TaskToolbox toolbox = mock(TaskToolbox.class);
when(toolbox.getAttemptId()).thenReturn("1");
@ -68,11 +70,9 @@ public class AbstractTaskTest
TaskConfig config = mock(TaskConfig.class);
when(config.isEncapsulatedTask()).thenReturn(true);
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(
ImmutableList.of(temporaryFolder.newFolder().getAbsolutePath())
);
when(toolbox.getDirTracker()).thenReturn(dirTracker);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@ -87,10 +87,7 @@ public class AbstractTaskTest
{
// create a reports file to test that the taskLogPusher pushes task reports
String result = super.setup(toolbox);
File attemptDir = Paths.get(
dirTracker.getTaskDir("myID").getAbsolutePath(),
"attempt", toolbox.getAttemptId()
).toFile();
File attemptDir = Paths.get(folder.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
File reportsDir = new File(attemptDir, "report.json");
FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8);
return result;
@ -118,9 +115,8 @@ public class AbstractTaskTest
TaskConfig config = mock(TaskConfig.class);
when(config.isEncapsulatedTask()).thenReturn(false);
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(folder.getAbsolutePath()));
when(toolbox.getDirTracker()).thenReturn(dirTracker);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@ -163,9 +159,8 @@ public class AbstractTaskTest
TaskConfig config = mock(TaskConfig.class);
when(config.isEncapsulatedTask()).thenReturn(true);
File folder = temporaryFolder.newFolder();
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(folder.getAbsolutePath()));
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
when(toolbox.getDirTracker()).thenReturn(dirTracker);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);

View File

@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
@ -63,6 +62,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
@ -1544,23 +1544,12 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
};
taskLockbox = new TaskLockbox(taskStorage, mdc);
final TaskConfig taskConfig = new TaskConfig(
directory.getPath(),
null,
null,
50000,
null,
true,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(directory.getPath())
.setDefaultRowFlushBoundary(50000)
.setRestoreTasksOnRestart(true)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
@ -1654,8 +1643,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
}

View File

@ -25,9 +25,9 @@ import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -567,23 +567,9 @@ public class BatchAppenderatorsTest
TaskConfig.BatchProcessingMode mode
)
{
TaskConfig config = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
mode.name(),
null,
false,
null
);
TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(mode.name())
.build();
return new TaskToolbox.Builder()
.config(config)
.joinableFactory(NoopJoinableFactory.INSTANCE)
@ -596,7 +582,6 @@ public class BatchAppenderatorsTest
.appenderatorsManager(new TestAppenderatorsManager())
.taskLogPusher(null)
.attemptId("1")
.dirTracker(new TaskStorageDirTracker(config))
.build();
}

View File

@ -41,10 +41,10 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.overlord.Segments;
@ -1589,23 +1589,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
objectMapper
);
final TaskConfig config = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
return new TaskToolbox.Builder()
.config(config)
.taskActionClient(createActionClient(task))
@ -1626,7 +1612,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
.coordinatorClient(coordinatorClient)
.taskLogPusher(null)
.attemptId("1")
.dirTracker(new TaskStorageDirTracker(config))
.build();
}

View File

@ -57,13 +57,13 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
@ -1908,23 +1908,9 @@ public class CompactionTaskTest
}
};
final TaskConfig config = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
return new TaskToolbox.Builder()
.config(config)
.taskActionClient(taskActionClient)
@ -1945,7 +1931,6 @@ public class CompactionTaskTest
.segmentCacheManager(segmentCacheManager)
.taskLogPusher(null)
.attemptId("1")
.dirTracker(new TaskStorageDirTracker(config))
.build();
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.timeline.DataSegment;
@ -106,23 +107,13 @@ public class HadoopTaskTest
}
};
final TaskToolbox toolbox = EasyMock.createStrictMock(TaskToolbox.class);
EasyMock.expect(toolbox.getConfig()).andReturn(new TaskConfig(
temporaryFolder.newFolder().toString(),
null,
null,
null,
ImmutableList.of("something:hadoop:1"),
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
)).once();
EasyMock.expect(toolbox.getConfig()).andReturn(
new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFolder().toString())
.setDefaultHadoopCoordinates(ImmutableList.of("something:hadoop:1"))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build()
).once();
EasyMock.replay(toolbox);
final ClassLoader classLoader = task.buildClassLoader(toolbox);

View File

@ -38,7 +38,6 @@ import org.apache.druid.data.input.impl.RegexParseSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
@ -48,6 +47,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -372,23 +372,9 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
StringUtils.format("ingestionTestBase-%s.json", System.currentTimeMillis())
);
final TaskConfig config = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final TaskToolbox box = new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
@ -408,7 +394,6 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
.appenderatorsManager(new TestAppenderatorsManager())
.taskLogPusher(null)
.attemptId("1")
.dirTracker(new TaskStorageDirTracker(config))
.build();

View File

@ -43,7 +43,6 @@ import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestFirehose;
@ -53,6 +52,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -898,23 +898,12 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
final File directory
)
{
final TaskConfig taskConfig = new TaskConfig(
directory.getPath(),
null,
null,
50000,
null,
true,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(directory.getPath())
.setDefaultRowFlushBoundary(50000)
.setRestoreTasksOnRestart(true)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
@ -1026,8 +1015,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
return toolboxFactory.build(task);

View File

@ -51,11 +51,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
@ -247,29 +247,11 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
taskRunner = new SimpleThreadingTaskRunner(testName.getMethodName());
objectMapper = getObjectMapper();
indexingServiceClient = new LocalOverlordClient(objectMapper, taskRunner);
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
intermediaryDataManager = new LocalIntermediaryDataManager(
new WorkerConfig(),
taskConfig,
null,
new TaskStorageDirTracker(taskConfig)
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
intermediaryDataManager = new LocalIntermediaryDataManager(new WorkerConfig(), taskConfig, null);
remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
prepareObjectMapper(objectMapper, getIndexIO());
@ -652,23 +634,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
{
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
objectMapper.setInjectableValues(
new InjectableValues.Std()
@ -702,23 +670,9 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
TaskConfig config = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
@ -754,7 +708,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.taskLogPusher(null)
.attemptId("1")
.dirTracker(new TaskStorageDirTracker(config))
.build();
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
@ -192,24 +193,8 @@ public class ForkingTaskRunnerTest
@Test
public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
{
TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.build();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -219,7 +204,7 @@ public class ForkingTaskRunnerTest
new DefaultObjectMapper(),
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new StartupLoggingConfig(),
dirTracker
TaskStorageDirTracker.fromConfigs(null, taskConfig)
)
{
@Override
@ -262,24 +247,9 @@ public class ForkingTaskRunnerTest
ObjectMapper mapper = new DefaultObjectMapper();
Task task = NoopTask.create();
File file = temporaryFolder.newFolder();
TaskConfig taskConfig = new TaskConfig(
null,
file.toString(),
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.setBaseTaskDir(file.toString())
.build();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -289,7 +259,7 @@ public class ForkingTaskRunnerTest
mapper,
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new StartupLoggingConfig(),
dirTracker
TaskStorageDirTracker.fromConfigs(null, taskConfig)
)
{
@Override
@ -301,7 +271,7 @@ public class ForkingTaskRunnerTest
for (String param : command) {
if (param.endsWith(task.getId())) {
File resultFile = Paths.get(dirTracker.getTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
File resultFile = Paths.get(getTracker().findExistingTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
break;
}
@ -337,24 +307,10 @@ public class ForkingTaskRunnerTest
ObjectMapper mapper = new DefaultObjectMapper();
Task task = NoopTask.create();
File file = temporaryFolder.newFolder();
TaskConfig taskConfig = new TaskConfig(
null,
file.toString(),
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.setBaseTaskDir(file.toString())
.build();
TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(null, taskConfig);
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -376,7 +332,7 @@ public class ForkingTaskRunnerTest
for (String param : command) {
if (param.endsWith(task.getId())) {
File resultFile = Paths.get(dirTracker.getTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
File resultFile = Paths.get(dirTracker.findExistingTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
mapper.writeValue(resultFile, TaskStatus.failure(task.getId(), "task failure test"));
break;
}
@ -402,28 +358,14 @@ public class ForkingTaskRunnerTest
public void testGettingTheNextAttemptDir() throws IOException
{
File file = temporaryFolder.newFolder();
TaskConfig taskConfig = new TaskConfig(
null,
file.toString(),
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.setBaseTaskDir(file.toString())
.build();
TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(null, taskConfig);
String taskId = "foo";
assertEquals(1, ForkingTaskRunner.getNextAttemptID(dirTracker, taskId));
assertEquals(2, ForkingTaskRunner.getNextAttemptID(dirTracker, taskId));
assertEquals(3, ForkingTaskRunner.getNextAttemptID(dirTracker, taskId));
assertEquals(1, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
assertEquals(2, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
assertEquals(3, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
}
@Test
@ -448,24 +390,8 @@ public class ForkingTaskRunnerTest
final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
final AtomicInteger xmxJavaOptsIndex = new AtomicInteger(-1);
final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1);
TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.build();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -475,7 +401,7 @@ public class ForkingTaskRunnerTest
mapper,
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new StartupLoggingConfig(),
dirTracker
TaskStorageDirTracker.fromConfigs(null, taskConfig)
)
{
@Override
@ -519,24 +445,8 @@ public class ForkingTaskRunnerTest
+ " }\n"
+ "}";
final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.build();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -546,7 +456,7 @@ public class ForkingTaskRunnerTest
mapper,
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new StartupLoggingConfig(),
dirTracker
TaskStorageDirTracker.fromConfigs(null, taskConfig)
)
{
@Override
@ -580,27 +490,15 @@ public class ForkingTaskRunnerTest
@Test
public void testCannotRestoreTasks() throws Exception
{
TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.build();
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(
ImmutableList.of(
temporaryFolder.newFolder().getAbsolutePath(),
temporaryFolder.newFolder().getAbsolutePath()
temporaryFolder.newFolder().getAbsoluteFile(),
temporaryFolder.newFolder().getAbsoluteFile()
)
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
@ -629,4 +527,14 @@ public class ForkingTaskRunnerTest
forkingTaskRunner.run(task);
Assert.assertTrue(forkingTaskRunner.restore().isEmpty());
}
public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
{
return new TaskConfigBuilder()
.setDefaultHadoopCoordinates(ImmutableList.of())
.setGracefulShutdownTimeout(new Period("PT0S"))
.setDirectoryLockTimeout(new Period("PT10S"))
.setShuffleDataLocations(ImmutableList.of())
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name());
}
}
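A sketch of the attempt-directory layout these tests assume (base directory chosen by the tracker, task id as a subdirectory, numbered attempts underneath); any names not shown in the hunks above are illustrative only:

File taskDir = new File(dirTracker.pickBaseDir(taskId), taskId);     // runner picks the base dir
int attempt = ForkingTaskRunner.getNextAttemptID(taskDir);           // 1, 2, 3, ... on successive calls
File attemptDir = new File(new File(taskDir, "attempt"), String.valueOf(attempt));
File statusFile = new File(attemptDir, "status.json");               // what the tests read back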

View File

@ -26,13 +26,13 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@ -87,23 +87,12 @@ public class SingleTaskBackgroundRunnerTest
{
final TestUtils utils = new TestUtils();
final DruidNode node = new DruidNode("testServer", "testHost", false, 1000, null, true, false);
final TaskConfig taskConfig = new TaskConfig(
temporaryFolder.newFile().toString(),
null,
null,
50000,
null,
true,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFile().toString())
.setDefaultRowFlushBoundary(50000)
.setRestoreTasksOnRestart(true)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
@ -144,8 +133,7 @@ public class SingleTaskBackgroundRunnerTest
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,

View File

@ -59,7 +59,6 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
@ -71,6 +70,7 @@ import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractFixedIntervalTask;
import org.apache.druid.indexing.common.task.IndexTask;
@ -607,28 +607,11 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
),
new TaskAuditLogConfig(true)
);
final String taskDirA = temporaryFolder.newFolder().toString();
final String taskDirB = temporaryFolder.newFolder().toString();
taskConfig = new TaskConfig(
null,
null,
null,
50000,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
ImmutableList.of(
taskDirA,
taskDirB
)
);
taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFolder().toString())
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
return new TaskToolboxFactory(
taskConfig,
@ -713,8 +696,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
@ -34,7 +33,6 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
@ -49,23 +47,7 @@ public class ThreadingTaskRunnerTest
@Test
public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException
{
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
ImmutableList.of(),
false,
new Period("PT0S"),
new Period("PT10S"),
ImmutableList.of(),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = ForkingTaskRunnerTest.makeDefaultTaskConfigBuilder().build();
ThreadingTaskRunner runner = new ThreadingTaskRunner(
mockTaskToolboxFactory(),
taskConfig,
@ -75,7 +57,7 @@ public class ThreadingTaskRunnerTest
new TestAppenderatorsManager(),
new MultipleFileTaskReportFileWriter(),
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new TaskStorageDirTracker(taskConfig)
TaskStorageDirTracker.fromConfigs(null, taskConfig)
);
Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id", "datasource", null)

View File

@ -55,7 +55,6 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
@ -64,6 +63,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
@ -564,23 +564,14 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
{
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
directory = tempFolder.newFolder();
final TaskConfig taskConfig = new TaskConfig(
new File(directory, "baseDir").getPath(),
new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,
true,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig =
new TaskConfigBuilder()
.setBaseDir(new File(directory, "baseDir").getPath())
.setBaseTaskDir(new File(directory, "baseTaskDir").getPath())
.setDefaultRowFlushBoundary(50000)
.setRestoreTasksOnRestart(true)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createPendingSegmentsTable();
@ -707,8 +698,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
null,
null,
null,
"1",
new TaskStorageDirTracker(taskConfig)
"1"
);
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.NoopOverlordClient;
@ -29,7 +28,6 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestTasks;
@ -37,6 +35,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
@ -63,10 +62,10 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
*
*/
@RunWith(Parameterized.class)
public class WorkerTaskManagerTest
@ -79,11 +78,9 @@ public class WorkerTaskManagerTest
private final boolean restoreTasksOnRestart;
private final boolean useMultipleBaseTaskDirPaths;
private WorkerTaskManager workerTaskManager;
public WorkerTaskManagerTest(boolean restoreTasksOnRestart, boolean useMultipleBaseTaskDirPaths)
public WorkerTaskManagerTest(boolean restoreTasksOnRestart)
{
testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
@ -91,49 +88,25 @@ public class WorkerTaskManagerTest
indexMergerV9Factory = testUtils.getIndexMergerV9Factory();
indexIO = testUtils.getTestIndexIO();
this.restoreTasksOnRestart = restoreTasksOnRestart;
this.useMultipleBaseTaskDirPaths = useMultipleBaseTaskDirPaths;
}
@Parameterized.Parameters(name = "restoreTasksOnRestart = {0}, useMultipleBaseTaskDirPaths = {1}")
public static Collection<Object[]> getParameters()
{
Object[][] parameters = new Object[][]{
{false, false},
{true, false},
{false, true},
{true, true}
};
Object[][] parameters = new Object[][]{{false}, {true}};
return Arrays.asList(parameters);
}
private WorkerTaskManager createWorkerTaskManager()
{
List<String> baseTaskDirPaths = null;
if (useMultipleBaseTaskDirPaths) {
baseTaskDirPaths = ImmutableList.of(
FileUtils.createTempDir().toString(),
FileUtils.createTempDir().toString()
);
}
TaskConfig taskConfig = new TaskConfig(
FileUtils.createTempDir().toString(),
null,
null,
0,
null,
restoreTasksOnRestart,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
baseTaskDirPaths
);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(FileUtils.createTempDir().toString())
.setDefaultRowFlushBoundary(0)
.setRestoreTasksOnRestart(restoreTasksOnRestart)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();
@ -181,14 +154,13 @@ public class WorkerTaskManagerTest
null,
null,
null,
"1",
dirTracker
"1"
),
taskConfig,
location
),
EasyMock.createNiceMock(DruidLeaderClient.class),
dirTracker
taskConfig,
EasyMock.createNiceMock(DruidLeaderClient.class)
)
{
@Override
@ -222,26 +194,21 @@ public class WorkerTaskManagerTest
Task task2 = createNoopTask("task2-completed-already");
Task task3 = createNoopTask("task3-assigned-explicitly");
for (File completedTaskDir : workerTaskManager.getCompletedTaskDirs()) {
FileUtils.mkdirp(completedTaskDir);
}
for (File assignedTaskDir : workerTaskManager.getAssignedTaskDirs()) {
FileUtils.mkdirp(assignedTaskDir);
}
FileUtils.mkdirp(workerTaskManager.getAssignedTaskDir());
FileUtils.mkdirp(workerTaskManager.getCompletedTaskDir());
// create a task in the assigned-task directory, to simulate an MM shutdown right after a task was assigned.
jsonMapper.writeValue(workerTaskManager.getAssignedTaskFile(task1.getId()), task1);
jsonMapper.writeValue(new File(workerTaskManager.getAssignedTaskDir(), task1.getId()), task1);
// simulate an already completed task
jsonMapper.writeValue(
workerTaskManager.getCompletedTaskFile(task2.getId()),
new File(workerTaskManager.getCompletedTaskDir(), task2.getId()),
TaskAnnouncement.create(
task2,
TaskStatus.success(task2.getId()),
location
)
);
workerTaskManager.start();
Assert.assertTrue(workerTaskManager.getCompletedTasks().get(task2.getId()).getTaskStatus().isSuccess());
@ -250,8 +217,8 @@ public class WorkerTaskManagerTest
Thread.sleep(100);
}
Assert.assertTrue(workerTaskManager.getCompletedTasks().get(task1.getId()).getTaskStatus().isSuccess());
Assert.assertTrue(workerTaskManager.getCompletedTaskFile(task1.getId()).exists());
Assert.assertFalse(workerTaskManager.getAssignedTaskFile(task1.getId()).exists());
Assert.assertTrue(new File(workerTaskManager.getCompletedTaskDir(), task1.getId()).exists());
Assert.assertFalse(new File(workerTaskManager.getAssignedTaskDir(), task1.getId()).exists());
ChangeRequestsSnapshot<WorkerHistoryItem> baseHistory = workerTaskManager
.getChangesSince(new ChangeRequestHistory.Counter(-1, 0))
@ -283,8 +250,8 @@ public class WorkerTaskManagerTest
}
Assert.assertTrue(workerTaskManager.getCompletedTasks().get(task3.getId()).getTaskStatus().isSuccess());
Assert.assertTrue(workerTaskManager.getCompletedTaskFile(task3.getId()).exists());
Assert.assertFalse(workerTaskManager.getAssignedTaskFile(task3.getId()).exists());
Assert.assertTrue(new File(workerTaskManager.getCompletedTaskDir(), task3.getId()).exists());
Assert.assertFalse(new File(workerTaskManager.getAssignedTaskDir(), task3.getId()).exists());
ChangeRequestsSnapshot<WorkerHistoryItem> changes = workerTaskManager.getChangesSince(baseHistory.getCounter())
.get();

View File

@ -32,7 +32,6 @@ import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestIndexTask;
import org.apache.druid.indexing.common.TestTasks;
@ -40,6 +39,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@ -154,29 +154,17 @@ public class WorkerTaskMonitorTest
private WorkerTaskMonitor createTaskMonitor()
{
final TaskConfig taskConfig = new TaskConfig(
FileUtils.createTempDir().toString(),
null,
null,
0,
null,
false,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(FileUtils.createTempDir().toString())
.setDefaultRowFlushBoundary(0)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
final TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
return new WorkerTaskMonitor(
jsonMapper,
new SingleTaskBackgroundRunner(
@ -218,18 +206,17 @@ public class WorkerTaskMonitorTest
null,
null,
null,
"1",
dirTracker
"1"
),
taskConfig,
new NoopServiceEmitter(),
DUMMY_NODE,
new ServerConfig()
),
taskConfig,
cf,
workerCuratorCoordinator,
EasyMock.createNiceMock(DruidLeaderClient.class),
dirTracker
EasyMock.createNiceMock(DruidLeaderClient.class)
);
}

View File

@ -27,8 +27,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.rpc.indexing.OverlordClient;
@ -65,23 +65,10 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
@Before
public void setup() throws IOException
{
this.taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
this.taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
this.overlordClient = new NoopOverlordClient()
{
@Override
@ -139,9 +126,8 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
// Setup data manager with expiry timeout 1s and initial delay of 1 second
WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod);
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
LocalIntermediaryDataManager intermediaryDataManager =
new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient, dirTracker);
new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, segmentFile);
intermediaryDataManager

View File

@ -24,8 +24,8 @@ import com.google.common.io.ByteSource;
import com.google.common.primitives.Ints;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -69,26 +69,12 @@ public class LocalIntermediaryDataManagerManualAddAndDeleteTest
final WorkerConfig workerConfig = new WorkerConfig();
intermediarySegmentsLocation = tempDir.newFolder();
siblingLocation = tempDir.newFolder();
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 1200L, null)),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 1200L, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final OverlordClient overlordClient = new NoopOverlordClient();
final TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient, dirTracker);
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
intermediaryDataManager.start();
}
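Note that `LocalIntermediaryDataManager` is now constructed here (and in the other shuffle tests below) without a `TaskStorageDirTracker`; the manager works off the injected `WorkerConfig` and `TaskConfig` alone. The shuffle location itself is still described by `StorageLocationConfig`; judging from the calls in this diff its three arguments are the directory, an optional maximum size in bytes, and an optional free-space fraction, e.g. (illustrative values, and the argument meanings are an assumption):
// Cap the intermediary-segment location at 1200 bytes; no free-space constraint.
// Assumption: StorageLocationConfig(path, maxSize, freeSpacePercent).
final StorageLocationConfig shuffleLocation =
    new StorageLocationConfig(intermediarySegmentsLocation, 1200L, null);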

View File

@ -32,8 +32,8 @@ import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
@ -99,27 +99,13 @@ public class ShuffleDataSegmentPusherTest
public void setup() throws IOException
{
final WorkerConfig workerConfig = new WorkerConfig();
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final OverlordClient overlordClient = new NoopOverlordClient();
final TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
if (LOCAL.equals(intermediateDataStore)) {
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient, dirTracker);
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
} else if (DEEPSTORE.equals(intermediateDataStore)) {
localDeepStore = temporaryFolder.newFolder("localStorage");
intermediaryDataManager = new DeepStorageIntermediaryDataManager(

View File

@ -27,8 +27,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.common.Intervals;
@ -91,23 +91,10 @@ public class ShuffleResourceTest
}
};
final TaskConfig taskConfig = new TaskConfig(
null,
null,
null,
null,
null,
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false,
null
);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
final OverlordClient overlordClient = new NoopOverlordClient()
{
@Override
@ -120,8 +107,7 @@ public class ShuffleResourceTest
return Futures.immediateFuture(result);
}
};
final TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(taskConfig);
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient, dirTracker);
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient);
shuffleMetrics = new ShuffleMetrics();
shuffleResource = new ShuffleResource(intermediaryDataManager, Optional.of(shuffleMetrics));
}

View File

@ -26,6 +26,7 @@ import org.joda.time.Period;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.List;
/**
*/
@ -43,6 +44,9 @@ public class WorkerConfig
@Min(1)
private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
@JsonProperty
private List<String> baseTaskDirs = null;
@JsonProperty
@NotNull
private String category = DEFAULT_CATEGORY;
@ -76,6 +80,11 @@ public class WorkerConfig
return capacity;
}
public List<String> getBaseTaskDirs()
{
return baseTaskDirs;
}
public String getCategory()
{
return category;
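`WorkerConfig` picks up the new `baseTaskDirs` list (the `druid.worker.baseTaskDirs` property), exposed through `getBaseTaskDirs()` and left `null` when unset. How the runners spread tasks over these directories lives in `TaskStorageDirTracker`, not here; the snippet below only illustrates the round-robin idea, and the helper class with its counter is hypothetical:
// Hypothetical illustration of round-robin selection over druid.worker.baseTaskDirs;
// this helper class is not part of the patch.
class RoundRobinBaseDirPicker
{
  private final AtomicInteger taskCounter = new AtomicInteger(0);

  File pickBaseDir(WorkerConfig workerConfig, String taskId)
  {
    final List<String> dirs = workerConfig.getBaseTaskDirs();      // null when the property is unset
    if (dirs == null || dirs.isEmpty()) {
      throw new IllegalStateException("druid.worker.baseTaskDirs is not set");
    }
    final String base = dirs.get(Math.abs(taskCounter.getAndIncrement() % dirs.size()));
    return new File(base, taskId);                                 // per-task working directory
  }
}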

View File

@ -156,7 +156,7 @@ public class CliIndexer extends ServerRunnable
CliPeon.bindPeonDataSegmentHandlers(binder);
CliPeon.bindRealtimeCache(binder);
CliPeon.bindCoordinatorHandoffNotiferAndClient(binder);
CliMiddleManager.bindWorkerManagementClasses(binder, isZkEnabled);
binder.install(CliMiddleManager.makeWorkerManagementModule(isZkEnabled));
binder.bind(AppenderatorsManager.class)
.to(UnifiedIndexerAppenderatorsManager.class)

View File

@ -47,6 +47,7 @@ import org.apache.druid.guice.MiddleManagerServiceModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
@ -155,7 +156,7 @@ public class CliMiddleManager extends ServerRunnable
.in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
bindWorkerManagementClasses(binder, isZkEnabled);
binder.install(makeWorkerManagementModule(isZkEnabled));
binder.bind(JettyServerInitializer.class)
.to(MiddleManagerJettyServerInitializer.class)
@ -230,18 +231,32 @@ public class CliMiddleManager extends ServerRunnable
);
}
public static void bindWorkerManagementClasses(Binder binder, boolean isZkEnabled)
public static Module makeWorkerManagementModule(boolean isZkEnabled)
{
if (isZkEnabled) {
binder.bind(WorkerTaskManager.class).to(WorkerTaskMonitor.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, WorkerTaskMonitor.class);
} else {
binder.bind(WorkerTaskManager.class).in(ManageLifecycle.class);
}
return new Module()
{
@Override
public void configure(Binder binder)
{
if (isZkEnabled) {
binder.bind(WorkerTaskManager.class).to(WorkerTaskMonitor.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, WorkerTaskMonitor.class);
} else {
binder.bind(WorkerTaskManager.class).in(ManageLifecycle.class);
}
Jerseys.addResource(binder, WorkerResource.class);
Jerseys.addResource(binder, TaskManagementResource.class);
Jerseys.addResource(binder, WorkerResource.class);
Jerseys.addResource(binder, TaskManagementResource.class);
}
@Provides
@ManageLifecycle
public TaskStorageDirTracker getTaskStorageDirTracker(WorkerConfig workerConfig, TaskConfig taskConfig)
{
return TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
}
};
}
}
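Turning the old static `bindWorkerManagementClasses` helper into a module that gets installed (here and in `CliIndexer` above) makes room for the `@Provides` method, so there is exactly one place that builds the `TaskStorageDirTracker` from the two configs. Consumers stop constructing trackers themselves (the `new TaskStorageDirTracker(taskConfig)` calls deleted from the tests) and simply take one as an injected dependency; a hypothetical consumer, for illustration only:
// Hypothetical consumer; the tracker arrives via Guice from the @Provides method above.
public class SomeWorkerComponent
{
  private final TaskStorageDirTracker dirTracker;

  @Inject
  public SomeWorkerComponent(TaskStorageDirTracker dirTracker)
  {
    this.dirTracker = dirTracker;
  }
}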

View File

@ -28,6 +28,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
@ -53,6 +54,7 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
@ -241,7 +243,7 @@ public class CliOverlord extends ServerRunnable
configureTaskStorage(binder);
configureIntermediaryData(binder);
configureAutoscale(binder);
configureRunners(binder);
binder.install(runnerConfigModule());
configureOverlordHelpers(binder);
binder.bind(AuditManager.class)
@ -314,36 +316,50 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
}
private void configureRunners(Binder binder)
private Module runnerConfigModule()
{
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
return new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
PolyBind.createChoice(
binder,
"druid.indexer.runner.type",
Key.get(TaskRunnerFactory.class),
Key.get(HttpRemoteTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(
binder,
Key.get(TaskRunnerFactory.class)
);
PolyBind.createChoice(
binder,
"druid.indexer.runner.type",
Key.get(TaskRunnerFactory.class),
Key.get(HttpRemoteTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(
binder,
Key.get(TaskRunnerFactory.class)
);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME)
.to(RemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME)
.to(RemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME)
.to(HttpRemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME)
.to(HttpRemoteTaskRunnerFactory.class)
.in(LazySingleton.class);
binder.bind(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
}
@Provides
@ManageLifecycle
public TaskStorageDirTracker getTaskStorageDirTracker(WorkerConfig workerConfig, TaskConfig taskConfig)
{
return TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
}
};
}
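The Overlord installs the same `@Provides` wiring so the locally-forked task runner can resolve base task directories the same way the MiddleManager does. `TaskStorageDirTracker.fromConfigs` itself is not shown in this diff; a plausible reading of it, written here purely as an assumption, is to prefer the worker-level directories and fall back to the task-level directory:
// Assumed shape of TaskStorageDirTracker.fromConfigs; the real implementation is not in this diff.
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
{
  final List<String> workerDirs = workerConfig.getBaseTaskDirs();
  if (workerDirs != null && !workerDirs.isEmpty()) {
    return new TaskStorageDirTracker(workerDirs);                      // runner-chosen, possibly multi-disk
  }
  return new TaskStorageDirTracker(
      ImmutableList.of(taskConfig.getBaseTaskDir().getAbsolutePath())  // single-directory fallback
  );
}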
private void configureAutoscale(Binder binder)

View File

@ -67,7 +67,6 @@ import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
@ -144,10 +143,10 @@ public class CliPeon extends GuiceRunnable
{
@SuppressWarnings("WeakerAccess")
@Required
@Arguments(description = "baseTaskDirPath taskId attemptId")
@Arguments(description = "taskDirPath attemptId")
public List<String> taskAndStatusFile;
// path to the base task Directory
// path to the task Directory
private String taskDirPath;
// the attemptId
@ -200,8 +199,8 @@ public class CliPeon extends GuiceRunnable
@Override
public void configure(Binder binder)
{
taskDirPath = Paths.get(taskAndStatusFile.get(0), taskAndStatusFile.get(1)).toAbsolutePath().toString();
attemptId = taskAndStatusFile.get(2);
taskDirPath = taskAndStatusFile.get(0);
attemptId = taskAndStatusFile.get(1);
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
@ -221,11 +220,7 @@ public class CliPeon extends GuiceRunnable
LifecycleModule.register(binder, ExecutorLifecycle.class);
ExecutorLifecycleConfig executorLifecycleConfig = new ExecutorLifecycleConfig()
.setTaskFile(Paths.get(taskDirPath, "task.json").toFile())
.setStatusFile(Paths.get(taskDirPath, "attempt", attemptId, "status.json").toFile())
.setLockFile(Paths.get(taskDirPath, "lock").toFile());
TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(taskAndStatusFile.get(0)));
binder.bind(TaskStorageDirTracker.class).toInstance(dirTracker);
.setStatusFile(Paths.get(taskDirPath, "attempt", attemptId, "status.json").toFile());
if ("k8s".equals(properties.getProperty("druid.indexer.runner.type", null))) {
log.info("Running peon in k8s mode");
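The peon command line shrinks from `<baseTaskDirPath> <taskId> <attemptId>` to `<taskDirPath> <attemptId>`: the runner has already picked the mount point and joined the task id onto it, so the peon no longer assembles the base directory itself, and the `lock` file is no longer wired into this chain. A sketch of how the remaining files are derived, with an illustrative path:
// Argument 0 is already the task's own directory (runner-chosen base dir + task id).
final String taskDirPath = "/mnt/disk2/task/index_wikipedia_2023";   // illustrative value
final String attemptId = "1";

final File taskFile = Paths.get(taskDirPath, "task.json").toFile();
final File statusFile = Paths.get(taskDirPath, "attempt", attemptId, "status.json").toFile();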

View File

@ -43,11 +43,9 @@ public class CliPeonTest
@Test
public void testCliPeonK8sMode() throws IOException
{
String taskId = "id0";
File baseTaskFile = temporaryFolder.newFolder(taskId);
File taskFile = new File(baseTaskFile, "task.json");
FileUtils.write(taskFile, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(baseTaskFile.getParent(), taskId, true);
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), true);
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Assert.assertNotNull(runnable.makeInjector());
@ -56,11 +54,9 @@ public class CliPeonTest
@Test
public void testCliPeonNonK8sMode() throws IOException
{
String taskId = "id0";
File baseTaskFile = temporaryFolder.newFolder(taskId);
File taskFile = new File(baseTaskFile, "task.json");
FileUtils.write(taskFile, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(baseTaskFile.getParent(), taskId, false);
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
GuiceRunnable runnable = new FakeCliPeon(file.getParent(), false);
final Injector injector = GuiceInjectors.makeStartupInjector();
injector.injectMembers(runnable);
Assert.assertNotNull(runnable.makeInjector());
@ -70,11 +66,10 @@ public class CliPeonTest
{
List<String> taskAndStatusFile = new ArrayList<>();
FakeCliPeon(String baseTaskDirectory, String taskId, boolean runningOnK8s)
FakeCliPeon(String taskDirectory, boolean runningOnK8s)
{
try {
taskAndStatusFile.add(baseTaskDirectory);
taskAndStatusFile.add(taskId);
taskAndStatusFile.add(taskDirectory);
taskAndStatusFile.add("1");
Field privateField = CliPeon.class