Prioritized locking (#4550)

* Implementation of prioritized locking

* Fix build failure

* Fix tc fail

* Fix typos

* Fix IndexTaskTest

* Addressed comments

* Fix test

* Fix spacing

* Fix build error

* Fix build error

* Add lock status

* Cleanup suspicious method

* Add nullables

*  add doInCriticalSection to TaskLockBox and revert return type of task actions

* fix build

* refactor CriticalAction

* make replaceLock transactional

* fix formatting

* fix javadoc

* fix build
This commit is contained in:
Jihoon Son 2017-10-12 15:16:31 +09:00 committed by Gian Merlino
parent 7a9940d624
commit dfa9cdc982
47 changed files with 2435 additions and 528 deletions

View File

@ -20,9 +20,7 @@
package io.druid.metadata;
import com.google.common.base.Optional;
import io.druid.java.util.common.Pair;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -43,7 +41,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param status status object associated wit this object, can be null
* @throws EntryExistsException
*/
public void insert(
void insert(
@NotNull String id,
@NotNull DateTime timestamp,
@NotNull String dataSource,
@ -62,7 +60,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param status status
* @return true if the status was updated, false if the entry did not exist of if the entry was inactive
*/
public boolean setStatus(String entryId, boolean active, StatusType status);
boolean setStatus(String entryId, boolean active, StatusType status);
/**
* Retrieves the entry with the given id.
@ -70,7 +68,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entryId entry id
* @return optional entry, absent if the given id does not exist
*/
public Optional<EntryType> getEntry(String entryId);
Optional<EntryType> getEntry(String entryId);
/**
* Retrieve the status for the entry with the given id.
@ -78,14 +76,14 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entryId entry id
* @return optional status, absent if entry does not exist or status is not set
*/
public Optional<StatusType> getStatus(String entryId);
Optional<StatusType> getStatus(String entryId);
/**
* Return all active entries with their respective status
*
* @return list of (entry, status) pairs
*/
public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
/**
* Return all statuses for inactive entries created on or later than the given timestamp
@ -93,7 +91,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param timestamp timestamp
* @return list of statuses
*/
public List<StatusType> getInactiveStatusesSince(DateTime timestamp);
List<StatusType> getInactiveStatusesSince(DateTime timestamp);
/**
* Add a lock to the given entry
@ -102,14 +100,25 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param lock lock to add
* @return true if the lock was added
*/
public boolean addLock(String entryId, LockType lock);
boolean addLock(String entryId, LockType lock);
/**
* Replace an existing lock with a new lock.
*
* @param entryId entry id
* @param oldLockId lock to be replaced
* @param newLock lock to be added
*
* @return true if the lock is replaced
*/
boolean replaceLock(String entryId, long oldLockId, LockType newLock);
/**
* Remove the lock with the given lock id.
*
* @param lockId lock id
*/
public void removeLock(long lockId);
void removeLock(long lockId);
/**
* Add a log to the entry with the given id.
@ -118,7 +127,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param log log to add
* @return true if the log was added
*/
public boolean addLog(String entryId, LogType log);
boolean addLog(String entryId, LogType log);
/**
* Returns the logs for the entry with the given id.
@ -126,7 +135,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entryId entry id
* @return list of logs
*/
public List<LogType> getLogs(String entryId);
List<LogType> getLogs(String entryId);
/**
* Returns the locks for the given entry
@ -134,5 +143,13 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entryId entry id
* @return map of lockId to lock
*/
public Map<Long, LockType> getLocks(String entryId);
Map<Long, LockType> getLocks(String entryId);
/**
* Returns the lock id for the given entry and the lock.
*
* @return lock id if found. Otherwise null.
*/
@Nullable
Long getLockId(String entryId, LockType lock);
}

View File

@ -88,10 +88,33 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|property|description|required?|
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|
#### Task Priority
Druid's indexing tasks use locks for atomic data ingestion. Each lock is acquired for the combination of a dataSource and an interval. Once a task acquires a lock, it can write data for the dataSource and the interval of the acquired lock unless the lock is released or preempted. Please see [the below Locking section](#locking)
Each task has a priority which is used for lock acquisition. The locks of higher-priority tasks can preempt the locks of lower-priority tasks if they try to acquire for the same dataSource and interval. If some locks of a task are preempted, the behavior of the preempted task depends on the task implementation. Usually, most tasks finish as failed if they are preempted.
Tasks can have different default priorities depening on their types. Here are a list of default priorities. Higher the number, higher the priority.
|task type|default priority|
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Merge/Append task|25|
|Other tasks|0|
You can override the task priority by setting your priority in the task context like below.
```json
"context" : {
"priority" : 100
}
```
#### DataSchema
This field is required.
@ -322,7 +345,17 @@ These tasks start, sleep for a time and are used only for testing. The available
Locking
-------
Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task.
Once an overlord node accepts a task, the task acquires locks for the data source and intervals specified in the task.
There are two lock types, i.e., _shared lock_ and _exclusive lock_.
- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other.
- A task needs to acquire an exclusive lock before it writes segments for an interval. An exclusive lock is also preemptable except while the task is publishing segments.
Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't compete for acquiring locks because they usually targets different dataSources or intervals.
A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that exclusive locks are still preemptable. That is, they also be able to be preempted by higher priority locks unless they are _publishing segments_ in a critical section. Once publishing segments is finished, those locks become preemptable again.
Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release
locks early if they desire. Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created.
locks early if they desire. Task ids are unique by naming them using UUIDs or the timestamp in which the task was created.
Tasks are also part of a "task group", which is a set of tasks that can share interval locks.

View File

@ -114,6 +114,7 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -576,13 +577,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
sequenceNames.values()
).get();
final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
final SegmentsAndMetadata handedOff;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = driver.registerHandoff(published)
.get();
handedOff = handoffFuture.get();
} else {
handedOff = driver.registerHandoff(published)
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}
if (handedOff == null) {

View File

@ -22,30 +22,73 @@ package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;
import javax.annotation.Nullable;
/**
* Represents a lock held by some task. Immutable.
*/
public class TaskLock
{
private final TaskLockType type;
private final String groupId;
private final String dataSource;
private final Interval interval;
private final String version;
private final int priority;
private final boolean revoked;
@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("revoked") boolean revoked
)
{
this.groupId = groupId;
this.dataSource = dataSource;
this.interval = interval;
this.version = version;
this.type = type == null ? TaskLockType.EXCLUSIVE : type;
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.version = Preconditions.checkNotNull(version, "version");
this.priority = priority;
this.revoked = revoked;
}
public TaskLock(
TaskLockType type,
String groupId,
String dataSource,
Interval interval,
String version,
int priority
)
{
this(type, groupId, dataSource, interval, version, priority, false);
}
public TaskLock revokedCopy()
{
return new TaskLock(
type,
groupId,
dataSource,
interval,
version,
priority,
true
);
}
@JsonProperty
public TaskLockType getType()
{
return type;
}
@JsonProperty
@ -72,34 +115,52 @@ public class TaskLock
return version;
}
@JsonProperty
public int getPriority()
{
return priority;
}
@JsonProperty
public boolean isRevoked()
{
return revoked;
}
@Override
public boolean equals(Object o)
{
if (!(o instanceof TaskLock)) {
return false;
} else {
final TaskLock x = (TaskLock) o;
return Objects.equal(this.groupId, x.groupId) &&
Objects.equal(this.dataSource, x.dataSource) &&
Objects.equal(this.interval, x.interval) &&
Objects.equal(this.version, x.version);
final TaskLock that = (TaskLock) o;
return this.type.equals(that.type) &&
this.groupId.equals(that.groupId) &&
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
this.priority == that.priority &&
this.revoked == that.revoked;
}
}
@Override
public int hashCode()
{
return Objects.hashCode(groupId, dataSource, interval, version);
return Objects.hashCode(type, groupId, dataSource, interval, version, priority, revoked);
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("type", type)
.add("groupId", groupId)
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
.add("priority", priority)
.add("revoked", revoked)
.toString();
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
public enum TaskLockType
{
SHARED,
EXCLUSIVE
}

View File

@ -23,13 +23,20 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.LockResult;
import org.joda.time.Interval;
import javax.annotation.Nullable;
public class LockAcquireAction implements TaskAction<TaskLock>
{
private final TaskLockType type;
@JsonIgnore
private final Interval interval;
@ -38,14 +45,22 @@ public class LockAcquireAction implements TaskAction<TaskLock>
@JsonCreator
public LockAcquireAction(
@JsonProperty("lockType") @Nullable TaskLockType type, // nullable for backward compatibility
@JsonProperty("interval") Interval interval,
@JsonProperty("timeoutMs") long timeoutMs
)
{
this.interval = interval;
this.type = type == null ? TaskLockType.EXCLUSIVE : type;
this.interval = Preconditions.checkNotNull(interval, "interval");
this.timeoutMs = timeoutMs;
}
@JsonProperty("lockType")
public TaskLockType getType()
{
return type;
}
@JsonProperty
public Interval getInterval()
{
@ -70,11 +85,10 @@ public class LockAcquireAction implements TaskAction<TaskLock>
public TaskLock perform(Task task, TaskActionToolbox toolbox)
{
try {
if (timeoutMs == 0) {
return toolbox.getTaskLockbox().lock(task, interval);
} else {
return toolbox.getTaskLockbox().lock(task, interval, timeoutMs);
}
final LockResult result = timeoutMs == 0 ?
toolbox.getTaskLockbox().lock(type, task, interval) :
toolbox.getTaskLockbox().lock(type, task, interval, timeoutMs);
return result.isOk() ? result.getTaskLock() : null;
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
@ -91,8 +105,9 @@ public class LockAcquireAction implements TaskAction<TaskLock>
public String toString()
{
return "LockAcquireAction{" +
"interval=" + interval +
"timeoutMs=" + timeoutMs +
"lockType=" + type +
", interval=" + interval +
", timeoutMs=" + timeoutMs +
'}';
}
}

View File

@ -24,22 +24,37 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.LockResult;
import org.joda.time.Interval;
import javax.annotation.Nullable;
public class LockTryAcquireAction implements TaskAction<TaskLock>
{
@JsonIgnore
private final TaskLockType type;
@JsonIgnore
private final Interval interval;
@JsonCreator
public LockTryAcquireAction(
@JsonProperty("lockType") @Nullable TaskLockType type, // nullable for backward compatibility
@JsonProperty("interval") Interval interval
)
{
this.type = type == null ? TaskLockType.EXCLUSIVE : type;
this.interval = interval;
}
@JsonProperty("lockType")
public TaskLockType getType()
{
return type;
}
@JsonProperty
public Interval getInterval()
{
@ -57,7 +72,8 @@ public class LockTryAcquireAction implements TaskAction<TaskLock>
@Override
public TaskLock perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getTaskLockbox().tryLock(task, interval).orNull();
final LockResult result = toolbox.getTaskLockbox().tryLock(type, task, interval);
return result.isOk() ? result.getTaskLock() : null;
}
@Override
@ -70,7 +86,8 @@ public class LockTryAcquireAction implements TaskAction<TaskLock>
public String toString()
{
return "LockTryAcquireAction{" +
"interval=" + interval +
"lockType=" + type +
", interval=" + interval +
'}';
}
}

View File

@ -24,10 +24,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.LockResult;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.logger.Logger;
@ -254,14 +256,19 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
rowInterval,
tryInterval
);
final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull();
if (tryLock != null) {
final LockResult lockResult = toolbox.getTaskLockbox().tryLock(TaskLockType.EXCLUSIVE, task, tryInterval);
if (lockResult.isRevoked()) {
// We had acquired a lock but it was preempted by other locks
throw new ISE("The lock for interval[%s] is preempted and no longer valid", tryInterval);
}
if (lockResult.isOk()) {
final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
tryLock.getVersion()
lockResult.getTaskLock().getVersion()
);
if (identifier != null) {
return identifier;

View File

@ -26,11 +26,16 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.CriticalAction;
import io.druid.java.util.common.ISE;
import io.druid.query.DruidMetrics;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class SegmentMetadataUpdateAction implements TaskAction<Void>
{
@ -62,8 +67,32 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
Task task, TaskActionToolbox toolbox
) throws IOException
{
toolbox.verifyTaskLocks(task, segments);
toolbox.getIndexerMetadataStorageCoordinator().updateSegmentMetadata(segments);
TaskActionPreconditions.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
intervals,
CriticalAction.builder()
.onValidLocks(
() -> {
toolbox.getIndexerMetadataStorageCoordinator().updateSegmentMetadata(segments);
return null;
}
)
.onInvalidLocks(
() -> {
throw new ISE("Some locks for task[%s] are already revoked", task.getId());
}
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -26,11 +26,16 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.CriticalAction;
import io.druid.java.util.common.ISE;
import io.druid.query.DruidMetrics;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class SegmentNukeAction implements TaskAction<Void>
{
@ -62,8 +67,32 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocks(task, segments);
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
TaskActionPreconditions.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
intervals,
CriticalAction.builder()
.onValidLocks(
() -> {
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
return null;
}
)
.onInvalidLocks(
() -> {
throw new ISE("Some locks for task[%s] are already revoked", task.getId());
}
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -25,13 +25,16 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.CriticalAction;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidMetrics;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Insert segments into metadata storage. The segment versions must all be less than or equal to a lock held by
@ -43,6 +46,8 @@ import java.util.Set;
*/
public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
{
private static final Logger LOG = new Logger(SegmentTransactionalInsertAction.class);
private final Set<DataSegment> segments;
private final DataSourceMetadata startMetadata;
private final DataSourceMetadata endMetadata;
@ -99,28 +104,45 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocks(task, segments);
TaskActionPreconditions.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
final SegmentPublishResult retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
startMetadata,
endMetadata
);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
.setDimension(DruidMetrics.TASK_TYPE, task.getType());
if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
startMetadata,
endMetadata
)
)
.onInvalidLocks(SegmentPublishResult::fail)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
if (retVal.isSuccess()) {
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
.setDimension(DruidMetrics.TASK_TYPE, task.getType());
if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
}
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
}
}
return retVal;

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.google.common.annotations.VisibleForTesting;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.java.util.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Set;
public class TaskActionPreconditions
{
public static void checkLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final Set<DataSegment> segments
)
{
if (!isLockCoversSegments(task, taskLockbox, segments)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
}
@VisibleForTesting
static boolean isLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final Set<DataSegment> segments
)
{
// Verify that each of these segments falls under some lock
// NOTE: It is possible for our lock to be revoked (if the task has failed and given up its locks) after we check
// NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
// NOTE: insert some segments from the task but not others.
final List<TaskLock> taskLocks = taskLockbox.findLocksForTask(task);
for (final DataSegment segment : segments) {
final boolean ok = taskLocks.stream().anyMatch(
taskLock -> taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval())
&& taskLock.getVersion().compareTo(segment.getVersion()) >= 0
);
if (!ok) {
return false;
}
}
return true;
}
}

View File

@ -19,20 +19,11 @@
package io.druid.indexing.common.actions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Set;
public class TaskActionToolbox
{
@ -74,48 +65,4 @@ public class TaskActionToolbox
{
return supervisorManager;
}
public void verifyTaskLocks(
final Task task,
final Set<DataSegment> segments
)
{
if (!taskLockCoversSegments(task, segments)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments
)
{
// Verify that each of these segments falls under some lock
// NOTE: It is possible for our lock to be revoked (if the task has failed and given up its locks) after we check
// NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
// NOTE: insert some segments from the task but not others.
final List<TaskLock> taskLocks = getTaskLockbox().findLocksForTask(task);
for (final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
return taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval())
&& taskLock.getVersion().compareTo(segment.getVersion()) >= 0;
}
}
);
if (!ok) {
return false;
}
}
return true;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.Interval;
@ -89,7 +90,7 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
return taskActionClient.submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
}
@JsonProperty

View File

@ -26,14 +26,15 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.java.util.common.DateTimes;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public abstract class AbstractTask implements Task
@ -201,9 +202,9 @@ public abstract class AbstractTask implements Task
return id.hashCode();
}
protected Iterable<TaskLock> getTaskLocks(TaskToolbox toolbox) throws IOException
protected List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
{
return toolbox.getTaskActionClient().submit(new LockListAction());
return client.submit(new LockListAction());
}
@Override
@ -212,11 +213,4 @@ public abstract class AbstractTask implements Task
{
return context;
}
@Override
public Object getContextValue(String key)
{
return context == null ? null : context.get(key);
}
}

