mirror of https://github.com/apache/druid.git
Allow for appending tasks to co-exist with each other. (#12041)
* Allow for appending tasks to co-exist with each other. Add a config parameter for appending tasks to allow them to use a SHARED lock. This will allow multiple appending tasks to add segments to the same datasource at the same time. This config should actually be the default, but it is added as a config to enable a smooth transition/validation in production settings before forcing it as the default behavior going forward. This change leverages the TaskLockType.SHARED that existed previously, this used to carry the semantics of a READ lock, which was "escalated" when the task wanted to actually persist the segment. As of many moons before this diff, the SHARED lock had stopped being used but was still piped into the code. It turns out that with a few tweaks, it can be adjusted to be a shared lock for append tasks to allow them all to write to the same datasource, so that is what this does. * Can only reuse the shared lock if using the same groupId * Need to serialize out the task lock type * Adjust Unit tests to expect new field in JSON
This commit is contained in:
parent
229f82a6f0
commit
a8b916576d
|
@ -80,6 +80,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
||||||
private final boolean skipSegmentLineageCheck;
|
private final boolean skipSegmentLineageCheck;
|
||||||
private final PartialShardSpec partialShardSpec;
|
private final PartialShardSpec partialShardSpec;
|
||||||
private final LockGranularity lockGranularity;
|
private final LockGranularity lockGranularity;
|
||||||
|
private final TaskLockType taskLockType;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public SegmentAllocateAction(
|
public SegmentAllocateAction(
|
||||||
|
@ -92,7 +93,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
||||||
@JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
|
@JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
|
||||||
// nullable for backward compatibility
|
// nullable for backward compatibility
|
||||||
@JsonProperty("shardSpecFactory") @Nullable PartialShardSpec partialShardSpec,
|
@JsonProperty("shardSpecFactory") @Nullable PartialShardSpec partialShardSpec,
|
||||||
@JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity // nullable for backward compatibility
|
@JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity,
|
||||||
|
@JsonProperty("taskLockType") @Nullable TaskLockType taskLockType
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
|
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
|
||||||
|
@ -107,6 +109,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
||||||
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
|
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
|
||||||
this.partialShardSpec = partialShardSpec == null ? NumberedPartialShardSpec.instance() : partialShardSpec;
|
this.partialShardSpec = partialShardSpec == null ? NumberedPartialShardSpec.instance() : partialShardSpec;
|
||||||
this.lockGranularity = lockGranularity == null ? LockGranularity.TIME_CHUNK : lockGranularity;
|
this.lockGranularity = lockGranularity == null ? LockGranularity.TIME_CHUNK : lockGranularity;
|
||||||
|
this.taskLockType = taskLockType == null ? TaskLockType.EXCLUSIVE : taskLockType;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -163,6 +166,12 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
||||||
return lockGranularity;
|
return lockGranularity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public TaskLockType getTaskLockType()
|
||||||
|
{
|
||||||
|
return taskLockType;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TypeReference<SegmentIdWithShardSpec> getReturnTypeReference()
|
public TypeReference<SegmentIdWithShardSpec> getReturnTypeReference()
|
||||||
{
|
{
|
||||||
|
@ -290,13 +299,13 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
|
||||||
boolean logOnFail
|
boolean logOnFail
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
// This action is always used by appending tasks, which cannot change the segment granularity of existing
|
// This action is always used by appending tasks, so if it is a time_chunk lock then we allow it to be
|
||||||
// dataSources. So, all lock requests should be segmentLock.
|
// shared with other appending tasks as well
|
||||||
final LockResult lockResult = toolbox.getTaskLockbox().tryLock(
|
final LockResult lockResult = toolbox.getTaskLockbox().tryLock(
|
||||||
task,
|
task,
|
||||||
new LockRequestForNewSegment(
|
new LockRequestForNewSegment(
|
||||||
lockGranularity,
|
lockGranularity,
|
||||||
TaskLockType.EXCLUSIVE,
|
taskLockType,
|
||||||
task.getGroupId(),
|
task.getGroupId(),
|
||||||
dataSource,
|
dataSource,
|
||||||
tryInterval,
|
tryInterval,
|
||||||
|
|
|
@ -279,17 +279,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
||||||
*
|
*
|
||||||
* @return whether the lock was acquired
|
* @return whether the lock was acquired
|
||||||
*/
|
*/
|
||||||
public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals)
|
public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals, IndexIOConfig ioConfig)
|
||||||
throws IOException
|
throws IOException
|
||||||
{
|
{
|
||||||
final boolean forceTimeChunkLock = getContextValue(
|
final boolean forceTimeChunkLock = getContextValue(
|
||||||
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
|
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
|
||||||
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
|
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
|
||||||
);
|
);
|
||||||
|
final boolean useSharedLock = ioConfig.isAppendToExisting() && getContextValue(Tasks.USE_SHARED_LOCK, false);
|
||||||
// Respect task context value most.
|
// Respect task context value most.
|
||||||
if (forceTimeChunkLock) {
|
if (forceTimeChunkLock) {
|
||||||
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
|
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
|
||||||
taskLockHelper = new TaskLockHelper(false);
|
taskLockHelper = new TaskLockHelper(false, useSharedLock);
|
||||||
if (!intervals.isEmpty()) {
|
if (!intervals.isEmpty()) {
|
||||||
return tryTimeChunkLock(client, intervals);
|
return tryTimeChunkLock(client, intervals);
|
||||||
} else {
|
} else {
|
||||||
|
@ -298,7 +299,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
||||||
} else {
|
} else {
|
||||||
if (!intervals.isEmpty()) {
|
if (!intervals.isEmpty()) {
|
||||||
final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
|
final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
|
||||||
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
|
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
|
||||||
return tryLockWithDetermineResult(client, result);
|
return tryLockWithDetermineResult(client, result);
|
||||||
} else {
|
} else {
|
||||||
// This branch is the only one that will not initialize taskLockHelper.
|
// This branch is the only one that will not initialize taskLockHelper.
|
||||||
|
@ -326,9 +327,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
||||||
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
|
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
|
||||||
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
|
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
|
||||||
);
|
);
|
||||||
|
final boolean useSharedLock = getContextValue(Tasks.USE_SHARED_LOCK, false);
|
||||||
|
|
||||||
if (forceTimeChunkLock) {
|
if (forceTimeChunkLock) {
|
||||||
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
|
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
|
||||||
taskLockHelper = new TaskLockHelper(false);
|
taskLockHelper = new TaskLockHelper(false, useSharedLock);
|
||||||
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
|
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
|
||||||
return tryTimeChunkLock(
|
return tryTimeChunkLock(
|
||||||
client,
|
client,
|
||||||
|
@ -336,7 +339,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
|
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
|
||||||
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
|
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
|
||||||
segmentCheckFunction.accept(result.lockGranularity, segments);
|
segmentCheckFunction.accept(result.lockGranularity, segments);
|
||||||
return tryLockWithDetermineResult(client, result);
|
return tryLockWithDetermineResult(client, result);
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
|
||||||
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
|
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
|
||||||
|
|
||||||
appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox);
|
appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox);
|
||||||
StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, toolbox, metrics);
|
TaskLockType lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
|
||||||
|
StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, toolbox, metrics, lockType);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
|
log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName());
|
||||||
|
@ -787,7 +788,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
|
||||||
final DataSchema dataSchema,
|
final DataSchema dataSchema,
|
||||||
final Appenderator appenderator,
|
final Appenderator appenderator,
|
||||||
final TaskToolbox toolbox,
|
final TaskToolbox toolbox,
|
||||||
final FireDepartmentMetrics metrics
|
final FireDepartmentMetrics metrics,
|
||||||
|
final TaskLockType lockType
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return new StreamAppenderatorDriver(
|
return new StreamAppenderatorDriver(
|
||||||
|
@ -804,7 +806,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
|
||||||
previousSegmentId,
|
previousSegmentId,
|
||||||
skipSegmentLineageCheck,
|
skipSegmentLineageCheck,
|
||||||
NumberedPartialShardSpec.instance(),
|
NumberedPartialShardSpec.instance(),
|
||||||
LockGranularity.TIME_CHUNK
|
LockGranularity.TIME_CHUNK,
|
||||||
|
lockType
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
toolbox.getSegmentHandoffNotifierFactory(),
|
toolbox.getSegmentHandoffNotifierFactory(),
|
||||||
|
|
|
@ -238,7 +238,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
||||||
}
|
}
|
||||||
return determineLockGranularityAndTryLock(
|
return determineLockGranularityAndTryLock(
|
||||||
taskActionClient,
|
taskActionClient,
|
||||||
ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
|
ingestionSchema.dataSchema.getGranularitySpec().inputIntervals(),
|
||||||
|
ingestionSchema.getIOConfig()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -489,7 +490,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
||||||
final List<Interval> allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
|
final List<Interval> allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
|
||||||
final DataSchema dataSchema;
|
final DataSchema dataSchema;
|
||||||
if (determineIntervals) {
|
if (determineIntervals) {
|
||||||
if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), allocateIntervals)) {
|
final boolean gotLocks = determineLockGranularityAndTryLock(
|
||||||
|
toolbox.getTaskActionClient(), allocateIntervals, ingestionSchema.getIOConfig()
|
||||||
|
);
|
||||||
|
if (!gotLocks) {
|
||||||
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
|
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,8 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
|
||||||
previousSegmentId,
|
previousSegmentId,
|
||||||
skipSegmentLineageCheck,
|
skipSegmentLineageCheck,
|
||||||
partialShardSpec,
|
partialShardSpec,
|
||||||
taskLockHelper.getLockGranularityToUse()
|
taskLockHelper.getLockGranularityToUse(),
|
||||||
|
taskLockHelper.getLockTypeToUse()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -57,6 +57,7 @@ public class TaskLockHelper
|
||||||
private final Map<Interval, OverwritingRootGenerationPartitions> overwritingRootGenPartitions = new HashMap<>();
|
private final Map<Interval, OverwritingRootGenerationPartitions> overwritingRootGenPartitions = new HashMap<>();
|
||||||
private final Set<DataSegment> lockedExistingSegments = new HashSet<>();
|
private final Set<DataSegment> lockedExistingSegments = new HashSet<>();
|
||||||
private final boolean useSegmentLock;
|
private final boolean useSegmentLock;
|
||||||
|
private final boolean useSharedLock;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private Granularity knownSegmentGranularity;
|
private Granularity knownSegmentGranularity;
|
||||||
|
@ -90,9 +91,10 @@ public class TaskLockHelper
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TaskLockHelper(boolean useSegmentLock)
|
public TaskLockHelper(boolean useSegmentLock, boolean useSharedLock)
|
||||||
{
|
{
|
||||||
this.useSegmentLock = useSegmentLock;
|
this.useSegmentLock = useSegmentLock;
|
||||||
|
this.useSharedLock = useSharedLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isUseSegmentLock()
|
public boolean isUseSegmentLock()
|
||||||
|
@ -105,6 +107,16 @@ public class TaskLockHelper
|
||||||
return useSegmentLock ? LockGranularity.SEGMENT : LockGranularity.TIME_CHUNK;
|
return useSegmentLock ? LockGranularity.SEGMENT : LockGranularity.TIME_CHUNK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUseSharedLock()
|
||||||
|
{
|
||||||
|
return useSharedLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TaskLockType getLockTypeToUse()
|
||||||
|
{
|
||||||
|
return useSharedLock ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean hasLockedExistingSegments()
|
public boolean hasLockedExistingSegments()
|
||||||
{
|
{
|
||||||
return !lockedExistingSegments.isEmpty();
|
return !lockedExistingSegments.isEmpty();
|
||||||
|
|
|
@ -50,6 +50,8 @@ public class Tasks
|
||||||
public static final String PRIORITY_KEY = "priority";
|
public static final String PRIORITY_KEY = "priority";
|
||||||
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
|
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
|
||||||
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
|
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
|
||||||
|
public static final String USE_SHARED_LOCK = "useSharedLock";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This context is used in compaction. When it is set in the context, the segments created by the task
|
* This context is used in compaction. When it is set in the context, the segments created by the task
|
||||||
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
|
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
|
||||||
|
|
|
@ -384,7 +384,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
||||||
{
|
{
|
||||||
return determineLockGranularityAndTryLock(
|
return determineLockGranularityAndTryLock(
|
||||||
taskActionClient,
|
taskActionClient,
|
||||||
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
|
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
|
||||||
|
ingestionSchema.getIOConfig()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -194,7 +194,8 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
|
||||||
{
|
{
|
||||||
return determineLockGranularityAndTryLock(
|
return determineLockGranularityAndTryLock(
|
||||||
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
|
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
|
||||||
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
|
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
|
||||||
|
ingestionSchema.getIOConfig()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -579,9 +579,8 @@ public class TaskLockbox
|
||||||
.stream()
|
.stream()
|
||||||
.allMatch(interval -> {
|
.allMatch(interval -> {
|
||||||
final List<TaskLockPosse> lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval);
|
final List<TaskLockPosse> lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval);
|
||||||
// Tasks cannot enter the critical section with a shared lock
|
return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
|
||||||
return lockPosses.stream().map(TaskLockPosse::getTaskLock).allMatch(
|
lock -> lock.isRevoked()
|
||||||
lock -> !lock.isRevoked() && lock.getType() != TaskLockType.SHARED
|
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1134,8 +1133,10 @@ public class TaskLockbox
|
||||||
if (taskLock.getType() == request.getType() && taskLock.getGranularity() == request.getGranularity()) {
|
if (taskLock.getType() == request.getType() && taskLock.getGranularity() == request.getGranularity()) {
|
||||||
switch (taskLock.getType()) {
|
switch (taskLock.getType()) {
|
||||||
case SHARED:
|
case SHARED:
|
||||||
// All shared lock is not reusable. Instead, a new lock posse is created for each lock request.
|
if (request instanceof TimeChunkLockRequest) {
|
||||||
// See createOrFindLockPosse().
|
return taskLock.getInterval().contains(request.getInterval())
|
||||||
|
&& taskLock.getGroupId().equals(request.getGroupId());
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
case EXCLUSIVE:
|
case EXCLUSIVE:
|
||||||
if (request instanceof TimeChunkLockRequest) {
|
if (request instanceof TimeChunkLockRequest) {
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus;
|
||||||
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
|
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
|
||||||
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
|
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
|
||||||
import org.apache.druid.indexing.common.LockGranularity;
|
import org.apache.druid.indexing.common.LockGranularity;
|
||||||
|
import org.apache.druid.indexing.common.TaskLockType;
|
||||||
import org.apache.druid.indexing.common.TaskToolbox;
|
import org.apache.druid.indexing.common.TaskToolbox;
|
||||||
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
|
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
|
||||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||||
|
@ -69,6 +70,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
||||||
protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
|
protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
|
||||||
protected final Map<String, Object> context;
|
protected final Map<String, Object> context;
|
||||||
protected final LockGranularity lockGranularityToUse;
|
protected final LockGranularity lockGranularityToUse;
|
||||||
|
protected final TaskLockType lockTypeToUse;
|
||||||
|
|
||||||
// Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
|
// Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
|
||||||
// See https://github.com/apache/druid/issues/7724 for issues that can cause.
|
// See https://github.com/apache/druid/issues/7724 for issues that can cause.
|
||||||
|
@ -103,6 +105,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
||||||
this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
|
this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
|
||||||
? LockGranularity.TIME_CHUNK
|
? LockGranularity.TIME_CHUNK
|
||||||
: LockGranularity.SEGMENT;
|
: LockGranularity.SEGMENT;
|
||||||
|
this.lockTypeToUse = getContextValue(Tasks.USE_SHARED_LOCK, false) ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static String getFormattedGroupId(String dataSource, String type)
|
protected static String getFormattedGroupId(String dataSource, String type)
|
||||||
|
@ -222,7 +225,8 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
|
||||||
previousSegmentId,
|
previousSegmentId,
|
||||||
skipSegmentLineageCheck,
|
skipSegmentLineageCheck,
|
||||||
NumberedPartialShardSpec.instance(),
|
NumberedPartialShardSpec.instance(),
|
||||||
lockGranularityToUse
|
lockGranularityToUse,
|
||||||
|
lockTypeToUse
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
toolbox.getSegmentHandoffNotifierFactory(),
|
toolbox.getSegmentHandoffNotifierFactory(),
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.druid.indexing.common.LockGranularity;
|
import org.apache.druid.indexing.common.LockGranularity;
|
||||||
|
import org.apache.druid.indexing.common.TaskLockType;
|
||||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||||
|
@ -52,7 +53,8 @@ public class SegmentAllocateActionSerdeTest
|
||||||
"prev",
|
"prev",
|
||||||
false,
|
false,
|
||||||
NumberedPartialShardSpec.instance(),
|
NumberedPartialShardSpec.instance(),
|
||||||
LockGranularity.SEGMENT
|
LockGranularity.SEGMENT,
|
||||||
|
null
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,6 +73,7 @@ public class SegmentAllocateActionSerdeTest
|
||||||
Assert.assertEquals(target.getSequenceName(), fromJson.getSequenceName());
|
Assert.assertEquals(target.getSequenceName(), fromJson.getSequenceName());
|
||||||
Assert.assertEquals(target.getPreviousSegmentId(), fromJson.getPreviousSegmentId());
|
Assert.assertEquals(target.getPreviousSegmentId(), fromJson.getPreviousSegmentId());
|
||||||
Assert.assertEquals(target.isSkipSegmentLineageCheck(), fromJson.isSkipSegmentLineageCheck());
|
Assert.assertEquals(target.isSkipSegmentLineageCheck(), fromJson.isSkipSegmentLineageCheck());
|
||||||
|
Assert.assertEquals(TaskLockType.EXCLUSIVE, target.getTaskLockType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -81,7 +84,7 @@ public class SegmentAllocateActionSerdeTest
|
||||||
Map.class
|
Map.class
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertEquals(10, fromJson.size());
|
Assert.assertEquals(11, fromJson.size());
|
||||||
Assert.assertEquals(SegmentAllocateAction.TYPE, fromJson.get("type"));
|
Assert.assertEquals(SegmentAllocateAction.TYPE, fromJson.get("type"));
|
||||||
Assert.assertEquals(target.getDataSource(), fromJson.get("dataSource"));
|
Assert.assertEquals(target.getDataSource(), fromJson.get("dataSource"));
|
||||||
Assert.assertEquals(target.getTimestamp(), DateTimes.of((String) fromJson.get("timestamp")));
|
Assert.assertEquals(target.getTimestamp(), DateTimes.of((String) fromJson.get("timestamp")));
|
||||||
|
@ -98,5 +101,6 @@ public class SegmentAllocateActionSerdeTest
|
||||||
Assert.assertEquals(target.isSkipSegmentLineageCheck(), fromJson.get("skipSegmentLineageCheck"));
|
Assert.assertEquals(target.isSkipSegmentLineageCheck(), fromJson.get("skipSegmentLineageCheck"));
|
||||||
Assert.assertEquals(ImmutableMap.of("type", "numbered"), fromJson.get("shardSpecFactory"));
|
Assert.assertEquals(ImmutableMap.of("type", "numbered"), fromJson.get("shardSpecFactory"));
|
||||||
Assert.assertEquals(target.getLockGranularity(), LockGranularity.valueOf((String) fromJson.get("lockGranularity")));
|
Assert.assertEquals(target.getLockGranularity(), LockGranularity.valueOf((String) fromJson.get("lockGranularity")));
|
||||||
|
Assert.assertEquals(target.getTaskLockType(), TaskLockType.valueOf((String) fromJson.get("taskLockType")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -873,7 +873,8 @@ public class SegmentAllocateActionTest
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2, null),
|
new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2, null),
|
||||||
lockGranularity
|
lockGranularity,
|
||||||
|
null
|
||||||
);
|
);
|
||||||
final SegmentIdWithShardSpec segmentIdentifier = action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
final SegmentIdWithShardSpec segmentIdentifier = action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
||||||
Assert.assertNotNull(segmentIdentifier);
|
Assert.assertNotNull(segmentIdentifier);
|
||||||
|
@ -986,7 +987,8 @@ public class SegmentAllocateActionTest
|
||||||
sequencePreviousId,
|
sequencePreviousId,
|
||||||
false,
|
false,
|
||||||
partialShardSpec,
|
partialShardSpec,
|
||||||
lockGranularity
|
lockGranularity,
|
||||||
|
null
|
||||||
);
|
);
|
||||||
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,7 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
|
@ -618,6 +619,23 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
|
||||||
Assert.assertEquals(new HashSet<>(newSegments), visibles);
|
Assert.assertEquals(new HashSet<>(newSegments), visibles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleAppends()
|
||||||
|
{
|
||||||
|
final Interval interval = null;
|
||||||
|
final ParallelIndexSupervisorTask task = newTask(interval, Granularities.DAY, true, true);
|
||||||
|
final ParallelIndexSupervisorTask task2 = newTask(interval, Granularities.DAY, true, true);
|
||||||
|
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
|
||||||
|
task.addToContext(Tasks.USE_SHARED_LOCK, true);
|
||||||
|
task2.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
|
||||||
|
task2.addToContext(Tasks.USE_SHARED_LOCK, true);
|
||||||
|
getIndexingServiceClient().runTask(task.getId(), task);
|
||||||
|
getIndexingServiceClient().runTask(task2.getId(), task2);
|
||||||
|
|
||||||
|
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().waitToFinish(task, 1, TimeUnit.DAYS).getStatusCode());
|
||||||
|
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().waitToFinish(task2, 1, TimeUnit.DAYS).getStatusCode());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRunParallelWithNoInputSplitToProcess()
|
public void testRunParallelWithNoInputSplitToProcess()
|
||||||
{
|
{
|
||||||
|
|
|
@ -491,7 +491,7 @@ public class TaskLockboxTest
|
||||||
lockbox.add(task);
|
lockbox.add(task);
|
||||||
Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, interval).isOk());
|
Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, interval).isOk());
|
||||||
|
|
||||||
Assert.assertFalse(
|
Assert.assertTrue(
|
||||||
lockbox.doInCriticalSection(
|
lockbox.doInCriticalSection(
|
||||||
task,
|
task,
|
||||||
Collections.singletonList(interval),
|
Collections.singletonList(interval),
|
||||||
|
|
Loading…
Reference in New Issue