Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking (#8173)

* Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking

* add more test

* javadoc for missingIntervalsInOverwriteMode

* Fix test

* Address comments

* avoid spotbugs
Jihoon Son 2019-08-02 20:30:05 -07:00 committed by GitHub
parent 8a16a8e97f
commit 1ee828ff49
13 changed files with 323 additions and 53 deletions

View File

@ -905,6 +905,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|`druid.indexer.runner.type`|Choices are "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment. The experimental task runner "httpRemote" is also available, which is the same as "remote" but uses HTTP to interact with Middle Managers instead of ZooKeeper.|local|
|`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. Storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
|`druid.indexer.tasklock.forceTimeChunkLock`|If true, all tasks are forced to use time chunk lock. If false, each task automatically chooses a lock type to use. This configuration can be overridden on a per-task basis by setting `forceTimeChunkLock` in the [task context](../ingestion/locking-and-priority.html#task-context). See [Task Locking & Priority](../ingestion/locking-and-priority.html) for more details about locking in tasks.|true|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
|`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|PT30S|
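For example, to let every task automatically choose its lock type cluster-wide (enabling segment locking where supported), the Overlord's `runtime.properties` could include the following hypothetical excerpt:

```
# Let each task pick between time chunk lock and segment lock automatically.
druid.indexer.tasklock.forceTimeChunkLock=false
```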

View File

@ -24,30 +24,85 @@ title: "Task Locking & Priority"
# Task Locking & Priority
This document explains the task locking system in Druid. Druid's locking system
and versioning system are tightly coupled with each other to guarantee the correctness of ingested data.
## Overshadow Relation between Segments
You can run a task to overwrite existing data. The segments created by an overwriting task _overshadow_ existing segments.
Note that the overshadow relation holds only for the same time chunk and the same data source.
Overshadowed segments are not considered in query processing, so queries never see stale data.
Each segment has a _major_ version and a _minor_ version. The major version is
represented as a timestamp in the format of [`"yyyy-MM-dd'T'hh:mm:ss"`](https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)
while the minor version is an integer number. These major and minor versions
are used to determine the overshadow relation between segments as seen below.
A segment `s1` overshadows another `s2` if
- `s1` has a higher major version than `s2`.
- `s1` has the same major version and a higher minor version than `s2`.
Here are some examples.
- A segment with a major version of `2019-01-01T00:00:00.000Z` and a minor version of `0` overshadows
another with a major version of `2018-01-01T00:00:00.000Z` and a minor version of `1`.
- A segment with a major version of `2019-01-01T00:00:00.000Z` and a minor version of `1` overshadows
another with a major version of `2019-01-01T00:00:00.000Z` and a minor version of `0`.
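Since the comparison rule above fully determines overshadowing, it can be sketched directly. Below is a minimal, self-contained illustration of the rule (a sketch for exposition, not Druid's actual implementation; the class and method names are hypothetical):

```java
// Minimal sketch (not Druid code) of the overshadow rule described above,
// comparing (major version, minor version) pairs.
public class OvershadowSketch
{
  static boolean overshadows(String major1, int minor1, String major2, int minor2)
  {
    // Major versions are ISO-8601-style timestamps, so lexicographic
    // comparison matches chronological order.
    final int majorComparison = major1.compareTo(major2);
    if (majorComparison != 0) {
      return majorComparison > 0; // a higher major version overshadows
    }
    return minor1 > minor2; // same major version: a higher minor version overshadows
  }

  public static void main(String[] args)
  {
    // Mirrors the two examples above.
    System.out.println(overshadows("2019-01-01T00:00:00.000Z", 0, "2018-01-01T00:00:00.000Z", 1)); // true
    System.out.println(overshadows("2019-01-01T00:00:00.000Z", 1, "2019-01-01T00:00:00.000Z", 0)); // true
  }
}
```

As described below, time chunk locking produces segments with a higher major version (and minor version `0`), while segment locking keeps the major version and bumps the minor version.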
## Locking
If you are running two or more [druid tasks](./tasks.html) which generate segments for the same data source and the same time chunk,
the generated segments could potentially overshadow each other, which could lead to incorrect query results.
To avoid this problem, tasks will attempt to get locks prior to creating any segment in Druid.
There are two types of locks, i.e., _time chunk lock_ and _segment lock_.
When the time chunk lock is used, a task locks the entire time chunk of a data source where generated segments will be written.
For example, suppose we have a task ingesting data into the time chunk of `2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z` of the `wikipedia` data source.
With the time chunk locking, this task will lock the entire time chunk of `2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z` of the `wikipedia` data source
before it creates any segments. As long as it holds the lock, any other tasks will be unable to create segments for the same time chunk of the same data source.
The segments created with the time chunk locking have a _higher_ major version than existing segments. Their minor version is always `0`.
When the segment lock is used, a task locks individual segments instead of the entire time chunk.
As a result, two or more tasks can create segments for the same time chunk of the same data source simultaneously
if they are reading different segments.
For example, a Kafka indexing task and a compaction task can always write segments into the same time chunk of the same data source simultaneously.
This is because a Kafka indexing task always appends new segments, while a compaction task always overwrites existing segments.
The segments created with the segment locking have the _same_ major version and a _higher_ minor version.
To enable segment locking, you may need to set `forceTimeChunkLock` to `false` in the [task context](#task-context).
Once `forceTimeChunkLock` is set to `false`, the task will automatically choose the proper lock type to use.
Please note that segment lock is not always available. The most common use case where time chunk lock is enforced is
when an overwriting task changes the segment granularity.
Also, segment locking is supported only by native indexing tasks and Kafka/Kinesis indexing tasks.
Hadoop indexing tasks and realtime indexing tasks (with [Tranquility](./stream-push.html)) don't support it yet.
Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may release
locks early if they desire. Task IDs are made unique by using UUIDs or the task's creation timestamp in their names.
Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
`forceTimeChunkLock` in the task context is applied only to individual tasks.
If you want to unset it for all tasks, set `druid.indexer.tasklock.forceTimeChunkLock` to `false` in the [overlord configuration](../configuration/index.html#overlord-operations).
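For example, a task spec could opt into segment locking for just that one task with a context like the following (a minimal illustration; other context parameters are omitted):

```json
"context" : {
  "forceTimeChunkLock" : false
}
```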
## Priority
Lock requests can conflict with each other if two or more tasks try to acquire locks for overlapping time chunks of the same data source.
Note that lock conflicts can happen between different lock types.
The behavior on lock conflicts depends on the [task priority](#task-lock-priority).
If all tasks of conflicting lock requests have the same priority, then the task that requested the lock first will get it.
Other tasks will wait for that task to release the lock.
If a lower-priority task requests a lock later than a higher-priority one,
it will also wait for the higher-priority task to release the lock.
If a higher-priority task requests a lock later than a lower-priority one,
then it will _preempt_ the lower-priority task. The lock
of the lower-priority task will be revoked and the higher-priority task will acquire a new lock.
This lock preemption can happen at any time while a task is running except
when it is _publishing segments_ in a critical section. Its locks become preemptable again once publishing segments is finished.
Note that locks are shared by the tasks of the same groupId.
For example, Kafka indexing tasks of the same supervisor have the same groupId and share all locks with each other.
## Task Lock Priority
Each task type has a different default lock priority. The table below shows the default priorities of different task types. The higher the number, the higher the priority.
|task type|default priority|
|---------|----------------|
@ -56,7 +111,7 @@ Tasks can have different default priorities depening on their types. Here are a
|Merge/Append/Compaction task|25|
|Other tasks|0|
You can override the task priority by setting your priority in the task context as below.
```json
"context" : {
@ -66,11 +121,12 @@ You can override the task priority by setting your priority in the task context
## Task Context
The task context is used for various individual task configuration parameters. The following parameters apply to all task types.
|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|Task lock timeout in milliseconds. For more details, see [Locking](#locking).|
|forceTimeChunkLock|true|Force the task to always use time chunk lock. If set to false, each task automatically chooses a lock type to use. If set in the task context, this overrides the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.html#overlord-operations). See [Locking](#locking) for more details.|
|priority|Different based on task types. See [Priority](#priority).|Task priority|
<div class="note caution">

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
@ -34,6 +33,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -179,12 +179,16 @@ public class NoopTask extends AbstractTask
@VisibleForTesting
public static NoopTask create(int priority)
{
return new NoopTask(null, null, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
final Map<String, Object> context = new HashMap<>();
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(null, null, null, 0, 0, null, null, context);
}
@VisibleForTesting
public static NoopTask create(String id, int priority)
{
return new NoopTask(id, null, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
final Map<String, Object> context = new HashMap<>();
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(id, null, null, 0, 0, null, null, context);
}
}

View File

@ -187,6 +187,12 @@ public interface Task
return getContext();
}
/**
 * Adds the given key-value pair to the task context if the key is absent, and returns the resulting context.
 */
default Map<String, Object> addToContextIfAbsent(String key, Object val)
{
getContext().putIfAbsent(key, val);
return getContext();
}
Map<String, Object> getContext();
default <ContextValueType> ContextValueType getContextValue(String key)
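To illustrate the precedence this method enables: because `addToContextIfAbsent` delegates to `Map.putIfAbsent`, a value already present in a task's context always wins over a default seeded later, such as the cluster-wide `forceTimeChunkLock` default that `TaskQueue` applies in this commit. A minimal, self-contained sketch (hypothetical class, with a plain `HashMap` standing in for the task context):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Druid code) of the putIfAbsent precedence.
public class ContextDefaultSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object> context = new HashMap<>();
    // The task owner explicitly opted into segment locking.
    context.put("forceTimeChunkLock", false);

    // TaskQueue-style seeding of the cluster-wide default (true):
    // putIfAbsent leaves the existing per-task value untouched.
    context.putIfAbsent("forceTimeChunkLock", true);

    System.out.println(context.get("forceTimeChunkLock")); // prints "false"
  }
}
```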

View File

@ -98,7 +98,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
{
public static final String TYPE = "index_sub";
private static final Logger log = new Logger(ParallelIndexSubTask.class);
private static final Logger LOG = new Logger(ParallelIndexSubTask.class);
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@ -111,6 +111,20 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
private Thread runThread;
private boolean stopped = false;
/**
* If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
* In this mode, sub tasks ask for new locks whenever they see a new row which is not covered by existing locks.
* If this task is overwriting existing segments, we should know in advance whether it changes the segment
* granularity, so that we can decide what type of lock to use. However, if intervals are missing, we can't know
* the segment granularity of existing segments until the task reads all data, because we don't know what segments
* are going to be overwritten. As a result, we assume that the segment granularity is going to be changed if
* intervals are missing, and force the use of timeChunk lock.
*
* This variable is initialized in the constructor and used in {@link #run} to log in the task logs
* that timeChunk lock was enforced.
*/
private final boolean missingIntervalsInOverwriteMode;
@JsonCreator
public ParallelIndexSubTask(
// id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@ -144,6 +158,14 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
this.indexingServiceClient = indexingServiceClient;
this.taskClientFactory = taskClientFactory;
this.appenderatorsManager = appenderatorsManager;
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& !ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.isPresent();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
}
@Override
@ -161,17 +183,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
if (!ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent()
&& !ingestionSchema.getIOConfig().isAppendToExisting()) {
// If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
// In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
// If this task is overwriting existing segments, then we should know this task is changing segment granularity
// in advance to know what types of lock we should use. However, if intervals are missing, we can't know
// the segment granularity of existing segments until the task reads all data because we don't know what segments
// are going to be overwritten. As a result, we assume that segment granularity will be changed if intervals are
// missing force to use timeChunk lock.
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
}
@ -204,6 +215,12 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
}
}
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+ "Forced to use timeChunk lock."
);
}
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
@ -459,7 +476,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
// which makes the size of segments smaller.
final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
log.info("Pushed segments[%s]", pushed.getSegments());
LOG.info("Pushed segments[%s]", pushed.getSegments());
}
} else {
throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
@ -478,7 +495,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
log.info("Pushed segments[%s]", pushed.getSegments());
LOG.info("Pushed segments[%s]", pushed.getSegments());
return pushedSegments;
}

View File

@ -99,7 +99,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
{
public static final String TYPE = "index_parallel";
private static final Logger log = new Logger(ParallelIndexSupervisorTask.class);
private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
private final ParallelIndexIngestionSpec ingestionSchema;
private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
@ -109,6 +109,20 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final AppenderatorsManager appenderatorsManager;
/**
* If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
* In this mode, sub tasks ask for new locks whenever they see a new row which is not covered by existing locks.
* If this task is overwriting existing segments, we should know in advance whether it changes the segment
* granularity, so that we can decide what type of lock to use. However, if intervals are missing, we can't know
* the segment granularity of existing segments until the task reads all data, because we don't know what segments
* are going to be overwritten. As a result, we assume that the segment granularity is going to be changed if
* intervals are missing, and force the use of timeChunk lock.
*
* This variable is initialized in the constructor and used in {@link #run} to log in the task logs
* that timeChunk lock was enforced.
*/
private final boolean missingIntervalsInOverwriteMode;
private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
private volatile ParallelIndexTaskRunner runner;
@ -153,16 +167,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
this.authorizerMapper = authorizerMapper;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.appenderatorsManager = appenderatorsManager;
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& !ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.isPresent();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
!= TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
log.warn("maxSavedParseExceptions is not supported yet");
LOG.warn("maxSavedParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().getMaxParseExceptions() != TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS) {
log.warn("maxParseExceptions is not supported yet");
LOG.warn("maxParseExceptions is not supported yet");
}
if (ingestionSchema.getTuningConfig().isLogParseExceptions() != TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS) {
log.warn("logParseExceptions is not supported yet");
LOG.warn("logParseExceptions is not supported yet");
}
}
@ -225,17 +247,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
if (!ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent()
&& !ingestionSchema.getIOConfig().isAppendToExisting()) {
// If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
// In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
// If this task is overwriting existing segments, then we should know this task is changing segment granularity
// in advance to know what types of lock we should use. However, if intervals are missing, we can't know
// the segment granularity of existing segments until the task reads all data because we don't know what segments
// are going to be overwritten. As a result, we assume that segment granularity will be changed if intervals are
// missing force to use timeChunk lock.
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
}
@ -291,7 +302,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
log.info(
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+ "Forced to use timeChunk lock."
);
}
LOG.info(
"Found chat handler of class[%s]",
Preconditions.checkNotNull(chatHandlerProvider, "chatHandlerProvider").getClass().getName()
);
@ -302,12 +319,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return runParallel(toolbox);
} else {
if (!baseFirehoseFactory.isSplittable()) {
log.warn(
LOG.warn(
"firehoseFactory[%s] is not splittable. Running sequentially.",
baseFirehoseFactory.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
log.warn(
LOG.warn(
"maxNumSubTasks is 1. Running sequentially. "
+ "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
);

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@ -74,6 +75,7 @@ public class TaskMaster implements TaskCountStatsProvider
@Inject
public TaskMaster(
final TaskLockConfig taskLockConfig,
final TaskQueueConfig taskQueueConfig,
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
@ -110,6 +112,7 @@ public class TaskMaster implements TaskCountStatsProvider
taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskLockConfig,
taskQueueConfig,
taskStorage,
taskRunner,

View File

@ -30,12 +30,13 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@ -80,6 +81,7 @@ public class TaskQueue
private final List<Task> tasks = new ArrayList<>();
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<>();
private final TaskLockConfig lockConfig;
private final TaskQueueConfig config;
private final TaskStorage taskStorage;
private final TaskRunner taskRunner;
@ -109,8 +111,8 @@ public class TaskQueue
private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
@Inject
public TaskQueue(
TaskLockConfig lockConfig,
TaskQueueConfig config,
TaskStorage taskStorage,
TaskRunner taskRunner,
@ -119,6 +121,7 @@ public class TaskQueue
ServiceEmitter emitter
)
{
this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
this.config = Preconditions.checkNotNull(config, "config");
this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");
this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner");
@ -343,6 +346,9 @@ public class TaskQueue
throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId()));
}
// Set forceTimeChunkLock before adding the task spec to taskStorage, so that we always see a consistent task spec.
task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
giant.lock();
try {

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.config;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Global configuration for task locks, used by the Overlord.
*/
public class TaskLockConfig
{
@JsonProperty
private boolean forceTimeChunkLock = true;
public boolean isForceTimeChunkLock()
{
return forceTimeChunkLock;
}
}

View File

@ -71,6 +71,7 @@ import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
@ -227,6 +228,7 @@ public class TaskLifecycleTest
private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private MonitorScheduler monitorScheduler;
private ServiceEmitter emitter;
private TaskLockConfig lockConfig;
private TaskQueueConfig tqc;
private TaskConfig taskConfig;
private DataSegmentPusher dataSegmentPusher;
@ -653,12 +655,13 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(tac);
Preconditions.checkNotNull(emitter);
lockConfig = new TaskLockConfig();
tqc = mapper.readValue(
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
TaskQueueConfig.class
);
return new TaskQueue(tqc, ts, tr, tac, taskLockbox, emitter);
return new TaskQueue(lockConfig, tqc, ts, tr, tac, taskLockbox, emitter);
}
@After

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
public class TaskLockConfigTest
{
private TaskStorage taskStorage;
@Before
public void setup()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
}
@Test
public void testDefault() throws EntryExistsException
{
final TaskQueue taskQueue = createTaskQueue(null);
taskQueue.start();
final Task task = NoopTask.create();
Assert.assertTrue(taskQueue.add(task));
taskQueue.stop();
final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
Assert.assertTrue(optionalTask.isPresent());
final Task fromTaskStorage = optionalTask.get();
Assert.assertTrue(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
@Test
public void testNotForceTimeChunkLock() throws EntryExistsException
{
final TaskQueue taskQueue = createTaskQueue(false);
taskQueue.start();
final Task task = NoopTask.create();
Assert.assertTrue(taskQueue.add(task));
taskQueue.stop();
final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
Assert.assertTrue(optionalTask.isPresent());
final Task fromTaskStorage = optionalTask.get();
Assert.assertFalse(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
@Test
public void testOverwriteDefault() throws EntryExistsException
{
final TaskQueue taskQueue = createTaskQueue(null);
taskQueue.start();
final Task task = NoopTask.create();
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false);
Assert.assertTrue(taskQueue.add(task));
taskQueue.stop();
final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
Assert.assertTrue(optionalTask.isPresent());
final Task fromTaskStorage = optionalTask.get();
Assert.assertFalse(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
private TaskQueue createTaskQueue(@Nullable Boolean forceTimeChunkLock)
{
final TaskLockConfig lockConfig;
if (forceTimeChunkLock != null) {
lockConfig = new TaskLockConfig()
{
@Override
public boolean isForceTimeChunkLock()
{
return forceTimeChunkLock;
}
};
} else {
lockConfig = new TaskLockConfig();
}
final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null);
final TaskRunner taskRunner = EasyMock.createNiceMock(RemoteTaskRunner.class);
final TaskActionClientFactory actionClientFactory = EasyMock.createNiceMock(LocalTaskActionClientFactory.class);
final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator());
final ServiceEmitter emitter = new NoopServiceEmitter();
return new TaskQueue(lockConfig, queueConfig, taskStorage, taskRunner, actionClientFactory, lockbox, emitter);
}
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@ -170,6 +171,7 @@ public class OverlordTest
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
ServiceEmitter serviceEmitter = new NoopServiceEmitter();
taskMaster = new TaskMaster(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(1), null, new Period(10)),
taskLockbox,
taskStorage,

View File

@ -79,6 +79,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleaner;
@ -174,6 +175,7 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.tasklock", TaskLockConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);