View File

@ -22,7 +22,6 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -65,7 +64,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());

View File

@ -36,6 +36,7 @@ import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
@ -119,6 +120,12 @@ public class HadoopIndexTask extends HadoopTask
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMappper");
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
public String getType()
{
@ -135,7 +142,7 @@ public class HadoopIndexTask extends HadoopTask
intervals.get()
)
);
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
return taskActionClient.submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
} else {
return true;
}
@ -199,10 +206,15 @@ public class HadoopIndexTask extends HadoopTask
);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new LockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]", interval
);
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox);
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
final TaskLock myLock = Iterables.getOnlyElement(locks);
version = myLock.getVersion();
}

View File

@ -49,8 +49,6 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
@ -98,9 +96,11 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -162,6 +162,12 @@ public class IndexTask extends AbstractTask
this.ingestionSchema = ingestionSchema;
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
public String getType()
{
@ -171,11 +177,21 @@ public class IndexTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
Optional<SortedSet<Interval>> intervals = ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals();
final Optional<SortedSet<Interval>> intervals = ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals();
if (intervals.isPresent()) {
Interval interval = JodaUtils.umbrellaInterval(intervals.get());
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
final List<TaskLock> locks = getTaskLocks(taskActionClient);
if (locks.size() == 0) {
try {
Tasks.tryAcquireExclusiveLocks(taskActionClient, intervals.get());
}
catch (Exception e) {
return false;
}
}
return true;
} else {
return true;
}
@ -208,14 +224,15 @@ public class IndexTask extends AbstractTask
final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir);
final String version;
final DataSchema dataSchema;
final Map<Interval, String> versions;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals());
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
version = lock.getVersion();
final SortedSet<Interval> intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
intervals.addAll(shardSpecs.getIntervals());
final Map<Interval, TaskLock> locks = Tasks.tryAcquireExclusiveLocks(toolbox.getTaskActionClient(), intervals);
versions = locks.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getVersion()));
dataSchema = ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema.getDataSchema()
.getGranularitySpec()
@ -226,17 +243,28 @@ public class IndexTask extends AbstractTask
)
);
} else {
version = Iterables.getOnlyElement(getTaskLocks(toolbox)).getVersion();
versions = getTaskLocks(toolbox.getTaskActionClient())
.stream()
.collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
dataSchema = ingestionSchema.getDataSchema();
}
if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, version, firehoseFactory, firehoseTempDir)) {
if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir)) {
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());
}
}
private static String findVersion(Map<Interval, String> versions, Interval interval)
{
return versions.entrySet().stream()
.filter(entry -> entry.getKey().contains(interval))
.map(Entry::getValue)
.findFirst()
.orElse(null);
}
private static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
{
Preconditions.checkState(
@ -523,7 +551,7 @@ public class IndexTask extends AbstractTask
final TaskToolbox toolbox,
final DataSchema dataSchema,
final ShardSpecs shardSpecs,
final String version,
Map<Interval, String> versions,
final FirehoseFactory firehoseFactory,
final File firehoseTempDir
) throws IOException, InterruptedException
@ -571,6 +599,7 @@ public class IndexTask extends AbstractTask
shardSpecForPublishing = shardSpec;
}
final String version = findVersion(versions, entry.getKey());
lookup.put(
Appenderators.getSequenceName(entry.getKey(), version, shardSpec),
new SegmentIdentifier(getDataSource(), entry.getKey(), version, shardSpecForPublishing)
@ -599,12 +628,12 @@ public class IndexTask extends AbstractTask
}
final int partitionNum = counters.computeIfAbsent(interval, x -> new AtomicInteger()).getAndIncrement();
return new SegmentIdentifier(getDataSource(), interval, version, new NumberedShardSpec(partitionNum, 0));
return new SegmentIdentifier(getDataSource(), interval, findVersion(versions, interval), new NumberedShardSpec(partitionNum, 0));
};
}
final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null);
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
return toolbox.getTaskActionClient().submit(action).isSuccess();
};
@ -646,13 +675,12 @@ public class IndexTask extends AbstractTask
// Sequence name is based solely on the shardSpec, and there will only be one segment per sequence.
final Interval interval = optInterval.get();
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow);
sequenceName = Appenderators.getSequenceName(interval, version, shardSpec);
sequenceName = Appenderators.getSequenceName(interval, findVersion(versions, interval), shardSpec);
} else {
// Segments are created as needed, using a single sequence name. They may be allocated from the overlord
// (in append mode) or may be created on our own authority (in overwrite mode).
sequenceName = getId();
}
final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
if (addResult.isOk()) {

View File

@ -69,7 +69,7 @@ public class KillTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());

View File

@ -126,10 +126,16 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
);
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);

View File

@ -73,7 +73,7 @@ public class MoveTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());

View File

@ -22,6 +22,8 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
@ -152,4 +154,16 @@ public class NoopTask extends AbstractTask
{
return new NoopTask(null, 0, 0, null, null, null);
}
@VisibleForTesting
public static NoopTask create(int priority)
{
return new NoopTask(null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
}
@VisibleForTesting
public static NoopTask create(String id, int priority)
{
return new NoopTask(id, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -35,6 +36,7 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
@ -159,6 +161,12 @@ public class RealtimeIndexTask extends AbstractTask
this.spec = fireDepartment;
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override
public String getType()
{
@ -222,7 +230,13 @@ public class RealtimeIndexTask extends AbstractTask
public void announceSegment(final DataSegment segment) throws IOException
{
// Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new LockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]",
segment.getInterval()
);
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@ -242,7 +256,13 @@ public class RealtimeIndexTask extends AbstractTask
{
// Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new LockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]",
segment.getInterval()
);
}
toolbox.getSegmentAnnouncer().announceSegments(segments);
}
@ -274,10 +294,14 @@ public class RealtimeIndexTask extends AbstractTask
{
try {
// Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval, lockTimeoutMs));
final LockAcquireAction action = new LockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs);
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(action),
"Cannot acquire a lock for interval[%s]",
interval
);
return myLock.getVersion();
return lock.getVersion();
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -65,7 +65,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());

View File

@ -79,6 +79,19 @@ public interface Task
*/
public String getGroupId();
/**
* Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
* be used for task scheduling, cluster resource management, etc.
*
* @return task priority
*
* @see Tasks for default task priorities
*/
default int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}
/**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.

View File

@ -19,8 +19,77 @@
package io.druid.indexing.common.task;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.overlord.LockResult;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class Tasks
{
public static String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
public static final int DEFAULT_REALTIME_TASK_PRIORITY = 75;
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
public static final int DEFAULT_TASK_PRIORITY = 0;
public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static void checkLockResult(LockResult result, Interval interval)
{
if (!result.isOk()) {
throw new ISE("Failed to lock for interval[%s]", interval);
}
}
public static Map<Interval, TaskLock> tryAcquireExclusiveLocks(TaskActionClient client, SortedSet<Interval> intervals)
throws IOException
{
final Map<Interval, TaskLock> lockMap = new HashMap<>();
for (Interval interval : computeCompactIntervals(intervals)) {
final TaskLock lock = Preconditions.checkNotNull(
client.submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
"Cannot acquire a lock for interval[%s]", interval
);
lockMap.put(interval, lock);
}
return lockMap;
}
public static SortedSet<Interval> computeCompactIntervals(SortedSet<Interval> intervals)
{
final SortedSet<Interval> compactIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
List<Interval> toBeAccumulated = null;
for (Interval interval : intervals) {
if (toBeAccumulated == null) {
toBeAccumulated = new ArrayList<>();
toBeAccumulated.add(interval);
} else {
if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
toBeAccumulated.add(interval);
} else {
compactIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
toBeAccumulated = null;
}
}
}
if (toBeAccumulated != null) {
compactIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
}
return compactIntervals;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import org.joda.time.Interval;
/**
* This class represents a critical action must be done while the task's lock is guaranteed to not be revoked in the
* middle of the action.
*
* Implementations must not change the lock state by calling {@link TaskLockbox#lock(TaskLockType, Task, Interval)},
* {@link TaskLockbox#lock(TaskLockType, Task, Interval, long)}, {@link TaskLockbox#tryLock(TaskLockType, Task, Interval)},
* or {@link TaskLockbox#unlock(Task, Interval)}.
*
* Also, implementations should be finished as soon as possible because all methods in {@link TaskLockbox} are blocked
* until this action is finished.
*
* @see TaskLockbox#doInCriticalSection
*/
public class CriticalAction<T>
{
private final Action<T> actionOnValidLocks;
private final Action<T> actionOnInvalidLocks;
private CriticalAction(Action<T> actionOnValidLocks, Action<T> actionOnInvalidLocks)
{
this.actionOnValidLocks = Preconditions.checkNotNull(actionOnValidLocks, "actionOnValidLocks");
this.actionOnInvalidLocks = Preconditions.checkNotNull(actionOnInvalidLocks, "actionOnInvalidLocks");
}
T perform(boolean isTaskLocksValid) throws Exception
{
return isTaskLocksValid ? actionOnValidLocks.perform() : actionOnInvalidLocks.perform();
}
public static <T> Builder<T> builder()
{
return new Builder<>();
}
public static class Builder<T>
{
private Action<T> actionOnInvalidLocks;
private Action<T> actionOnValidLocks;
public Builder<T> onValidLocks(Action<T> action)
{
this.actionOnValidLocks = action;
return this;
}
public Builder<T> onInvalidLocks(Action<T> action)
{
this.actionOnInvalidLocks = action;
return this;
}
public CriticalAction<T> build()
{
return new CriticalAction<>(actionOnValidLocks, actionOnInvalidLocks);
}
}
public interface Action<T>
{
T perform() throws Exception;
}
}

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
@ -200,6 +199,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
giant.lock();
try {
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLock, "taskLock");
taskLocks.put(taskid, taskLock);
}
@ -208,6 +208,27 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
@Override
public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
{
giant.lock();
try {
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(oldLock, "oldLock");
Preconditions.checkNotNull(newLock, "newLock");
if (!taskLocks.remove(taskid, oldLock)) {
log.warn("taskLock[%s] for replacement is not found for task[%s]", oldLock, taskid);
}
taskLocks.put(taskid, newLock);
}
finally {
giant.unlock();
}
}
@Override
public void removeLock(final String taskid, final TaskLock taskLock)
{

View File

@ -0,0 +1,83 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import org.joda.time.Interval;
import javax.annotation.Nullable;
/**
* This class represents the result of {@link TaskLockbox#tryLock(TaskLockType, Task, Interval)}. If the lock
* acquisition fails, the callers can tell that it was failed because it was preempted by other locks of higher
* priorities or not by checking the {@link #revoked} flag.
*
* The {@link #revoked} flag means that consecutive lock acquisitions for the same dataSource and interval are
* returning different locks because another lock of a higher priority preempted your lock at some point. In this case,
* the lock acquisition must fail.
*
* @see TaskLockbox#tryLock(TaskLockType, Task, Interval)
*/
public class LockResult
{
private final TaskLock taskLock;
private final boolean revoked;
public static LockResult ok(TaskLock taskLock)
{
return new LockResult(taskLock, false);
}
public static LockResult fail(boolean revoked)
{
return new LockResult(null, revoked);
}
@JsonCreator
public LockResult(
@JsonProperty("taskLock") @Nullable TaskLock taskLock,
@JsonProperty("revoked") boolean revoked
)
{
this.taskLock = taskLock;
this.revoked = revoked;
}
@JsonProperty("taskLock")
public TaskLock getTaskLock()
{
return taskLock;
}
@JsonProperty("revoked")
public boolean isRevoked()
{
return revoked;
}
public boolean isOk()
{
return taskLock != null;
}
}

View File

@ -247,22 +247,40 @@ public class MetadataTaskStorage implements TaskStorage
handler.addLock(taskid, taskLock);
}
@Override
public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(oldLock, "oldLock");
Preconditions.checkNotNull(newLock, "newLock");
log.info(
"Replacing lock on interval[%s] version[%s] for task: %s",
oldLock.getInterval(),
oldLock.getVersion(),
taskid
);
final Long oldLockId = handler.getLockId(taskid, oldLock);
if (oldLockId == null) {
throw new ISE("Cannot find lock[%s]", oldLock);
}
handler.replaceLock(taskid, oldLockId, newLock);
}
@Override
public void removeLock(String taskid, TaskLock taskLockToRemove)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLockToRemove, "taskLockToRemove");
final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
final long id = taskLockWithId.getKey();
final TaskLock taskLock = taskLockWithId.getValue();
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
handler.removeLock(id);
}
final Long lockId = handler.getLockId(taskid, taskLockToRemove);
if (lockId == null) {
log.warn("Cannot find lock[%s]", taskLockToRemove);
} else {
log.info("Deleting TaskLock with id[%d]: %s", lockId, taskLockToRemove);
handler.removeLock(lockId);
}
}

View File

@ -22,9 +22,7 @@ package io.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -35,16 +33,19 @@ import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -54,6 +55,9 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task
@ -62,8 +66,11 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class TaskLockbox
{
// Datasource -> Interval -> Tasks + TaskLock
private final Map<String, NavigableMap<Interval, TaskLockPosse>> running = Maps.newHashMap();
// Datasource -> Interval -> list of (Tasks + TaskLock)
// Multiple shared locks can be acquired for the same dataSource and interval.
// Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when
// they acquire the same locks again.
private final Map<String, NavigableMap<Interval, List<TaskLockPosse>>> running = Maps.newHashMap();
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
@ -129,10 +136,11 @@ public class TaskLockbox
final TaskLockPosse taskLockPosse = createOrFindLockPosse(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
savedTaskLock.getVersion(),
savedTaskLock.getType()
);
if (taskLockPosse != null) {
taskLockPosse.getTaskIds().add(task.getId());
taskLockPosse.addTask(task);
final TaskLock taskLock = taskLockPosse.getTaskLock();
@ -176,23 +184,33 @@ public class TaskLockbox
}
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
* @param task task to acquire lock for
* @param lockType lock type
* @param task task to acquire lock for
* @param interval interval to lock
* @return acquired TaskLock
*
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
public LockResult lock(
final TaskLockType lockType,
final Task task,
final Interval interval
) throws InterruptedException
{
giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
LockResult lockResult;
while (!(lockResult = tryLock(lockType, task, interval)).isOk()) {
if (lockResult.isRevoked()) {
return lockResult;
}
lockReleaseCondition.await();
}
return taskLock.get();
return lockResult;
}
finally {
giant.unlock();
@ -202,27 +220,34 @@ public class TaskLockbox
/**
* Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary.
*
* @param lockType lock type
* @param task task to acquire a lock for
* @param interval interval to lock
* @param timeoutMs maximum time to wait
*
* @return acquired lock
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException
public LockResult lock(
final TaskLockType lockType,
final Task task,
final Interval interval,
long timeoutMs
) throws InterruptedException
{
long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
if (nanos <= 0) {
return null;
LockResult lockResult;
while (!(lockResult = tryLock(lockType, task, interval)).isOk()) {
if (nanos <= 0 || lockResult.isRevoked()) {
return lockResult;
}
nanos = lockReleaseCondition.awaitNanos(nanos);
}
return taskLock.get();
return lockResult;
}
finally {
giant.unlock();
@ -230,35 +255,23 @@ public class TaskLockbox
}
/**
* Attempt to lock a task, without removing it from the queue. Equivalent to the long form of {@code tryLock}
* with no preferred version.
* Attempt to acquire a lock for a task, without removing it from the queue. Can safely be called multiple times on
* the same task until the lock is preempted.
*
* @param task task that wants a lock
* @param interval interval to lock
* @param lockType type of lock to be acquired
* @param task task that wants a lock
* @param interval interval to lock
*
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
public Optional<TaskLock> tryLock(final Task task, final Interval interval)
{
return tryLock(task, interval, Optional.<String>absent());
}
/**
* Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
* This method will attempt to assign version strings that obey the invariant that every version string is
* lexicographically greater than any other version string previously assigned to the same interval. This invariant
* is only mostly guaranteed, however; we assume clock monotonicity and we assume that callers specifying
* {@code preferredVersion} are doing the right thing.
*
* @param task task that wants a lock
* @param interval interval to lock
* @param preferredVersion use this version string if one has not yet been assigned
*
* @return lock version if lock was acquired, absent otherwise
* @throws IllegalStateException if the task is not a valid active task
*/
private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
public LockResult tryLock(
final TaskLockType lockType,
final Task task,
final Interval interval
)
{
giant.lock();
@ -268,16 +281,16 @@ public class TaskLockbox
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, preferredVersion);
if (posseToUse != null) {
final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, lockType);
if (posseToUse != null && !posseToUse.getTaskLock().isRevoked()) {
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
if (posseToUse.addTask(task)) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
return LockResult.ok(posseToUse.getTaskLock());
}
catch (Exception e) {
log.makeAlert("Failed to persist lock in storage")
@ -287,89 +300,191 @@ public class TaskLockbox
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
return LockResult.fail(false);
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
return LockResult.ok(posseToUse.getTaskLock());
}
} else {
return Optional.absent();
final boolean lockRevoked = posseToUse != null && posseToUse.getTaskLock().isRevoked();
return LockResult.fail(lockRevoked);
}
}
finally {
giant.unlock();
}
}
/**
* See {@link #createOrFindLockPosse(Task, Interval, String, TaskLockType)}
*/
@Nullable
private TaskLockPosse createOrFindLockPosse(
final Task task,
final Interval interval,
final Optional<String> preferredVersion
final TaskLockType lockType
)
{
return createOrFindLockPosse(task, interval, null, lockType);
}
/**
* Create a new {@link TaskLockPosse} or find an existing one for the given task and interval. Note that the returned
* {@link TaskLockPosse} can hold a revoked lock.
*
* @param task task acquiring a lock
* @param interval interval to be locked
* @param preferredVersion a preferred version string
* @param lockType type of lock to be acquired
*
* @return a lock posse or null if any posse is found and a new poss cannot be created
*
* @see #createNewTaskLockPosse(TaskLockType, String, String, Interval, String, int)
*/
@Nullable
private TaskLockPosse createOrFindLockPosse(
final Task task,
final Interval interval,
@Nullable final String preferredVersion,
final TaskLockType lockType
)
{
giant.lock();
try {
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;
final int priority = task.getPriority();
final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(dataSource, interval);
if (foundPosses.size() > 1) {
if (foundPosses.size() > 0) {
// If we have some locks for dataSource and interval, check they can be reused.
// If they can't be reused, check lock priority and revoke existing locks if possible.
final List<TaskLockPosse> filteredPosses = foundPosses
.stream()
.filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, task, interval))
.collect(Collectors.toList());
// Too many existing locks.
return null;
if (filteredPosses.size() == 0) {
// case 1) this task doesn't have any lock, but others do
} else if (foundPosses.size() == 1) {
if (lockType.equals(TaskLockType.SHARED) && isAllSharedLocks(foundPosses)) {
// Any number of shared locks can be acquired for the same dataSource and interval.
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
dataSource,
interval,
preferredVersion,
priority
);
} else {
if (isAllRevocable(foundPosses, priority)) {
// Revoke all existing locks
foundPosses.forEach(this::revokeLock);
// One existing lock -- check if we can add to it.
final TaskLockPosse foundPosse = Iterables.getOnlyElement(foundPosses);
if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) {
posseToUse = foundPosse;
} else {
//Could be a deadlock for LockAcquireAction: same task trying to acquire lock for overlapping interval
if (foundPosse.getTaskIds().contains(task.getId())) {
log.makeAlert("Same Task is trying to acquire lock for overlapping interval")
.addData("task", task.getId())
.addData("interval", interval);
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
dataSource,
interval,
preferredVersion,
priority
);
} else {
log.info("Cannot create a new taskLockPosse because some locks of same or higher priorities exist");
return null;
}
}
} else if (filteredPosses.size() == 1) {
// case 2) we found a lock posse for the given task
final TaskLockPosse foundPosse = filteredPosses.get(0);
if (lockType.equals(foundPosse.getTaskLock().getType())) {
return foundPosse;
} else {
throw new ISE(
"Task[%s] already acquired a lock for interval[%s] but different type[%s]",
task.getId(),
interval,
foundPosse.getTaskLock().getType()
);
}
return null;
}
} else {
// No existing locks. We can make a new one.
if (!running.containsKey(dataSource)) {
running.put(dataSource, new TreeMap<Interval, TaskLockPosse>(Comparators.intervalsByStartThenEnd()));
}
// Create new TaskLock and assign it a version.
// Assumption: We'll choose a version that is greater than any previously-chosen version for our interval. (This
// may not always be true, unfortunately. See below.)
final String version;
if (preferredVersion.isPresent()) {
// We have a preferred version. We'll trust our caller to not break our ordering assumptions and just use it.
version = preferredVersion.get();
} else {
// We are running under an interval lock right now, so just using the current time works as long as we can
// trust our clock to be monotonic and have enough resolution since the last time we created a TaskLock for
// the same interval. This may not always be true; to assure it we would need to use some method of
// timekeeping other than the wall clock.
version = DateTimes.nowUtc().toString();
// case 3) we found multiple lock posses for the given task
throw new ISE(
"Task group[%s] has multiple locks for the same interval[%s]?",
task.getGroupId(),
interval
);
}
} else {
// We don't have any locks for dataSource and interval.
// Let's make a new one.
return createNewTaskLockPosse(
lockType,
task.getGroupId(),
dataSource,
interval,
preferredVersion,
priority
);
}
}
finally {
giant.unlock();
}
}
posseToUse = new TaskLockPosse(new TaskLock(task.getGroupId(), dataSource, interval, version));
running.get(dataSource)
.put(interval, posseToUse);
/**
* Create a new {@link TaskLockPosse} for a new {@link TaskLock}. This method will attempt to assign version strings
* that obey the invariant that every version string is lexicographically greater than any other version string
* previously assigned to the same interval. This invariant is only mostly guaranteed, however; we assume clock
* monotonicity and that callers specifying {@code preferredVersion} are doing the right thing.
*
* @param lockType lock type
* @param groupId group id of task
* @param dataSource data source of task
* @param interval interval to be locked
* @param preferredVersion preferred version string
* @param priority lock priority
*
* @return a new {@link TaskLockPosse}
*/
private TaskLockPosse createNewTaskLockPosse(
TaskLockType lockType,
String groupId,
String dataSource,
Interval interval,
@Nullable String preferredVersion,
int priority
)
{
giant.lock();
try {
// Create new TaskLock and assign it a version.
// Assumption: We'll choose a version that is greater than any previously-chosen version for our interval. (This
// may not always be true, unfortunately. See below.)
log.info("Created new TaskLockPosse: %s", posseToUse);
final String version;
if (preferredVersion != null) {
// We have a preferred version. We'll trust our caller to not break our ordering assumptions and just use it.
version = preferredVersion;
} else {
// We are running under an interval lock right now, so just using the current time works as long as we can
// trustour clock to be monotonic and have enough resolution since the last time we created a TaskLock for
// the same interval. This may not always be true; to assure it we would need to use some method of
// timekeeping other than the wall clock.
version = DateTimes.nowUtc().toString();
}
final TaskLockPosse posseToUse = new TaskLockPosse(
new TaskLock(lockType, groupId, dataSource, interval, version, priority)
);
running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
.computeIfAbsent(interval, k -> new ArrayList<>())
.add(posseToUse);
return posseToUse;
}
finally {
@ -377,6 +492,104 @@ public class TaskLockbox
}
}
/**
* Perform the given action with a guarantee that the locks of the task are not revoked in the middle of action. This
* method first checks that all locks for the given task and intervals are valid and perform the right action.
*
* The given action should be finished as soon as possible because all other methods in this class are blocked until
* this method is finished.
*
* @param task task performing a critical action
* @param intervals intervals
* @param action action to be performed inside of the critical section
*/
public <T> T doInCriticalSection(
Task task,
List<Interval> intervals,
CriticalAction<T> action
) throws Exception
{
giant.lockInterruptibly();
try {
return action.perform(isTaskLocksValid(task, intervals));
}
finally {
giant.unlock();
}
}
private boolean isTaskLocksValid(Task task, List<Interval> intervals)
{
return intervals
.stream()
.allMatch(interval -> {
final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
// Tasks cannot enter the critical section with a shared lock
return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
});
}
private void revokeLock(TaskLockPosse lockPosse)
{
giant.lock();
try {
lockPosse.forEachTask(taskId -> revokeLock(taskId, lockPosse.getTaskLock()));
}
finally {
giant.unlock();
}
}
/**
* Mark the lock as revoked. Note that revoked locks are NOT removed. Instead, they are maintained in {@link #running}
* and {@link #taskStorage} as the normal locks do. This is to check locks are revoked when they are requested to be
* acquired and notify to the callers if revoked. Revoked locks are removed by calling
* {@link #unlock(Task, Interval)}.
*
* @param taskId an id of the task holding the lock
* @param lock lock to be revoked
*/
private void revokeLock(String taskId, TaskLock lock)
{
giant.lock();
try {
if (!activeTasks.contains(taskId)) {
throw new ISE("Cannot revoke lock for inactive task[%s]", taskId);
}
final Task task = taskStorage.getTask(taskId).orNull();
if (task == null) {
throw new ISE("Cannot revoke lock for unknown task[%s]", taskId);
}
log.info("Revoking task lock[%s] for task[%s]", lock, taskId);
if (lock.isRevoked()) {
log.warn("TaskLock[%s] is already revoked", lock);
} else {
final TaskLock revokedLock = lock.revokedCopy();
taskStorage.replaceLock(taskId, lock, revokedLock);
final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval());
final TaskLockPosse foundPosse = possesHolder.stream()
.filter(posse -> posse.getTaskLock().equals(lock))
.findFirst()
.orElseThrow(
() -> new ISE("Failed to find lock posse for lock[%s]", lock)
);
possesHolder.remove(foundPosse);
possesHolder.add(foundPosse.withTaskLock(revokedLock));
log.info("Revoked taskLock[%s]", lock);
}
}
finally {
giant.unlock();
}
}
/**
* Return the currently-active locks for some task.
*
@ -417,52 +630,63 @@ public class TaskLockbox
try {
final String dataSource = task.getDataSource();
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
// So we can alert if activeTasks try to release stuff they don't have
boolean removed = false;
if (dsRunning != null) {
final TaskLockPosse taskLockPosse = dsRunning.get(interval);
if (taskLockPosse != null) {
final TaskLock taskLock = taskLockPosse.getTaskLock();
// Remove task from live list
log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock.getGroupId());
removed = taskLockPosse.getTaskIds().remove(task.getId());
if (taskLockPosse.getTaskIds().isEmpty()) {
log.info("TaskLock is now empty: %s", taskLock);
running.get(dataSource).remove(taskLock.getInterval());
}
if (running.get(dataSource).size() == 0) {
running.remove(dataSource);
}
// Wake up blocking-lock waiters
lockReleaseCondition.signalAll();
// Remove lock from storage. If it cannot be removed, just ignore the failure.
try {
taskStorage.removeLock(task.getId(), taskLock);
}
catch (Exception e) {
log.makeAlert(e, "Failed to clean up lock from storage")
.addData("task", task.getId())
.addData("dataSource", taskLock.getDataSource())
.addData("interval", taskLock.getInterval())
.addData("version", taskLock.getVersion())
.emit();
}
}
if (dsRunning == null || dsRunning.isEmpty()) {
return;
}
if (!removed) {
log.makeAlert("Lock release without acquire")
.addData("task", task.getId())
.addData("interval", interval)
.emit();
final List<TaskLockPosse > possesHolder = dsRunning.get(interval);
if (possesHolder == null || possesHolder.isEmpty()) {
return;
}
final List<TaskLockPosse> posses = possesHolder.stream()
.filter(posse -> posse.containsTask(task))
.collect(Collectors.toList());
for (TaskLockPosse taskLockPosse : posses) {
final TaskLock taskLock = taskLockPosse.getTaskLock();
// Remove task from live list
log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock.getGroupId());
final boolean removed = taskLockPosse.removeTask(task);
if (taskLockPosse.isTasksEmpty()) {
log.info("TaskLock is now empty: %s", taskLock);
possesHolder.remove(taskLockPosse);
}
if (possesHolder.size() == 0) {
dsRunning.remove(interval);
}
if (running.get(dataSource).size() == 0) {
running.remove(dataSource);
}
// Wake up blocking-lock waiters
lockReleaseCondition.signalAll();
// Remove lock from storage. If it cannot be removed, just ignore the failure.
try {
taskStorage.removeLock(task.getId(), taskLock);
}
catch (Exception e) {
log.makeAlert(e, "Failed to clean up lock from storage")
.addData("task", task.getId())
.addData("dataSource", taskLock.getDataSource())
.addData("interval", taskLock.getInterval())
.addData("version", taskLock.getVersion())
.emit();
}
if (!removed) {
log.makeAlert("Lock release without acquire")
.addData("task", task.getId())
.addData("interval", interval)
.emit();
}
}
}
finally {
@ -504,19 +728,31 @@ public class TaskLockbox
giant.lock();
try {
final Iterable<TaskLockPosse> searchSpace;
// Scan through all locks for this datasource
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null) {
searchSpace = ImmutableList.of();
return ImmutableList.of();
} else {
searchSpace = dsRunning.values();
return dsRunning.values().stream()
.flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList());
}
}
finally {
giant.unlock();
}
}
return ImmutableList.copyOf(
Iterables.filter(searchSpace, taskLock -> taskLock.getTaskIds().contains(task.getId()))
);
private List<TaskLockPosse> findLockPossesContainingInterval(final String dataSource, final Interval interval)
{
giant.lock();
try {
final List<TaskLockPosse> intervalOverlapsPosses = findLockPossesOverlapsInterval(dataSource, interval);
return intervalOverlapsPosses.stream()
.filter(taskLockPosse -> taskLockPosse.taskLock.getInterval().contains(interval))
.collect(Collectors.toList());
}
finally {
giant.unlock();
@ -526,12 +762,12 @@ public class TaskLockbox
/**
* Return all locks that overlap some search interval.
*/
private List<TaskLockPosse> findLockPossesForInterval(final String dataSource, final Interval interval)
private List<TaskLockPosse> findLockPossesOverlapsInterval(final String dataSource, final Interval interval)
{
giant.lock();
try {
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(dataSource);
if (dsRunning == null) {
// No locks at all
return Collections.emptyList();
@ -551,30 +787,10 @@ public class TaskLockbox
)
);
return Lists.newArrayList(
FunctionalIterable
.create(searchIntervals)
.filter(
new Predicate<Interval>()
{
@Override
public boolean apply(@Nullable Interval searchInterval)
{
return searchInterval != null && searchInterval.overlaps(interval);
}
}
)
.transform(
new Function<Interval, TaskLockPosse>()
{
@Override
public TaskLockPosse apply(Interval interval)
{
return dsRunning.get(interval);
}
}
)
);
return StreamSupport.stream(searchIntervals.spliterator(), false)
.filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval))
.flatMap(searchInterval -> dsRunning.get(searchInterval).stream())
.collect(Collectors.toList());
}
}
finally {
@ -594,6 +810,45 @@ public class TaskLockbox
}
}
private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, Task task, Interval interval)
{
return existingLock.getInterval().contains(interval) &&
existingLock.getGroupId().equals(task.getGroupId());
}
private static boolean isAllSharedLocks(List<TaskLockPosse> lockPosses)
{
return lockPosses.stream()
.allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED));
}
private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
{
return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
}
private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
{
final TaskLock existingLock = lockPosse.getTaskLock();
return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority;
}
private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
.stream()
.filter(lockPosse -> lockPosse.containsTask(task))
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() > 1) {
throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
} else {
return filteredPosses.get(0);
}
}
@VisibleForTesting
Set<String> getActiveTasks()
{
@ -601,7 +856,7 @@ public class TaskLockbox
}
@VisibleForTesting
Map<String, NavigableMap<Interval, TaskLockPosse>> getAllLocks()
public Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
{
return running;
}
@ -611,20 +866,56 @@ public class TaskLockbox
final private TaskLock taskLock;
final private Set<String> taskIds;
public TaskLockPosse(TaskLock taskLock)
TaskLockPosse(TaskLock taskLock)
{
this.taskLock = taskLock;
taskIds = Sets.newHashSet();
this.taskIds = new HashSet<>();
}
public TaskLock getTaskLock()
private TaskLockPosse(TaskLock taskLock, Set<String> taskIds)
{
this.taskLock = taskLock;
this.taskIds = new HashSet<>(taskIds);
}
TaskLockPosse withTaskLock(TaskLock taskLock)
{
return new TaskLockPosse(taskLock, taskIds);
}
TaskLock getTaskLock()
{
return taskLock;
}
public Set<String> getTaskIds()
boolean addTask(Task task)
{
return taskIds;
Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
Preconditions.checkArgument(taskLock.getPriority() == task.getPriority());
return taskIds.add(task.getId());
}
boolean containsTask(Task task)
{
Preconditions.checkNotNull(task, "task");
return taskIds.contains(task.getId());
}
boolean removeTask(Task task)
{
Preconditions.checkNotNull(task, "task");
return taskIds.remove(task.getId());
}
boolean isTasksEmpty()
{
return taskIds.isEmpty();
}
void forEachTask(Consumer<String> action)
{
Preconditions.checkNotNull(action);
taskIds.forEach(action);
}
@Override

View File

@ -37,7 +37,7 @@ public interface TaskStorage
* @param status task status
* @throws EntryExistsException if the task ID already exists
*/
public void insert(Task task, TaskStatus status) throws EntryExistsException;
void insert(Task task, TaskStatus status) throws EntryExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
@ -45,14 +45,23 @@ public interface TaskStorage
*
* @param status task status
*/
public void setStatus(TaskStatus status);
void setStatus(TaskStatus status);
/**
* Persists lock state in the storage facility.
* @param taskid task ID
* @param taskLock lock state
*/
public void addLock(String taskid, TaskLock taskLock);
void addLock(String taskid, TaskLock taskLock);
/**
* Replace the old lock with the new lock. This method is not thread-safe.
*
* @param taskid an id of the task holding the old lock and new lock
* @param oldLock old lock
* @param newLock new lock
*/
void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock);
/**
* Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but
@ -61,7 +70,7 @@ public interface TaskStorage
* @param taskid task ID
* @param taskLock lock state
*/
public void removeLock(String taskid, TaskLock taskLock);
void removeLock(String taskid, TaskLock taskLock);
/**
* Returns task as stored in the storage facility. If the task ID does not exist, this will return an
@ -72,7 +81,7 @@ public interface TaskStorage
* @param taskid task ID
* @return optional task
*/
public Optional<Task> getTask(String taskid);
Optional<Task> getTask(String taskid);
/**
* Returns task status as stored in the storage facility. If the task ID does not exist, this will return
@ -81,7 +90,7 @@ public interface TaskStorage
* @param taskid task ID
* @return task status
*/
public Optional<TaskStatus> getStatus(String taskid);
Optional<TaskStatus> getStatus(String taskid);
/**
* Add an action taken by a task to the audit log.
@ -91,7 +100,7 @@ public interface TaskStorage
*
* @param <T> task action return type
*/
public <T> void addAuditLog(Task task, TaskAction<T> taskAction);
<T> void addAuditLog(Task task, TaskAction<T> taskAction);
/**
* Returns all actions taken by a task.
@ -99,7 +108,7 @@ public interface TaskStorage
* @param taskid task ID
* @return list of task actions
*/
public List<TaskAction> getAuditLogs(String taskid);
List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
@ -107,7 +116,7 @@ public interface TaskStorage
*
* @return list of active tasks
*/
public List<Task> getActiveTasks();
List<Task> getActiveTasks();
/**
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
@ -116,7 +125,7 @@ public interface TaskStorage
*
* @return list of recently finished tasks
*/
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
List<TaskStatus> getRecentlyFinishedTaskStatuses();
/**
* Returns a list of locks for a particular task.
@ -124,5 +133,5 @@ public interface TaskStorage
* @param taskid task ID
* @return list of TaskLocks for the given task
*/
public List<TaskLock> getLocks(String taskid);
List<TaskLock> getLocks(String taskid);
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
public class LockAcquireActionTest
{
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private final ObjectMapper mapper = new DefaultObjectMapper();
@Test
public void testSerdeWithAllFields() throws IOException
{
final LockAcquireAction expected = new LockAcquireAction(
TaskLockType.SHARED,
Intervals.of("2017-01-01/2017-01-02"),
1000
);
final byte[] bytes = mapper.writeValueAsBytes(expected);
final LockAcquireAction actual = mapper.readValue(bytes, LockAcquireAction.class);
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getInterval(), actual.getInterval());
Assert.assertEquals(expected.getTimeoutMs(), actual.getTimeoutMs());
}
@Test
public void testSerdeFromJsonWithMissingFields() throws IOException
{
final String json = "{ \"type\": \"lockAcquire\", \"interval\" : \"2017-01-01/2017-01-02\" }";
final LockAcquireAction actual = mapper.readValue(json, LockAcquireAction.class);
final LockAcquireAction expected = new LockAcquireAction(
TaskLockType.EXCLUSIVE,
Intervals.of("2017-01-01/2017-01-02"),
0
);
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getInterval(), actual.getInterval());
Assert.assertEquals(expected.getTimeoutMs(), actual.getTimeoutMs());
}
@Test(timeout = 5000L)
public void testWithLockType()
{
final Task task = NoopTask.create();
final LockAcquireAction action = new LockAcquireAction(
TaskLockType.EXCLUSIVE,
Intervals.of("2017-01-01/2017-01-02"),
1000
);
actionTestKit.getTaskLockbox().add(task);
final TaskLock lock = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertNotNull(lock);
}
@Test(timeout = 5000L)
public void testWithoutLockType()
{
final Task task = NoopTask.create();
final LockAcquireAction action = new LockAcquireAction(
null,
Intervals.of("2017-01-01/2017-01-02"),
1000
);
actionTestKit.getTaskLockbox().add(task);
final TaskLock lock = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertNotNull(lock);
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
public class LockTryAcquireActionTest
{
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private final ObjectMapper mapper = new DefaultObjectMapper();
@Test
public void testSerdeWithAllFields() throws IOException
{
final LockTryAcquireAction expected = new LockTryAcquireAction(
TaskLockType.SHARED,
Intervals.of("2017-01-01/2017-01-02")
);
final byte[] bytes = mapper.writeValueAsBytes(expected);
final LockTryAcquireAction actual = mapper.readValue(bytes, LockTryAcquireAction.class);
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getInterval(), actual.getInterval());
}
@Test
public void testSerdeFromJsonWithMissingFields() throws IOException
{
final String json = "{ \"type\": \"lockTryAcquire\", \"interval\" : \"2017-01-01/2017-01-02\" }";
final LockTryAcquireAction actual = mapper.readValue(json, LockTryAcquireAction.class);
final LockTryAcquireAction expected = new LockTryAcquireAction(
TaskLockType.EXCLUSIVE,
Intervals.of("2017-01-01/2017-01-02")
);
Assert.assertEquals(expected.getType(), actual.getType());
Assert.assertEquals(expected.getInterval(), actual.getInterval());
}
@Test(timeout = 5000L)
public void testWithLockType()
{
final Task task = NoopTask.create();
final LockTryAcquireAction action = new LockTryAcquireAction(
TaskLockType.EXCLUSIVE,
Intervals.of("2017-01-01/2017-01-02")
);
actionTestKit.getTaskLockbox().add(task);
final TaskLock lock = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertNotNull(lock);
}
@Test(timeout = 5000L)
public void testWithoutLockType()
{
final Task task = NoopTask.create();
final LockTryAcquireAction action = new LockTryAcquireAction(
null,
Intervals.of("2017-01-01/2017-01-02")
);
actionTestKit.getTaskLockbox().add(task);
final TaskLock lock = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertNotNull(lock);
}
}

View File

@ -26,6 +26,7 @@ import io.druid.discovery.DruidLeaderClient;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
@ -62,10 +63,12 @@ public class RemoteTaskActionClientTest
long now = System.currentTimeMillis();
result = Collections.singletonList(new TaskLock(
TaskLockType.SHARED,
"groupId",
"dataSource",
Intervals.utc(now - 30 * 1000, now),
"version"
"version",
0
));
}

View File

@ -22,8 +22,10 @@ package io.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.CriticalAction;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
@ -34,6 +36,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.Set;
public class SegmentInsertActionTest
@ -91,8 +94,20 @@ public class SegmentInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, INTERVAL, 5000);
action.perform(task, actionTestKit.getTaskActionToolbox());
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
CriticalAction.builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assert.assertEquals(
ImmutableSet.of(SEGMENT1, SEGMENT2),
@ -109,11 +124,24 @@ public class SegmentInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));
final Set<DataSegment> segments = action.perform(task, actionTestKit.getTaskActionToolbox());
final Set<DataSegment> segments = actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
CriticalAction.<Set<DataSegment>>builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assert.assertEquals(ImmutableSet.of(SEGMENT3), segments);
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ObjectMetadata;
@ -90,7 +91,7 @@ public class SegmentTransactionalInsertActionTest
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
SegmentPublishResult result1 = new SegmentTransactionalInsertAction(
ImmutableSet.of(SEGMENT1),
@ -131,7 +132,7 @@ public class SegmentTransactionalInsertActionTest
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
SegmentPublishResult result = new SegmentTransactionalInsertAction(
ImmutableSet.of(SEGMENT1),
@ -151,7 +152,7 @@ public class SegmentTransactionalInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));

View File

@ -0,0 +1,104 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TaskActionPreconditionsTest
{
private TaskLockbox lockbox;
private Task task;
private Set<DataSegment> segments;
@Before
public void setup()
{
lockbox = new TaskLockbox(new HeapMemoryTaskStorage(new TaskStorageConfig(null)));
task = NoopTask.create();
lockbox.add(task);
segments = ImmutableSet.of(
new DataSegment.Builder()
.dataSource(task.getDataSource())
.interval(Intervals.of("2017-01-01/2017-01-02"))
.version(DateTimes.nowUtc().toString())
.shardSpec(new LinearShardSpec(2))
.build(),
new DataSegment.Builder()
.dataSource(task.getDataSource())
.interval(Intervals.of("2017-01-02/2017-01-03"))
.version(DateTimes.nowUtc().toString())
.shardSpec(new LinearShardSpec(2))
.build(),
new DataSegment.Builder()
.dataSource(task.getDataSource())
.interval(Intervals.of("2017-01-03/2017-01-04"))
.version(DateTimes.nowUtc().toString())
.shardSpec(new LinearShardSpec(2))
.build()
);
}
@Test
public void testCheckLockCoversSegments() throws Exception
{
final List<Interval> intervals = ImmutableList.of(
Intervals.of("2017-01-01/2017-01-02"),
Intervals.of("2017-01-02/2017-01-03"),
Intervals.of("2017-01-03/2017-01-04")
);
final Map<Interval, TaskLock> locks = intervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
)
);
Assert.assertEquals(3, locks.size());
Assert.assertTrue(TaskActionPreconditions.isLockCoversSegments(task, lockbox, segments));
}
}

View File

@ -30,10 +30,12 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentAllocateAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
@ -228,7 +230,7 @@ public class IndexTaskTest
null,
new ArbitraryGranularitySpec(
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014/2015"))
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
createTuningConfig(10, null, false, true),
false
@ -249,8 +251,8 @@ public class IndexTaskTest
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2015-03-01T07:59:59.977Z,a,1\n");
writer.write("2015-03-01T08:00:00.000Z,b,1\n");
writer.write("2014-01-01T07:59:59.977Z,a,1\n");
writer.write("2014-01-01T08:00:00.000Z,b,1\n");
}
IndexTask indexTask = new IndexTask(
@ -262,7 +264,7 @@ public class IndexTaskTest
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.HOUR,
Collections.singletonList(Intervals.of("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
),
createTuningConfig(50, null, false, true),
false
@ -883,100 +885,121 @@ public class IndexTaskTest
{
final List<DataSegment> segments = Lists.newArrayList();
indexTask.run(
new TaskToolbox(
null,
new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Collections.singletonList(
new TaskLock(
"", "", null, DateTimes.nowUtc().toString()
)
);
}
final TaskActionClient actionClient = new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Collections.singletonList(
new TaskLock(
TaskLockType.EXCLUSIVE,
"",
"",
Intervals.of("2014/P1Y"), DateTimes.nowUtc().toString(),
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY
)
);
}
if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(
"groupId",
"test",
((LockAcquireAction) taskAction).getInterval(),
DateTimes.nowUtc().toString()
);
}
if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(
TaskLockType.EXCLUSIVE, "groupId",
"test",
((LockAcquireAction) taskAction).getInterval(),
DateTimes.nowUtc().toString(),
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY
);
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
}
if (taskAction instanceof LockTryAcquireAction) {
return (RetType) new TaskLock(
TaskLockType.EXCLUSIVE,
"groupId",
"test",
((LockTryAcquireAction) taskAction).getInterval(),
DateTimes.nowUtc().toString(),
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY
);
}
if (taskAction instanceof SegmentAllocateAction) {
SegmentAllocateAction action = (SegmentAllocateAction) taskAction;
Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0);
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec);
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
}
return null;
}
},
null,
new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
if (taskAction instanceof SegmentAllocateAction) {
SegmentAllocateAction action = (SegmentAllocateAction) taskAction;
Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0);
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec);
}
@Override
public String getPathForHadoop()
{
return null;
}
return null;
}
};
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
final DataSegmentPusher pusher = new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
},
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
jsonMapper,
temporaryFolder.newFolder(),
indexIO,
null,
null,
indexMergerV9,
null,
null,
null,
null
)
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
final TaskToolbox box = new TaskToolbox(
null,
actionClient,
null,
pusher,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
jsonMapper,
temporaryFolder.newFolder(),
indexIO,
null,
null,
indexMergerV9,
null,
null,
null,
null
);
indexTask.isReady(box.getTaskActionClient());
indexTask.run(box);
Collections.sort(segments);
return segments;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockListAction;
@ -124,10 +125,12 @@ public class SameIntervalMergeTaskTest
Assert.assertEquals(mergeTask.getInterval(), ((LockTryAcquireAction) taskAction).getInterval());
isRedayCountDown.countDown();
taskLock = new TaskLock(
TaskLockType.EXCLUSIVE,
mergeTask.getGroupId(),
mergeTask.getDataSource(),
mergeTask.getInterval(),
version
version,
Tasks.DEFAULT_TASK_PRIORITY
);
return (RetType) taskLock;
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
@ -73,7 +74,10 @@ public class RealtimeishTask extends AbstractTask
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1, 5000));
final TaskLock lock1 = toolbox.getTaskActionClient().submit(
new LockAcquireAction(TaskLockType.EXCLUSIVE, interval1, 5000)
);
Assert.assertNotNull(lock1);
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
@ -81,7 +85,10 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1);
// Acquire lock for second interval
final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2, 5000));
final TaskLock lock2 = toolbox.getTaskActionClient().submit(
new LockAcquireAction(TaskLockType.EXCLUSIVE, interval2, 5000)
);
Assert.assertNotNull(lock2);
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)

View File

@ -0,0 +1,233 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.common.guava.SettableSupplier;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TaskLockBoxConcurrencyTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private ExecutorService service;
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@Before
public void setup()
{
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createTaskTables();
taskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory(
derbyConnector,
derby.metadataTablesConfigSupplier().get(),
objectMapper
)
);
lockbox = new TaskLockbox(taskStorage);
service = Executors.newFixedThreadPool(2);
}
@After
public void teardown()
{
service.shutdownNow();
}
@Test(timeout = 5000L)
public void testDoInCriticalSectionWithDifferentTasks()
throws ExecutionException, InterruptedException, EntryExistsException
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create(10);
final Task highPriorityTask = NoopTask.create(100);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
final SettableSupplier<Integer> intSupplier = new SettableSupplier<>(0);
final CountDownLatch latch = new CountDownLatch(1);
// lowPriorityTask acquires a lock first and increases the int of intSupplier in the critical section
final Future<Integer> lowPriorityFuture = service.submit(() -> {
final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
Assert.assertTrue(result.isOk());
Assert.assertFalse(result.isRevoked());
return lockbox.doInCriticalSection(
lowPriorityTask,
Collections.singletonList(interval),
CriticalAction.<Integer>builder()
.onValidLocks(
() -> {
latch.countDown();
Thread.sleep(100);
intSupplier.set(intSupplier.get() + 1);
return intSupplier.get();
}
)
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
});
// highPriorityTask awaits for the latch, acquires a lock, and increases the int of intSupplier in the critical
// section
final Future<Integer> highPriorityFuture = service.submit(() -> {
latch.await();
final LockResult result = lockbox.lock(TaskLockType.EXCLUSIVE, highPriorityTask, interval);
Assert.assertTrue(result.isOk());
Assert.assertFalse(result.isRevoked());
return lockbox.doInCriticalSection(
highPriorityTask,
Collections.singletonList(interval),
CriticalAction.<Integer>builder()
.onValidLocks(
() -> {
Thread.sleep(100);
intSupplier.set(intSupplier.get() + 1);
return intSupplier.get();
}
)
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
});
Assert.assertEquals(1, lowPriorityFuture.get().intValue());
Assert.assertEquals(2, highPriorityFuture.get().intValue());
// the lock for lowPriorityTask must be revoked by the highPriorityTask after its work is done in critical section
final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
Assert.assertFalse(result.isOk());
Assert.assertTrue(result.isRevoked());
}
@Test(timeout = 5000L)
public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception
{
final List<Interval> intervals = ImmutableList.of(
Intervals.of("2017-01-01/2017-01-02"),
Intervals.of("2017-01-02/2017-01-03"),
Intervals.of("2017-01-03/2017-01-04")
);
final Task task = NoopTask.create();
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
for (Interval interval : intervals) {
final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval);
Assert.assertTrue(result.isOk());
}
final SettableSupplier<Integer> intSupplier = new SettableSupplier<>(0);
final CountDownLatch latch = new CountDownLatch(1);
final Future<Integer> future1 = service.submit(() -> lockbox.doInCriticalSection(
task,
ImmutableList.of(intervals.get(0), intervals.get(1)),
CriticalAction.<Integer>builder()
.onValidLocks(
() -> {
latch.countDown();
Thread.sleep(100);
intSupplier.set(intSupplier.get() + 1);
return intSupplier.get();
}
)
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
));
final Future<Integer> future2 = service.submit(() -> {
latch.await();
return lockbox.doInCriticalSection(
task,
ImmutableList.of(intervals.get(1), intervals.get(2)),
CriticalAction.<Integer>builder()
.onValidLocks(
() -> {
Thread.sleep(100);
intSupplier.set(intSupplier.get() + 1);
return intSupplier.get();
}
)
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
});
Assert.assertEquals(1, future1.get().intValue());
Assert.assertEquals(2, future2.get().intValue());
}
}

View File

@ -20,11 +20,11 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
@ -37,13 +37,18 @@ import io.druid.metadata.EntryExistsException;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class TaskLockboxTest
@ -51,6 +56,9 @@ public class TaskLockboxTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@ -84,13 +92,13 @@ public class TaskLockboxTest
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertNotNull(lockbox.lock(task, Intervals.of("2015-01-01/2015-01-02")));
Assert.assertNotNull(lockbox.lock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")));
}
@Test(expected = IllegalStateException.class)
public void testLockForInactiveTask() throws InterruptedException
{
lockbox.lock(NoopTask.create(), Intervals.of("2015-01-01/2015-01-02"));
lockbox.lock(TaskLockType.EXCLUSIVE, NoopTask.create(), Intervals.of("2015-01-01/2015-01-02"));
}
@Test
@ -101,57 +109,97 @@ public class TaskLockboxTest
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task);
lockbox.remove(task);
lockbox.lock(task, Intervals.of("2015-01-01/2015-01-02"));
lockbox.lock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02"));
}
@Test
public void testTryLock()
public void testTrySharedLock()
{
final Interval interval = Intervals.of("2017-01/2017-02");
final List<Task> tasks = new ArrayList<>();
final Set<TaskLock> actualLocks = new HashSet<>();
// test creating new locks
for (int i = 0; i < 5; i++) {
final Task task = NoopTask.create(Math.min(0, (i - 1) * 10)); // the first two tasks have the same priority
tasks.add(task);
lockbox.add(task);
final TaskLock lock = lockbox.tryLock(TaskLockType.SHARED, task, interval).getTaskLock();
Assert.assertNotNull(lock);
actualLocks.add(lock);
}
Assert.assertEquals(5, getAllLocks(tasks).size());
Assert.assertEquals(getAllLocks(tasks), actualLocks);
}
@Test
public void testTryMixedLocks() throws EntryExistsException
{
final Task lowPriorityTask = NoopTask.create(0);
final Task lowPriorityTask2 = NoopTask.create(0);
final Task highPiorityTask = NoopTask.create(10);
final Interval interval1 = Intervals.of("2017-01-01/2017-01-02");
final Interval interval2 = Intervals.of("2017-01-02/2017-01-03");
final Interval interval3 = Intervals.of("2017-01-03/2017-01-04");
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(lowPriorityTask2, TaskStatus.running(lowPriorityTask2.getId()));
taskStorage.insert(highPiorityTask, TaskStatus.running(highPiorityTask.getId()));
lockbox.add(lowPriorityTask);
lockbox.add(lowPriorityTask2);
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval1).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, lowPriorityTask, interval2).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, lowPriorityTask2, interval2).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval3).isOk());
lockbox.add(highPiorityTask);
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, highPiorityTask, interval1).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval2).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval3).isOk());
Assert.assertTrue(lockbox.findLocksForTask(lowPriorityTask).stream().allMatch(TaskLock::isRevoked));
Assert.assertTrue(lockbox.findLocksForTask(lowPriorityTask2).stream().allMatch(TaskLock::isRevoked));
lockbox.remove(lowPriorityTask);
lockbox.remove(lowPriorityTask2);
lockbox.remove(highPiorityTask);
lockbox.add(highPiorityTask);
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval1).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, highPiorityTask, interval2).isOk());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPiorityTask, interval3).isOk());
lockbox.add(lowPriorityTask);
Assert.assertFalse(lockbox.tryLock(TaskLockType.SHARED, lowPriorityTask, interval1).isOk());
Assert.assertFalse(lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval2).isOk());
Assert.assertFalse(lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval3).isOk());
}
@Test
public void testTryExclusiveLock()
{
Task task = NoopTask.create();
lockbox.add(task);
Assert.assertTrue(lockbox.tryLock(task, Intervals.of("2015-01-01/2015-01-03")).isPresent());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-03")).isOk());
// try to take lock for task 2 for overlapping interval
Task task2 = NoopTask.create();
lockbox.add(task2);
Assert.assertFalse(lockbox.tryLock(task2, Intervals.of("2015-01-01/2015-01-02")).isPresent());
Assert.assertFalse(lockbox.tryLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-02")).isOk());
// task 1 unlocks the lock
lockbox.remove(task);
// Now task2 should be able to get the lock
Assert.assertTrue(lockbox.tryLock(task2, Intervals.of("2015-01-01/2015-01-02")).isPresent());
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-02")).isOk());
}
@Test
public void testTrySmallerLock()
{
Task task = NoopTask.create();
lockbox.add(task);
Optional<TaskLock> lock1 = lockbox.tryLock(task, Intervals.of("2015-01-01/2015-01-03"));
Assert.assertTrue(lock1.isPresent());
Assert.assertEquals(Intervals.of("2015-01-01/2015-01-03"), lock1.get().getInterval());
// same task tries to take partially overlapping interval; should fail
Assert.assertFalse(lockbox.tryLock(task, Intervals.of("2015-01-02/2015-01-04")).isPresent());
// same task tries to take contained interval; should succeed and should match the original lock
Optional<TaskLock> lock2 = lockbox.tryLock(task, Intervals.of("2015-01-01/2015-01-02"));
Assert.assertTrue(lock2.isPresent());
Assert.assertEquals(Intervals.of("2015-01-01/2015-01-03"), lock2.get().getInterval());
// only the first lock should actually exist
Assert.assertEquals(
ImmutableList.of(lock1.get()),
lockbox.findLocksForTask(task)
);
}
@Test(expected = IllegalStateException.class)
public void testTryLockForInactiveTask()
{
Assert.assertFalse(lockbox.tryLock(NoopTask.create(), Intervals.of("2015-01-01/2015-01-02")).isPresent());
Assert.assertFalse(lockbox.tryLock(TaskLockType.EXCLUSIVE, NoopTask.create(), Intervals.of("2015-01-01/2015-01-02")).isOk());
}
@Test
@ -162,7 +210,7 @@ public class TaskLockboxTest
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task);
lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, Intervals.of("2015-01-01/2015-01-02")).isPresent());
Assert.assertFalse(lockbox.tryLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")).isOk());
}
@Test
@ -173,8 +221,8 @@ public class TaskLockboxTest
lockbox.add(task1);
lockbox.add(task2);
lockbox.lock(task1, Intervals.of("2015-01-01/2015-01-02"), 5000);
lockbox.lock(task2, Intervals.of("2015-01-01/2015-01-15"), 5000);
Assert.assertTrue(lockbox.lock(TaskLockType.EXCLUSIVE, task1, Intervals.of("2015-01-01/2015-01-02"), 5000).isOk());
Assert.assertFalse(lockbox.lock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2015-01-01/2015-01-15"), 1000).isOk());
}
@Test
@ -186,8 +234,11 @@ public class TaskLockboxTest
taskStorage.insert(task, TaskStatus.running(task.getId()));
originalBox.add(task);
Assert.assertTrue(
originalBox.tryLock(task, Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))))
.isPresent()
originalBox.tryLock(
TaskLockType.EXCLUSIVE,
task,
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
).isOk()
);
}
@ -207,4 +258,206 @@ public class TaskLockboxTest
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
@Test
public void testDoInCriticalSectionWithSharedLock() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task task = NoopTask.create();
lockbox.add(task);
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, task, interval).isOk());
Assert.assertFalse(
lockbox.doInCriticalSection(
task,
Collections.singletonList(interval),
CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build()
)
);
}
@Test
public void testDoInCriticalSectionWithExclusiveLock() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task task = NoopTask.create();
lockbox.add(task);
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock();
Assert.assertNotNull(lock);
Assert.assertTrue(
lockbox.doInCriticalSection(
task,
Collections.singletonList(interval),
CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build()
)
);
}
@Test
public void testDoInCriticalSectionWithSmallerInterval() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final Interval smallInterval = Intervals.of("2017-01-10/2017-01-11");
final Task task = NoopTask.create();
lockbox.add(task);
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock();
Assert.assertNotNull(lock);
Assert.assertTrue(
lockbox.doInCriticalSection(
task,
Collections.singletonList(smallInterval),
CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build()
)
);
}
@Test
public void testPreemptionAndDoInCriticalSection() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
for (int i = 0; i < 5; i++) {
final Task task = NoopTask.create();
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, task, interval).isOk());
}
final Task highPriorityTask = NoopTask.create(100);
lockbox.add(highPriorityTask);
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).getTaskLock();
Assert.assertNotNull(lock);
Assert.assertTrue(
lockbox.doInCriticalSection(
highPriorityTask,
Collections.singletonList(interval),
CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build()
)
);
}
@Test
public void testDoInCriticalSectionWithRevokedLock() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create("task1", 0);
final Task highPriorityTask = NoopTask.create("task2", 10);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
final TaskLock lowPriorityLock = lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval).getTaskLock();
Assert.assertNotNull(lowPriorityLock);
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).isOk());
Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked());
Assert.assertFalse(
lockbox.doInCriticalSection(
lowPriorityTask,
Collections.singletonList(interval),
CriticalAction.<Boolean>builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build()
)
);
}
@Test(timeout = 5000L)
public void testAcquireLockAfterRevoked() throws EntryExistsException, InterruptedException
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create("task1", 0);
final Task highPriorityTask = NoopTask.create("task2", 10);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
final TaskLock lowPriorityLock = lockbox.lock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval).getTaskLock();
Assert.assertNotNull(lowPriorityLock);
Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).isOk());
Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked());
lockbox.unlock(highPriorityTask, interval);
// Acquire again
final LockResult lockResult = lockbox.lock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
Assert.assertFalse(lockResult.isOk());
Assert.assertTrue(lockResult.isRevoked());
Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked());
}
@Test
public void testUnlock() throws EntryExistsException
{
final List<Task> lowPriorityTasks = new ArrayList<>();
final List<Task> highPriorityTasks = new ArrayList<>();
for (int i = 0; i < 8; i++) {
final Task task = NoopTask.create(10);
lowPriorityTasks.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
Assert.assertTrue(
lockbox.tryLock(
TaskLockType.EXCLUSIVE,
task,
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
).isOk()
);
}
// Revoke some locks
for (int i = 0; i < 4; i++) {
final Task task = NoopTask.create(100);
highPriorityTasks.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
Assert.assertTrue(
lockbox.tryLock(
TaskLockType.EXCLUSIVE,
task,
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
).isOk()
);
}
for (int i = 0; i < 4; i++) {
Assert.assertTrue(taskStorage.getLocks(lowPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked));
Assert.assertFalse(taskStorage.getLocks(highPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked));
}
for (int i = 4; i < 8; i++) {
Assert.assertFalse(taskStorage.getLocks(lowPriorityTasks.get(i).getId()).stream().allMatch(TaskLock::isRevoked));
}
for (int i = 0; i < 4; i++) {
lockbox.unlock(
lowPriorityTasks.get(i),
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
);
lockbox.unlock(
highPriorityTasks.get(i),
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
);
}
for (int i = 4; i < 8; i++) {
lockbox.unlock(
lowPriorityTasks.get(i),
Intervals.of(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2)))
);
}
Assert.assertTrue(lockbox.getAllLocks().isEmpty());
}
private Set<TaskLock> getAllLocks(List<Task> tasks)
{
return tasks.stream()
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toSet());
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import io.druid.timeline.DataSegment;
import java.util.Objects;
@ -42,6 +43,11 @@ public class SegmentPublishResult
private final Set<DataSegment> segments;
private final boolean success;
public static SegmentPublishResult fail()
{
return new SegmentPublishResult(ImmutableSet.of(), false);
}
@JsonCreator
public SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,

View File

@ -19,6 +19,7 @@
package io.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
@ -27,9 +28,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController;
@ -42,11 +41,13 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -317,20 +318,46 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (%2$s_id, lock_payload) VALUES (:entryId, :payload)",
lockTable, entryTypeName
)
)
.bind("entryId", entryId)
.bind("payload", jsonMapper.writeValueAsBytes(lock))
.execute() == 1;
return addLock(handle, entryId, lock);
}
}
);
}
private boolean addLock(Handle handle, String entryId, LockType lock) throws JsonProcessingException
{
final String statement = StringUtils.format(
"INSERT INTO %1$s (%2$s_id, lock_payload) VALUES (:entryId, :payload)",
lockTable, entryTypeName
);
return handle.createStatement(statement)
.bind("entryId", entryId)
.bind("payload", jsonMapper.writeValueAsBytes(lock))
.execute() == 1;
}
@Override
public boolean replaceLock(final String entryId, final long oldLockId, final LockType newLock)
{
return connector.retryTransaction(
(handle, transactionStatus) -> {
int numDeletedRows = removeLock(handle, oldLockId);
if (numDeletedRows != 1) {
transactionStatus.setRollbackOnly();
final String message = numDeletedRows == 0 ?
StringUtils.format("Cannot find lock[%d]", oldLockId) :
StringUtils.format("Found multiple locks for lockId[%d]", oldLockId);
throw new RuntimeException(message);
}
return addLock(handle, entryId, newLock);
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
@Override
public void removeLock(final long lockId)
{
@ -340,9 +367,7 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable))
.bind("id", lockId)
.execute();
removeLock(handle, lockId);
return null;
}
@ -350,6 +375,13 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
);
}
private int removeLock(Handle handle, long lockId)
{
return handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable))
.bind("id", lockId)
.execute();
}
@Override
public boolean addLog(final String entryId, final LogType log)
{
@ -488,4 +520,15 @@ public class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, Loc
}
);
}
@Override
@Nullable
public Long getLockId(String entryId, LockType lock)
{
return getLocks(entryId).entrySet().stream()
.filter(entry -> entry.getValue().equals(lock))
.map(Entry::getKey)
.findAny()
.orElse(null);
}
}

View File

@ -564,8 +564,7 @@ public class AppenderatorDriver implements Closeable
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
} else {
log.warn("Our segments don't exist, giving up.");
return null;
throw new ISE("Failed to publish segments[%s]", segmentIdentifiers);
}
}
}

View File

@ -268,4 +268,62 @@ public class SQLMetadataStorageActionHandlerTest
);
Assert.assertEquals(updated.keySet(), locks.keySet());
}
@Test
public void testReplaceLock() throws EntryExistsException
{
final String entryId = "ABC123";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
Assert.assertTrue(handler.addLock(entryId, lock1));
final Long lockId1 = handler.getLockId(entryId, lock1);
Assert.assertNotNull(lockId1);
Assert.assertTrue(handler.replaceLock(entryId, lockId1, lock2));
}
@Test
public void testGetLockId() throws EntryExistsException
{
final String entryId = "ABC123";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Integer>>of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
Assert.assertTrue(handler.addLock(entryId, lock1));
Assert.assertNotNull(handler.getLockId(entryId, lock1));
Assert.assertNull(handler.getLockId(entryId, lock2));
}
}