Add support for concurrent batch Append and Replace (#14407)

Changes:
- Add task context parameter `taskLockType`, which determines the type of lock used by a batch task (see the context sketch after this list)
- Add new task actions for transactional replace and append of segments
- Add methods IndexerMetadataStorageCoordinator.commitAppendSegments and commitReplaceSegments
- Upgrade segments to appropriate versions when performing replace and append
- Add new metadata table `upgradeSegments` to track segments that need to be upgraded
- Add tests
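
As an illustration of the new context parameter, here is a minimal sketch of setting `taskLockType` programmatically. The key and enum values come from the Tasks and TaskLockType classes changed in this commit; the map-building style and variable names are illustrative only.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import java.util.Map;

// Context for a batch task that overwrites data in its input intervals
Map<String, Object> replaceContext =
    ImmutableMap.<String, Object>of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name());
// Context for a batch task that appends to intervals which may concurrently be replaced
Map<String, Object> appendContext =
    ImmutableMap.<String, Object>of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name());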
Author: AmatyaAvadhanula, 2023-09-25 07:06:37 +05:30 (committed by GitHub)
Parent: d7c152c82c
Commit: c62193c4d7
42 changed files with 3061 additions and 124 deletions

View File

@ -90,6 +90,7 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
null,
null,
null,
null,
segmentTable,
null,
null,

View File

@ -30,9 +30,6 @@ import org.apache.druid.timeline.DataSegment;
import java.util.Set;
/**
* Insert segments into metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
* <p/>
* Word of warning: Very large "segments" sets can cause oversized audit log entries, which is bad because it means
* that the task cannot actually complete. Callers should avoid this by avoiding inserting too many segments in the
* same action.

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*/
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments);
}
@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments
)
{
this.segments = segments;
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
return new TypeReference<SegmentPublishResult>()
{
};
}
/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
)
)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
+ " Please check the overlord log for details."
)
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}
return retVal;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "SegmentTransactionalAppendAction{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
'}';
}
}
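
A hedged usage sketch, not part of this commit: how a batch task holding an APPEND lock might publish its segments through the action above, mirroring the ActionsTestTask helper further down. The `client` and `appendedSegments` variables are assumed to exist in the calling task.

// Assumes `client` is the task's TaskActionClient and `appendedSegments` is the
// Set<DataSegment> produced by the task.
final SegmentPublishResult result =
    client.submit(SegmentTransactionalAppendAction.create(appendedSegments));
if (!result.isSuccess()) {
  // Locks were revoked (e.g. by a higher priority task); nothing was committed.
}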

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
/**
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
public static SegmentTransactionalReplaceAction create(
Set<DataSegment> segmentsToPublish
)
{
return new SegmentTransactionalReplaceAction(segmentsToPublish);
}
@JsonCreator
private SegmentTransactionalReplaceAction(
@JsonProperty("segments") Set<DataSegment> segments
)
{
this.segments = ImmutableSet.copyOf(segments);
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
return new TypeReference<SegmentPublishResult>()
{
};
}
/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
// Find the active replace locks held only by this task
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);
final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator()
.commitReplaceSegments(segments, replaceLocksForTask)
)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
+ " Please check the overlord log for details."
)
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}
return retVal;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "SegmentTransactionalReplaceAction{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
'}';
}
}
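
A similar hedged sketch for the replace flow: the task first acquires a REPLACE time-chunk lock on the interval and then commits the overwriting segments through the action above. The `client`, `interval`, and `replacingSegments` variables are assumed.

final TaskLock lock =
    client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.REPLACE, interval));
if (lock != null && !lock.isRevoked()) {
  final SegmentPublishResult result =
      client.submit(SegmentTransactionalReplaceAction.create(replacingSegments));
}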

View File

@ -36,6 +36,8 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class),
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.

View File

@ -23,19 +23,28 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
public class TaskLocks
@ -77,6 +86,13 @@ public class TaskLocks
return isLockCoversSegments(taskLockMap, segments);
}
public static String defaultLockVersion(TaskLockType lockType)
{
return lockType == TaskLockType.APPEND
? DateTimes.EPOCH.toString()
: DateTimes.nowUtc().toString();
}
public static boolean isLockCoversSegments(
NavigableMap<DateTime, List<TaskLock>> taskLockMap,
Collection<DataSegment> segments
@ -96,7 +112,8 @@ public class TaskLocks
final TimeChunkLock timeChunkLock = (TimeChunkLock) lock;
return timeChunkLock.getInterval().contains(segment.getInterval())
&& timeChunkLock.getDataSource().equals(segment.getDataSource())
&& timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0;
&& (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
|| TaskLockType.APPEND.equals(timeChunkLock.getType()));
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return segmentLock.getInterval().contains(segment.getInterval())
@ -110,6 +127,63 @@ public class TaskLocks
);
}
/**
* Determines the type of time chunk lock to use for appending segments.
* <p>
* This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType}
* by passing the ParallelIndexSupervisorTask instance into the
* SinglePhaseParallelIndexTaskRunner.
*/
public static TaskLockType determineLockTypeForAppend(
Map<String, Object> taskContext
)
{
final Object lockType = taskContext.get(Tasks.TASK_LOCK_TYPE);
if (lockType == null) {
final boolean useSharedLock = (boolean) taskContext.getOrDefault(Tasks.USE_SHARED_LOCK, false);
return useSharedLock ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
} else {
return TaskLockType.valueOf(lockType.toString());
}
}
/**
* Finds locks of type {@link TaskLockType#REPLACE} for each of the given segments
* that have an interval completely covering the interval of the respective segments.
*
* @return Map from segment to REPLACE lock that completely covers it. The map
* does not contain an entry for segments that have no covering REPLACE lock.
*/
public static Map<DataSegment, ReplaceTaskLock> findReplaceLocksCoveringSegments(
final String datasource,
final TaskLockbox taskLockbox,
final Set<DataSegment> segments
)
{
// Identify unique segment intervals
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(
segment.getInterval(), interval -> new ArrayList<>()
).add(segment)
);
final Set<ReplaceTaskLock> replaceLocks = taskLockbox.getAllReplaceLocksForDatasource(datasource);
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock = new HashMap<>();
intervalToSegments.forEach((interval, segmentsInInterval) -> {
// For each interval, find the lock that covers it, if any
for (ReplaceTaskLock lock : replaceLocks) {
if (lock.getInterval().contains(interval)) {
segmentsInInterval.forEach(s -> segmentToReplaceLock.put(s, lock));
return;
}
}
});
return segmentToReplaceLock;
}
public static List<TaskLock> findLocksForSegments(
final Task task,
final TaskLockbox taskLockbox,

View File

@ -36,14 +36,18 @@ import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -289,23 +293,21 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
*
* @return whether the lock was acquired
*/
public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals, IndexIOConfig ioConfig)
public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals)
throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
IngestionMode ingestionMode = getIngestionMode();
final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
&& getContextValue(Tasks.USE_SHARED_LOCK, false);
final IngestionMode ingestionMode = getIngestionMode();
// Respect task context value most.
if (forceTimeChunkLock || ingestionMode == IngestionMode.REPLACE) {
log.info(
"forceTimeChunkLock[%s] is set to true or mode[%s] is replace. Use timeChunk lock",
"Using time chunk lock since forceTimeChunkLock is [%s] and mode is [%s].",
forceTimeChunkLock, ingestionMode
);
taskLockHelper = new TaskLockHelper(false, useSharedLock);
taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK);
if (!intervals.isEmpty()) {
return tryTimeChunkLock(client, intervals);
} else {
@ -314,7 +316,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
} else {
if (!intervals.isEmpty()) {
final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
taskLockHelper = createLockHelper(result.lockGranularity);
return tryLockWithDetermineResult(client, result);
} else {
// This branch is the only one that will not initialize taskLockHelper.
@ -342,11 +344,10 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
final boolean useSharedLock = getContextValue(Tasks.USE_SHARED_LOCK, false);
if (forceTimeChunkLock) {
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
taskLockHelper = new TaskLockHelper(false, useSharedLock);
taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK);
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
return tryTimeChunkLock(
client,
@ -354,7 +355,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
);
} else {
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
taskLockHelper = createLockHelper(result.lockGranularity);
segmentCheckFunction.accept(result.lockGranularity, segments);
return tryLockWithDetermineResult(client, result);
}
@ -398,6 +399,27 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
}
/**
* Builds a TaskAction to publish segments based on the type of locks that this
* task acquires (determined by context property {@link Tasks#TASK_LOCK_TYPE}).
*/
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish
)
{
TaskLockType lockType = TaskLockType.valueOf(
getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name())
);
switch (lockType) {
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
return SegmentTransactionalAppendAction.create(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
}
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
@ -430,7 +452,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
prev = cur;
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
final TaskLockType taskLockType = determineLockType(LockGranularity.TIME_CHUNK);
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(taskLockType, cur));
if (lock == null) {
return false;
}
@ -443,6 +466,42 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
return true;
}
private TaskLockHelper createLockHelper(LockGranularity lockGranularity)
{
return new TaskLockHelper(
lockGranularity == LockGranularity.SEGMENT,
determineLockType(lockGranularity)
);
}
/**
* Determines the type of lock to use with the given lock granularity.
*/
private TaskLockType determineLockType(LockGranularity lockGranularity)
{
if (lockGranularity == LockGranularity.SEGMENT) {
return TaskLockType.EXCLUSIVE;
}
final String contextLockType = getContextValue(Tasks.TASK_LOCK_TYPE);
final TaskLockType lockType;
if (contextLockType == null) {
lockType = getContextValue(Tasks.USE_SHARED_LOCK, false)
? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
} else {
lockType = TaskLockType.valueOf(contextLockType);
}
final IngestionMode ingestionMode = getIngestionMode();
if ((lockType == TaskLockType.SHARED || lockType == TaskLockType.APPEND)
&& ingestionMode != IngestionMode.APPEND) {
// Lock types SHARED and APPEND are allowed only in APPEND ingestion mode
return Tasks.DEFAULT_TASK_LOCK_TYPE;
} else {
return lockType;
}
}
private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegment> segments)
{
if (segments.isEmpty()) {
@ -671,7 +730,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
public static NonnullPair<Interval, String> findIntervalAndVersion(
TaskToolbox toolbox,
IngestionSpec<?, ?> ingestionSpec,
DateTime timestamp
DateTime timestamp,
TaskLockType taskLockType
) throws IOException
{
// This method is called whenever subtasks need to allocate a new segment via the supervisor task.
@ -727,7 +787,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
// We don't have a lock for this interval, so we should lock it now.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)
new TimeChunkLockTryAcquireAction(taskLockType, interval)
),
"Cannot acquire a lock for interval[%s]",
interval
@ -783,8 +843,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
protected SegmentIdWithShardSpec allocateNewSegmentForTombstone(
IngestionSpec ingestionSchema,
DateTime timestamp,
TaskToolbox toolbox
DateTime timestamp
)
{
// Since tombstones are derived from inputIntervals, inputIntervals cannot be empty for replace, and locks are

View File

@ -55,6 +55,7 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
@ -293,7 +294,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox);
TaskLockType lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
final TaskLockType lockType = TaskLocks.determineLockTypeForAppend(getContext());
StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, toolbox, metrics, lockType);
try {

View File

@ -53,7 +53,6 @@ import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
@ -253,8 +252,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
return determineLockGranularityAndTryLock(
taskActionClient,
ingestionSchema.dataSchema.getGranularitySpec().inputIntervals(),
ingestionSchema.getIOConfig()
ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
);
}
@ -524,7 +522,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
final DataSchema dataSchema;
if (determineIntervals) {
final boolean gotLocks = determineLockGranularityAndTryLock(
toolbox.getTaskActionClient(), allocateIntervals, ingestionSchema.getIOConfig()
toolbox.getTaskActionClient(),
allocateIntervals
);
if (!gotLocks) {
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
@ -912,9 +911,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish));
String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
@ -980,8 +979,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
for (Interval interval : tombstoneIntervals) {
SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
ingestionSchema,
interval.getStart(),
toolbox
interval.getStart()
);
tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
import org.joda.time.DateTime;
@ -141,4 +142,14 @@ public class IndexTaskUtils
metricBuilder.setDimension(DruidMetrics.TASK_ID, taskStatus.getId());
metricBuilder.setDimension(DruidMetrics.TASK_STATUS, taskStatus.getStatusCode().toString());
}
public static void setSegmentDimensions(
ServiceMetricEvent.Builder metricBuilder,
DataSegment segment
)
{
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
}
}

View File

@ -34,9 +34,11 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@ -198,6 +200,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
if (nextBatchSize <= 0) {
break;
}
unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
@ -218,7 +221,22 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
// abandoned.
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
// Fetch the load specs of all segments overlapping with the given interval
final Set<Map<String, Object>> usedSegmentLoadSpecs = toolbox
.getTaskActionClient()
.submit(new RetrieveUsedSegmentsAction(getDataSource(), getInterval(), null, Segments.INCLUDING_OVERSHADOWED))
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet());
// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
.stream()
.filter(unusedSegment -> !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
.collect(Collectors.toList());
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
numBatchesProcessed++;
numSegmentsKilled += unusedSegments.size();

View File

@ -57,7 +57,7 @@ public class TaskLockHelper
private final Map<Interval, OverwritingRootGenerationPartitions> overwritingRootGenPartitions = new HashMap<>();
private final Set<DataSegment> lockedExistingSegments = new HashSet<>();
private final boolean useSegmentLock;
private final boolean useSharedLock;
private final TaskLockType taskLockType;
@Nullable
private Granularity knownSegmentGranularity;
@ -91,10 +91,10 @@ public class TaskLockHelper
}
}
public TaskLockHelper(boolean useSegmentLock, boolean useSharedLock)
public TaskLockHelper(boolean useSegmentLock, TaskLockType taskLockType)
{
this.useSegmentLock = useSegmentLock;
this.useSharedLock = useSharedLock;
this.taskLockType = taskLockType;
}
public boolean isUseSegmentLock()
@ -107,19 +107,9 @@ public class TaskLockHelper
return useSegmentLock ? LockGranularity.SEGMENT : LockGranularity.TIME_CHUNK;
}
public boolean isUseSharedLock()
{
return useSharedLock;
}
public TaskLockType getLockTypeToUse()
{
return useSharedLock ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
}
public boolean hasLockedExistingSegments()
{
return !lockedExistingSegments.isEmpty();
return taskLockType;
}
public boolean hasOverwritingRootGenerationPartition(Interval interval)
@ -137,6 +127,9 @@ public class TaskLockHelper
return overwritingRootGenPartitions.get(interval);
}
/**
* Verify and lock existing segments when using a SegmentLock
*/
boolean verifyAndLockExistingSegments(TaskActionClient actionClient, List<DataSegment> segments)
throws IOException
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -47,12 +48,14 @@ public class Tasks
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = false;
public static final TaskLockType DEFAULT_TASK_LOCK_TYPE = TaskLockType.EXCLUSIVE;
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";
public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
public static final String USE_SHARED_LOCK = "useSharedLock";
public static final String TASK_LOCK_TYPE = "taskLockType";
/**
* Context flag denoting if maximum possible values should be used to estimate

View File

@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
@ -433,8 +432,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
{
return determineLockGranularityAndTryLock(
taskActionClient,
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
ingestionSchema.getIOConfig()
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}
@ -1156,8 +1154,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
for (Interval interval : tombstoneIntervals) {
SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
ingestionSchema,
interval.getStart(),
toolbox
interval.getStart()
);
tombstonesAnShards.put(interval, segmentIdWithShardSpec);
}
@ -1170,10 +1167,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
}
}
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)
);
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish));
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();

View File

@ -26,7 +26,10 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
@ -216,7 +219,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
/**
* Allocate a new segment for the given timestamp locally. This method is called when dynamic partitioning is used
* and {@link org.apache.druid.indexing.common.LockGranularity} is {@code TIME_CHUNK}.
* and {@link LockGranularity} is {@code TIME_CHUNK}.
*
* The allocation algorithm is similar to the Overlord-based segment allocation. It keeps the segment allocation
* history per sequenceName. If the prevSegmentId is found in the segment allocation history, this method
@ -283,7 +286,8 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp) throws IOException
{
return AbstractBatchIndexTask.findIntervalAndVersion(getToolbox(), ingestionSchema, timestamp);
TaskLockType taskLockType = TaskLocks.determineLockTypeForAppend(getContext());
return AbstractBatchIndexTask.findIntervalAndVersion(getToolbox(), ingestionSchema, timestamp, taskLockType);
}
@Override

View File

@ -218,8 +218,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
{
return determineLockGranularityAndTryLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
ingestionSchema.getIOConfig()
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}

View File

@ -23,8 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.Interval;
@ -142,7 +142,7 @@ public class LockRequestForNewSegment implements LockRequest
public String getVersion()
{
if (version == null) {
version = DateTimes.nowUtc().toString();
version = TaskLocks.defaultLockVersion(lockType);
}
return version;
}
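
For illustration, the version behaviour this introduces (per TaskLocks.defaultLockVersion earlier in this commit): APPEND locks get the epoch as their version, while every other lock type keeps using the current timestamp. The comment values below are an assumption about the rendered timestamps, not output captured from a run.

String appendVersion = TaskLocks.defaultLockVersion(TaskLockType.APPEND);   // "1970-01-01T00:00:00.000Z"
String replaceVersion = TaskLocks.defaultLockVersion(TaskLockType.REPLACE); // current UTC time, e.g. "2023-09-25T01:36:37.000Z"

Giving appended segments the epoch version keeps them at or below the version of any concurrent REPLACE lock, which lines up with the "upgrade segments to appropriate versions" item in the change list above.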

View File

@ -25,6 +25,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
@ -43,6 +44,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -889,6 +891,71 @@ public class TaskLockbox
}
}
/**
* Finds the active non-revoked REPLACE locks held by the given task.
*/
public Set<ReplaceTaskLock> findReplaceLocksForTask(Task task)
{
giant.lock();
try {
return getNonRevokedReplaceLocks(findLockPossesForTask(task), task.getDataSource());
}
finally {
giant.unlock();
}
}
/**
* Finds all the active non-revoked REPLACE locks for the given datasource.
*/
public Set<ReplaceTaskLock> getAllReplaceLocksForDatasource(String datasource)
{
giant.lock();
try {
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> activeLocks = running.get(datasource);
if (activeLocks == null) {
return ImmutableSet.of();
}
List<TaskLockPosse> lockPosses
= activeLocks.values()
.stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.collect(Collectors.toList());
return getNonRevokedReplaceLocks(lockPosses, datasource);
}
finally {
giant.unlock();
}
}
private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posses, String datasource)
{
final Set<ReplaceTaskLock> replaceLocks = new HashSet<>();
for (TaskLockPosse posse : posses) {
final TaskLock lock = posse.getTaskLock();
if (lock.isRevoked() || !TaskLockType.REPLACE.equals(posse.getTaskLock().getType())) {
continue;
}
// Replace locks are always held by the supervisor task
if (posse.taskIds.size() > 1) {
throw new ISE(
"Replace lock[%s] for datasource[%s] is held by multiple tasks[%s]",
lock, datasource, posse.taskIds
);
}
String supervisorTaskId = posse.taskIds.iterator().next();
replaceLocks.add(
new ReplaceTaskLock(supervisorTaskId, lock.getInterval(), lock.getVersion())
);
}
return replaceLocks;
}
/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
* Here, Segment Locks are being treated the same as Time Chunk Locks i.e.

View File

@ -23,8 +23,8 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -115,7 +115,7 @@ public class TimeChunkLockRequest implements LockRequest
@Override
public String getVersion()
{
return preferredVersion == null ? DateTimes.nowUtc().toString() : preferredVersion;
return preferredVersion == null ? TaskLocks.defaultLockVersion(lockType) : preferredVersion;
}
@Override

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
@ -28,6 +29,7 @@ import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest;
@ -36,6 +38,7 @@ import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -136,9 +139,13 @@ public class TaskLocksTest
);
}
private LockResult tryTimeChunkLock(Task task, Interval interval)
private TaskLock tryTimeChunkLock(Task task, Interval interval, TaskLockType lockType)
{
return lockbox.tryLock(task, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, interval, null));
final TaskLock taskLock = lockbox
.tryLock(task, new TimeChunkLockRequest(lockType, task, interval, null))
.getTaskLock();
Assert.assertNotNull(taskLock);
return taskLock;
}
private LockResult trySegmentLock(Task task, Interval interval, String version, int partitonId)
@ -162,11 +169,7 @@ public class TaskLocksTest
final Map<Interval, TaskLock> locks = intervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = tryTimeChunkLock(task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
interval -> tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE)
)
);
@ -206,11 +209,7 @@ public class TaskLocksTest
final Map<Interval, TaskLock> locks = intervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = tryTimeChunkLock(task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
interval -> tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE)
)
);
@ -231,11 +230,7 @@ public class TaskLocksTest
final Map<Interval, TaskLock> locks = lockIntervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = tryTimeChunkLock(task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
interval -> tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE)
)
);
@ -256,11 +251,7 @@ public class TaskLocksTest
final Map<Interval, TaskLock> locks = intervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = tryTimeChunkLock(task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
interval -> tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE)
)
);
@ -305,6 +296,82 @@ public class TaskLocksTest
);
}
@Test
public void testFindReplaceLocksCoveringSegments()
{
final Set<DataSegment> segments = createTimeChunkedSegments();
final Map<DataSegment, TaskLock> lockResults = segments.stream().collect(
Collectors.toMap(
segment -> segment,
segment -> tryTimeChunkLock(task, segment.getInterval(), TaskLockType.REPLACE)
)
);
final Map<DataSegment, ReplaceTaskLock> observedLocks
= TaskLocks.findReplaceLocksCoveringSegments(task.getDataSource(), lockbox, segments);
Assert.assertEquals(segments.size(), observedLocks.size());
for (DataSegment segment : segments) {
TaskLock lockFromResult = lockResults.get(segment);
Assert.assertEquals(
new ReplaceTaskLock(task.getId(), lockFromResult.getInterval(), lockFromResult.getVersion()),
observedLocks.get(segment)
);
}
}
@Test
public void testLockTypeForAppendWithLockTypeInContext()
{
Assert.assertEquals(
TaskLockType.REPLACE,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, "REPLACE")
)
);
Assert.assertEquals(
TaskLockType.APPEND,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, "APPEND")
)
);
Assert.assertEquals(
TaskLockType.SHARED,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, "SHARED")
)
);
Assert.assertEquals(
TaskLockType.EXCLUSIVE,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, "EXCLUSIVE", Tasks.USE_SHARED_LOCK, true)
)
);
}
@Test
public void testLockTypeForAppendWithNoLockTypeInContext()
{
Assert.assertEquals(
TaskLockType.EXCLUSIVE,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of()
)
);
Assert.assertEquals(
TaskLockType.EXCLUSIVE,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.USE_SHARED_LOCK, false)
)
);
Assert.assertEquals(
TaskLockType.SHARED,
TaskLocks.determineLockTypeForAppend(
ImmutableMap.of(Tasks.USE_SHARED_LOCK, true)
)
);
}
private TimeChunkLock newTimeChunkLock(Interval interval, String version)
{
return new TimeChunkLock(

View File

@ -107,19 +107,21 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final TestUtils testUtils = new TestUtils();
protected final TestUtils testUtils = new TestUtils();
private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
private SegmentCacheManagerFactory segmentCacheManagerFactory;
private TaskStorage taskStorage;
private IndexerSQLMetadataStorageCoordinator storageCoordinator;
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
@Before
public void setUpIngestionTestBase() throws IOException
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
temporaryFolder.create();
baseDir = temporaryFolder.newFolder();
final SQLMetadataConnector connector = derbyConnectorRule.getConnector();
connector.createTaskTables();
@ -225,6 +227,30 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
);
}
public TaskToolbox createTaskToolbox(TaskConfig config, Task task)
{
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(createActionClient(task))
.segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(baseDir)
.indexIO(getIndexIO())
.indexMergerV9(testUtils.getIndexMergerV9Factory()
.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.taskLogPusher(null)
.attemptId("1")
.build();
}
public IndexIO getIndexIO()
{
return testUtils.getTestIndexIO();
@ -383,7 +409,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(temporaryFolder.newFolder())
.taskWorkDir(baseDir)
.indexIO(getIndexIO())
.indexMergerV9(testUtils.getIndexMergerV9Factory()
.create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.concurrent;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test task that can only invoke task actions.
*/
public class ActionsTestTask extends CommandQueueTask
{
private final TaskActionClient client;
private final AtomicInteger sequenceId = new AtomicInteger(0);
public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory)
{
super(datasource, groupId);
this.client = factory.create(this);
}
public TaskLock acquireReplaceLockOn(Interval interval)
{
return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.REPLACE, interval));
}
public TaskLock acquireAppendLockOn(Interval interval)
{
return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.APPEND, interval));
}
public SegmentPublishResult commitReplaceSegments(DataSegment... segments)
{
return runAction(
SegmentTransactionalReplaceAction.create(Sets.newHashSet(segments))
);
}
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
return runAction(
SegmentTransactionalAppendAction.create(Sets.newHashSet(segments))
);
}
public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Granularity preferredSegmentGranularity)
{
return runAction(
new SegmentAllocateAction(
getDataSource(),
timestamp,
Granularities.SECOND,
preferredSegmentGranularity,
getId() + "__" + sequenceId.getAndIncrement(),
null,
false,
NumberedPartialShardSpec.instance(),
LockGranularity.TIME_CHUNK,
TaskLockType.APPEND
)
);
}
private <T> T runAction(TaskAction<T> action)
{
return execute(() -> client.submit(action));
}
}
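
A hedged sketch of how this helper might drive a concurrent replace-and-append scenario; it assumes both tasks have already been registered with the lockbox and task queue (as the full test below does) and that `taskActionClientFactory` is available.

final ActionsTestTask replaceTask = new ActionsTestTask("wiki", "replace_group", taskActionClientFactory);
final ActionsTestTask appendTask = new ActionsTestTask("wiki", "append_group", taskActionClientFactory);

// Per the commit title, a REPLACE lock and an APPEND lock are intended to coexist
// on the same interval when held by different tasks.
final Interval interval = Intervals.of("2023-01-01/2023-01-02");
replaceTask.acquireReplaceLockOn(interval);
appendTask.acquireAppendLockOn(interval);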

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.concurrent;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test task that can be given a series of commands to execute in its {@link #runTask} method.
*/
public class CommandQueueTask extends AbstractTask
{
private static final Logger log = new Logger(CommandQueueTask.class);
private final Object queueNotification = new Object();
private final BlockingQueue<Command<?>> commandQueue = new LinkedBlockingQueue<>();
private final AtomicBoolean finishRequested = new AtomicBoolean(false);
private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
private final CompletableFuture<TaskStatus> finalTaskStatus = new CompletableFuture<>();
public CommandQueueTask(String datasource, String groupId)
{
super(
StringUtils.format("test_%s_%s", datasource, UUID.randomUUID().toString()),
groupId,
null,
datasource,
null
);
}
/**
* Marks the run of this task as finished so that no new commands are accepted.
* This method waits for all the commands submitted so far to finish execution
* and returns the final TaskStatus.
*/
public TaskStatus finishRunAndGetStatus()
{
synchronized (finishRequested) {
finishRequested.set(true);
}
synchronized (queueNotification) {
queueNotification.notifyAll();
}
try {
return finalTaskStatus.get(10, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new ISE(e, "Error waiting for task[%s] to finish", getId());
}
}
/**
* Submits the given runnable for execution on the task thread. This method
* returns immediately and does not wait for the execution to finish.
*/
public void submit(Runnable runnable)
{
// Add a command with a dummy return value
Command<?> command = new Command<>(
() -> {
runnable.run();
return 1;
}
);
addToQueue(command);
}
/**
* Executes the given callable on the task thread. This method waits until the
* execution has finished and returns the computed value.
*/
public <V> V execute(Callable<V> callable)
{
Command<V> command = new Command<>(callable);
addToQueue(command);
return waitForCommandToFinish(command);
}
private <V> void addToQueue(Command<V> command)
{
synchronized (finishRequested) {
if (finishRequested.get()) {
throw new ISE("Task[%s] cannot accept any more commands as it is already shutting down.", getId());
} else {
boolean added = commandQueue.offer(command);
if (!added) {
throw new ISE("Could not add command to task[%s].", getId());
}
}
}
synchronized (queueNotification) {
queueNotification.notifyAll();
}
}
private <V> V waitForCommandToFinish(Command<V> command)
{
try {
return command.value.get(10, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new ISE(e, "Error waiting for command on task[%s] to finish", getId());
}
}
@Override
public TaskStatus runTask(TaskToolbox taskToolbox)
{
TaskStatus status;
try {
while (true) {
synchronized (finishRequested) {
if (finishRequested.get() && commandQueue.isEmpty()) {
break;
}
}
Command<?> command = commandQueue.poll();
if (command == null) {
synchronized (queueNotification) {
queueNotification.wait(10_000);
}
} else {
log.info("Running command[%d] for task[%s]", numCommandsExecuted.get(), getId());
command.execute();
numCommandsExecuted.incrementAndGet();
}
}
status = TaskStatus.success(getId());
}
catch (Exception e) {
log.error(e, "Error while running command[%d] for task[%s]", numCommandsExecuted.get(), getId());
status = TaskStatus.failure(getId(), e.getMessage());
}
finalTaskStatus.complete(status);
return status;
}
@Override
public String getType()
{
return "test_command_executing";
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
}
private static class Command<V>
{
final Callable<V> callable;
final CompletableFuture<V> value = new CompletableFuture<>();
Command(Callable<V> callable)
{
this.callable = callable;
}
void execute() throws Exception
{
try {
V result = callable.call();
value.complete(result);
}
catch (Exception e) {
value.completeExceptionally(e);
throw e;
}
}
}
}

View File

@ -0,0 +1,854 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.concurrent;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Contains tests to verify behaviour of concurrently running REPLACE and APPEND
* tasks on the same interval of a datasource.
* <p>
* The tests verify the interleaving of the following actions:
* <ul>
* <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
* <li>ALLOCATE: Allocation of a pending segment by an append task</li>
* <li>REPLACE: Commit of segments created by a replace task</li>
* <li>APPEND: Commit of segments created by an append task</li>
* </ul>
*/
public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
{
/**
* The version used by append jobs when no previous replace job has run on an interval.
*/
private static final String SEGMENT_V0 = DateTimes.EPOCH.toString();
private static final Interval YEAR_23 = Intervals.of("2023/2024");
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
private static final Interval OCT_NOV_DEC_23 = Intervals.of("2023-10-01/2024-01-01");
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
private static final String WIKI = "wiki";
private TaskQueue taskQueue;
private TaskActionClientFactory taskActionClientFactory;
private TaskActionClient dummyTaskActionClient;
private final List<ActionsTestTask> runningTasks = new ArrayList<>();
private ActionsTestTask appendTask;
private ActionsTestTask replaceTask;
private final AtomicInteger groupId = new AtomicInteger(0);
@Before
public void setup()
{
final TaskConfig taskConfig = new TaskConfigBuilder().build();
taskActionClientFactory = createActionClientFactory();
dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create());
final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10);
TaskRunner taskRunner = new ThreadingTaskRunner(
createToolboxFactory(taskConfig, taskActionClientFactory),
taskConfig,
workerConfig,
new NoopTaskLogs(),
getObjectMapper(),
new TestAppenderatorsManager(),
new MultipleFileTaskReportFileWriter(),
new DruidNode("middleManager", "host", false, 8091, null, true, false),
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
taskActionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
runningTasks.clear();
taskQueue.start();
groupId.set(0);
appendTask = createAndStartTask();
replaceTask = createAndStartTask();
}
@After
public void tearDown()
{
for (ActionsTestTask task : runningTasks) {
task.finishRunAndGetStatus();
}
}
@Test
public void testLockReplaceAllocateAppend()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
final DataSegment segmentV11 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendDayReplaceDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceDayAppendDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceDayAppendDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockAppendDayReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
replaceTask.finishRunAndGetStatus();
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateAppendDayLockReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets fully overshadowed
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
}
@Test
public void testLockReplaceMonthAllocateAppendDay()
{
String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that the allocated segment takes the version and interval of previous replace
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(v1, pendingSegment.getVersion());
final DataSegment segmentV11 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendDayReplaceMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceMonthAppendDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceMonthAppendDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockAppendDayReplaceMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateAppendDayLockReplaceMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that the old segment gets completely replaced
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
}
@Test
public void testLockReplaceDayAllocateAppendMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that new segment gets allocated with DAY granularity even though preferred was MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(v1, pendingSegment.getVersion());
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
final DataSegment segmentV11 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendMonthReplaceDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceDayAppendMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity instead of MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.interval(FIRST_OF_JAN_23)
.version(v1)
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceDayAppendMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// Verify that replace lock cannot be acquired on MONTH
TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
// Verify that segment cannot be committed since there is no lock
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0);
final ISE exception = Assert.assertThrows(ISE.class, () -> replaceTask.commitReplaceSegments(segmentV10));
final Throwable throwable = Throwables.getRootCause(exception);
Assert.assertEquals(
StringUtils.format(
"Segments[[%s]] are not covered by locks[[]] for task[%s]",
segmentV10, replaceTask.getId()
),
throwable.getMessage()
);
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
}
@Test
public void testAllocateAppendMonthLockReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
// Verify that replace lock cannot be acquired on DAY as MONTH is already locked
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
}
@Test
public void testLockReplaceAllocateLockReplaceLockReplaceAppend()
{
// Commit initial segments for v1
final ActionsTestTask replaceTask1 = createAndStartTask();
final String v1 = replaceTask1.acquireReplaceLockOn(YEAR_23).getVersion();
final DataSegment segmentV10 = createSegment(YEAR_23, v1);
replaceTask1.commitReplaceSegments(segmentV10);
replaceTask1.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV10);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10);
// Allocate an append segment for v1
final ActionsTestTask appendTask1 = createAndStartTask();
appendTask1.acquireAppendLockOn(YEAR_23);
final SegmentIdWithShardSpec pendingSegmentV11
= appendTask1.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV11.getVersion());
// Commit replace segment for v2
final ActionsTestTask replaceTask2 = createAndStartTask();
final String v2 = replaceTask2.acquireReplaceLockOn(YEAR_23).getVersion();
final DataSegment segmentV20 = DataSegment.builder(segmentV10).version(v2).build();
replaceTask2.commitReplaceSegments(segmentV20);
replaceTask2.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV10, segmentV20);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV20);
final ActionsTestTask replaceTask3 = createAndStartTask();
final String v3 = replaceTask3.acquireReplaceLockOn(YEAR_23).getVersion();
// Commit append segment to v1 and verify that it gets upgraded to v2
final DataSegment segmentV11 = asSegment(pendingSegmentV11);
final DataSegment segmentV21 = DataSegment.builder(segmentV11).version(v2).build();
Set<DataSegment> appendedSegments = appendTask1.commitAppendSegments(segmentV11).getSegments();
Assert.assertEquals(Sets.newHashSet(segmentV21, segmentV11), appendedSegments);
appendTask1.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(
YEAR_23,
segmentV20, segmentV21, segmentV10, segmentV11
);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21);
// Commit replace segment v3 and verify that the append segment gets upgraded to v3
final DataSegment segmentV30 = DataSegment.builder(segmentV20).version(v3).build();
replaceTask3.commitReplaceSegments(segmentV30);
replaceTask3.finishRunAndGetStatus();
final DataSegment segmentV31 = DataSegment.builder(segmentV21).version(v3).build();
verifyIntervalHasUsedSegments(
YEAR_23,
segmentV10, segmentV11, segmentV20, segmentV21, segmentV30, segmentV31
);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV30, segmentV31);
}
@Test
public void testLockReplaceMultipleAppends()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
appendTask.acquireAppendLockOn(FIRST_OF_JAN_23);
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
final ActionsTestTask appendTask2 = createAndStartTask();
appendTask2.acquireAppendLockOn(FIRST_OF_JAN_23);
final SegmentIdWithShardSpec pendingSegment2
= appendTask2.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegment2.getVersion());
final DataSegment segmentV11 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV11);
final DataSegment segmentV12 = asSegment(pendingSegment2);
appendTask2.commitAppendSegments(segmentV12);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11, segmentV12);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11, segmentV12);
}
@Test
public void testMultipleGranularities()
{
// Allocate segment for Jan 1st
appendTask.acquireAppendLockOn(FIRST_OF_JAN_23);
final SegmentIdWithShardSpec pendingSegment01
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment01.getVersion());
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment01.getInterval());
// Allocate segment for Oct-Dec
final ActionsTestTask appendTask2 = createAndStartTask();
appendTask2.acquireAppendLockOn(OCT_NOV_DEC_23);
final SegmentIdWithShardSpec pendingSegment02
= appendTask2.allocateSegmentForTimestamp(OCT_NOV_DEC_23.getStart(), Granularities.QUARTER);
Assert.assertEquals(SEGMENT_V0, pendingSegment02.getVersion());
Assert.assertEquals(OCT_NOV_DEC_23, pendingSegment02.getInterval());
// Append segment for Oct-Dec
final DataSegment segmentV02 = asSegment(pendingSegment02);
appendTask2.commitAppendSegments(segmentV02);
verifyIntervalHasUsedSegments(YEAR_23, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV02);
// Try to Allocate segment for Dec
final ActionsTestTask appendTask3 = createAndStartTask();
appendTask3.acquireAppendLockOn(DEC_23);
final SegmentIdWithShardSpec pendingSegment03
= appendTask3.allocateSegmentForTimestamp(DEC_23.getStart(), Granularities.MONTH);
// Verify that segment gets allocated for quarter instead of month
Assert.assertEquals(SEGMENT_V0, pendingSegment03.getVersion());
Assert.assertEquals(OCT_NOV_DEC_23, pendingSegment03.getInterval());
// Acquire replace lock on whole year
final String v1 = replaceTask.acquireReplaceLockOn(YEAR_23).getVersion();
// Append segment for Jan 1st
final DataSegment segmentV01 = asSegment(pendingSegment01);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01, segmentV02);
// Replace segment for whole year
final DataSegment segmentV10 = createSegment(YEAR_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(YEAR_23)
.shardSpec(new NumberedShardSpec(1, 1))
.build();
// Verify that segmentV01 is upgraded to segmentV11 and segmentV02 is replaced
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11);
// Append segment for quarter
final DataSegment segmentV03 = asSegment(pendingSegment03);
appendTask3.commitAppendSegments(segmentV03);
final DataSegment segmentV13 = DataSegment.builder(segmentV03)
.version(v1)
.interval(YEAR_23)
.shardSpec(new NumberedShardSpec(2, 1))
.build();
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02, segmentV03, segmentV10, segmentV11, segmentV13);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11, segmentV13);
}
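/**
 * Converts the given pending segment into a committable {@link DataSegment} with a dummy load spec.
 */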
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
return new DataSegment(
id,
Collections.singletonMap(id.toString(), id.toString()),
Collections.emptyList(),
Collections.emptyList(),
pendingSegment.getShardSpec(),
null,
0,
0
);
}
private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments);
}
private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments);
}
private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments)
{
try {
Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(interval),
visibility
)
);
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
}
catch (IOException e) {
throw new ISE(e, "Error while fetching used segments in interval[%s]", interval);
}
}
private TaskToolboxFactory createToolboxFactory(
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory
)
{
TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder()
.setConfig(taskConfig)
.setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
.setTaskActionClientFactory(taskActionClientFactory);
return new TestTaskToolboxFactory(builder)
{
@Override
public TaskToolbox build(TaskConfig config, Task task)
{
return createTaskToolbox(config, task);
}
};
}
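/**
 * Creates a dummy segment of the given interval and version for the WIKI datasource.
 */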
private DataSegment createSegment(Interval interval, String version)
{
return DataSegment.builder()
.dataSource(WIKI)
.interval(interval)
.version(version)
.size(100)
.build();
}
private ActionsTestTask createAndStartTask()
{
ActionsTestTask task = new ActionsTestTask(WIKI, "test_" + groupId.incrementAndGet(), taskActionClientFactory);
taskQueue.add(task);
runningTasks.add(task);
return task;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
@ -156,6 +157,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return Collections.emptyMap();
}
@Override
public SegmentPublishResult commitReplaceSegments(
Set<DataSegment> replaceSegments,
Set<ReplaceTaskLock> locksHeldByReplaceTask
)
{
return SegmentPublishResult.ok(commitSegments(replaceSegments));
}
@Override
public SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
}
@Override
public SegmentPublishResult commitSegmentsAndMetadata(
Set<DataSegment> segments,

View File

@ -77,6 +77,8 @@ public interface MetadataStorageConnector
void createSegmentTable();
void createUpgradeSegmentsTable();
void createRulesTable();
void createConfigTable();

View File

@ -34,7 +34,7 @@ public class MetadataStorageTablesConfig
public static MetadataStorageTablesConfig fromBase(String base)
{
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null, null);
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null, null, null);
}
public static final String TASK_ENTRY_TYPE = "task";
@ -57,6 +57,9 @@ public class MetadataStorageTablesConfig
@JsonProperty("segments")
private final String segmentsTable;
@JsonProperty("upgradeSegments")
private final String upgradeSegmentsTable;
@JsonProperty("rules")
private final String rulesTable;
@ -90,13 +93,15 @@ public class MetadataStorageTablesConfig
@JsonProperty("taskLog") String taskLogTable,
@JsonProperty("taskLock") String taskLockTable,
@JsonProperty("audit") String auditTable,
@JsonProperty("supervisors") String supervisorTable
@JsonProperty("supervisors") String supervisorTable,
@JsonProperty("upgradeSegments") String upgradeSegmentsTable
)
{
this.base = (base == null) ? DEFAULT_BASE : base;
this.dataSourceTable = makeTableName(dataSourceTable, "dataSource");
this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.upgradeSegmentsTable = makeTableName(upgradeSegmentsTable, "upgradeSegments");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");
@ -142,6 +147,11 @@ public class MetadataStorageTablesConfig
return segmentsTable;
}
public String getUpgradeSegmentsTable()
{
return upgradeSegmentsTable;
}
public String getRulesTable()
{
return rulesTable;

View File

@ -80,5 +80,6 @@ public class MetadataStorageTablesConfigTest
);
Assert.assertEquals(props.getProperty("druid.metadata.storage.tables.dataSource"), config.getDataSourceTable());
Assert.assertEquals(props.getProperty("druid.metadata.storage.tables.supervisors"), config.getSupervisorTable());
Assert.assertEquals(props.getProperty("druid.metadata.storage.tables.upgradeSegments"), config.getUpgradeSegmentsTable());
}
}

View File

@ -54,6 +54,12 @@ public class TestMetadataStorageConnector implements MetadataStorageConnector
throw new UnsupportedOperationException();
}
@Override
public void createUpgradeSegmentsTable()
{
throw new UnsupportedOperationException();
}
@Override
public void createRulesTable()
{

View File

@ -37,6 +37,7 @@ public class TestMetadataStorageTablesConfig extends MetadataStorageTablesConfig
null,
null,
null,
null,
null
);
}

View File

@ -27,5 +27,6 @@ druid.metadata.storage.tables.taskLock=fff_tasklock
druid.metadata.storage.tables.audit=ggg_audit
druid.metadata.storage.tables.dataSource=hhh_dataSource
druid.metadata.storage.tables.supervisors=iii_supervisors
druid.metadata.storage.tables.upgradeSegments=jjj_upgradeSegments
druid.query.segmentMetadata.defaultAnalysisTypes=["cardinality", "size"]
druid.query.segmentMetadata.defaultHistory=P2W

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
@ -254,7 +255,7 @@ public interface IndexerMetadataStorageCoordinator
* commit metadata.
*
* If segmentsToDrop is not null and not empty, this insertion will be atomic with an insert-and-drop on inserting
* {@param segments} and dropping {@param segmentsToDrop}
* {@param segments} and dropping {@param segmentsToDrop}.
*
* @param segments set of segments to add, must all be from the same dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
@ -277,6 +278,47 @@ public interface IndexerMetadataStorageCoordinator
@Nullable DataSourceMetadata endMetadata
) throws IOException;
/**
* Commits segments created by an APPEND task. This method also handles segment
* upgrade scenarios that may result from concurrent append and replace.
* <ul>
* <li>If a REPLACE task committed a segment that overlaps with any of the
* appendSegments while this APPEND task was in progress, the appendSegments
* are upgraded to the version of the replace segment.</li>
* <li>If an appendSegment is covered by a currently active REPLACE lock, then
* an entry is created for it in the upgrade_segments table, so that when the
* REPLACE task finishes, it can upgrade the appendSegment as required.</li>
* </ul>
*
* @param appendSegments All segments created by an APPEND task that
* must be committed in a single transaction.
* @param appendSegmentToReplaceLock Map from append segment to the currently
* active REPLACE lock (if any) covering it
*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
);
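// Illustrative call (a sketch; in this change the call is made via the new transactional append task action):
//   SegmentPublishResult result = coordinator.commitAppendSegments(
//       appendSegments,
//       appendSegmentToReplaceLock   // may be empty if no active REPLACE lock covers the segments
//   );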
/**
* Commits segments created by a REPLACE task. This method also handles the
* segment upgrade scenarios that may result from concurrent append and replace.
* <ul>
* <li>If an APPEND task committed a segment to an interval locked by this task,
* the append segment is upgraded to the version of the corresponding lock.
* This is done with the help of entries created in the upgrade_segments table
* in {@link #commitAppendSegments}</li>
* </ul>
*
* @param replaceSegments All segments created by a REPLACE task that
* must be committed in a single transaction.
* @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task
*/
SegmentPublishResult commitReplaceSegments(
Set<DataSegment> replaceSegments,
Set<ReplaceTaskLock> locksHeldByReplaceTask
);
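// Illustrative call (a sketch; in this change the call is made via the new transactional replace task action):
//   SegmentPublishResult result = coordinator.commitReplaceSegments(replaceSegments, locksHeldByReplaceTask);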
/**
* Retrieves data source's metadata from the metadata store. Returns null if there is no metadata.
*/

View File

@ -55,9 +55,12 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -89,6 +92,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -123,6 +127,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
connector.createDataSourceTable();
connector.createPendingSegmentsTable();
connector.createSegmentTable();
connector.createUpgradeSegmentsTable();
}
@Override
@ -399,6 +404,69 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
@Override
public SegmentPublishResult commitReplaceSegments(
final Set<DataSegment> replaceSegments,
final Set<ReplaceTaskLock> locksHeldByReplaceTask
)
{
verifySegmentsToCommit(replaceSegments);
try {
return connector.retryTransaction(
(handle, transactionStatus) -> {
final Set<DataSegment> segmentsToInsert = new HashSet<>(replaceSegments);
segmentsToInsert.addAll(
getSegmentsToUpgradeOnReplace(handle, replaceSegments, locksHeldByReplaceTask)
);
return SegmentPublishResult.ok(
insertSegments(handle, segmentsToInsert)
);
},
3,
getSqlMetadataMaxRetry()
);
}
catch (CallbackFailedException e) {
return SegmentPublishResult.fail(e.getMessage());
}
}
@Override
public SegmentPublishResult commitAppendSegments(
final Set<DataSegment> appendSegments,
final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
)
{
verifySegmentsToCommit(appendSegments);
final String dataSource = appendSegments.iterator().next().getDataSource();
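// Find any REPLACE versions already committed over these append segments and
// compute the corresponding upgraded copies that must be committed alongside them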
final Set<DataSegment> upgradedSegments = connector.retryTransaction(
(handle, transactionStatus)
-> getSegmentsToUpgradeOnAppend(handle, dataSource, appendSegments),
0,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
// Create entries for all required versions of the append segments
final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
allSegmentsToInsert.addAll(upgradedSegments);
try {
return connector.retryTransaction(
(handle, transactionStatus) -> {
insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
return SegmentPublishResult.ok(insertSegments(handle, allSegmentsToInsert));
},
3,
getSqlMetadataMaxRetry()
);
}
catch (CallbackFailedException e) {
return SegmentPublishResult.fail(e.getMessage());
}
}
@Override
public SegmentPublishResult commitMetadataOnly(
String dataSource,
@ -977,6 +1045,175 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.execute();
}
/**
* Computes and returns the higher-version copies of the given append segments
* that need to be committed along with them.
* <p>
* This is typically needed when a REPLACE task started and finished after
* these append segments had already been allocated. As such,
* there would be some used segments in the DB with versions higher than these
* append segments.
*/
private Set<DataSegment> getSegmentsToUpgradeOnAppend(
Handle handle,
String dataSource,
Set<DataSegment> segmentsToAppend
) throws IOException
{
if (segmentsToAppend.isEmpty()) {
return Collections.emptySet();
}
final Set<Interval> appendIntervals = new HashSet<>();
final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
for (DataSegment segment : segmentsToAppend) {
appendIntervals.add(segment.getInterval());
appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
.add(segment);
}
// Fetch all used segments that overlap with any of the append intervals
final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
dataSource,
new ArrayList<>(appendIntervals),
Segments.INCLUDING_OVERSHADOWED
);
final Map<String, Set<Interval>> committedVersionToIntervals = new HashMap<>();
final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new HashMap<>();
for (DataSegment segment : overlappingSegments) {
committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
.add(segment.getInterval());
committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
.add(segment);
}
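// For each committed version, upgrade the append segments that have a strictly
// lower version and whose intervals are contained in intervals committed at that version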
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
final String upgradeVersion = entry.getKey();
Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
upgradeVersion,
entry.getValue(),
appendVersionToSegments
);
for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
Set<DataSegment> segmentsUpgradedToVersion = upgradeSegmentsToVersion(
handle,
upgradeVersion,
upgradeEntry.getKey(),
upgradeEntry.getValue(),
committedIntervalToSegments
);
log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
upgradedSegments.addAll(segmentsUpgradedToVersion);
}
}
return upgradedSegments;
}
/**
* Creates a Map from eligible interval to Set of segments that are fully
* contained in that interval and have a version strictly lower than {@code cutoffVersion}.
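* <p>
* For example (illustrative): with {@code cutoffVersion = "v2"}, an append segment at
* version "v1" whose interval lies inside an eligible interval is included under that
* interval, whereas segments already at "v2" or higher are skipped.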
*/
private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
String cutoffVersion,
Set<Interval> eligibleIntervals,
TreeMap<String, Set<DataSegment>> versionToSegments
)
{
final Set<DataSegment> eligibleSegments
= versionToSegments.headMap(cutoffVersion).values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());
final Map<Interval, Set<DataSegment>> eligibleIntervalToSegments = new HashMap<>();
for (DataSegment segment : eligibleSegments) {
final Interval segmentInterval = segment.getInterval();
for (Interval eligibleInterval : eligibleIntervals) {
if (eligibleInterval.contains(segmentInterval)) {
eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet<>())
.add(segment);
break;
} else if (eligibleInterval.overlaps(segmentInterval)) {
// Committed interval overlaps only partially
throw new ISE(
"Committed interval[%s] conflicts with interval[%s] of append segment[%s].",
eligibleInterval, segmentInterval, segment.getId()
);
}
}
}
return eligibleIntervalToSegments;
}
/**
* Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
* to the given {@code upgradeVersion}.
*/
private Set<DataSegment> upgradeSegmentsToVersion(
Handle handle,
String upgradeVersion,
Interval interval,
Set<DataSegment> segmentsToUpgrade,
Map<Interval, Set<DataSegment>> committedSegmentsByInterval
) throws IOException
{
final Set<DataSegment> committedSegments
= committedSegmentsByInterval.getOrDefault(interval, Collections.emptySet())
.stream()
.filter(s -> s.getVersion().equals(upgradeVersion))
.collect(Collectors.toSet());
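// Find the committed segment with the highest partition number for this version,
// so that partition numbers of the upgraded segments continue from it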
SegmentIdWithShardSpec committedMaxId = null;
for (DataSegment committedSegment : committedSegments) {
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < committedSegment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment);
}
}
// Get pending segments for the new version, if any
final String dataSource = segmentsToUpgrade.iterator().next().getDataSource();
final Set<SegmentIdWithShardSpec> pendingSegments
= getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
// Determine new IDs for each append segment by taking into account both
// committed and pending segments for this version
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (DataSegment segment : segmentsToUpgrade) {
SegmentCreateRequest request = new SegmentCreateRequest(
segment.getId() + "__" + upgradeVersion,
null,
upgradeVersion,
NumberedPartialShardSpec.instance()
);
// allocate new segment id
final SegmentIdWithShardSpec newId = createNewSegment(
request,
dataSource,
interval,
upgradeVersion,
committedMaxId,
pendingSegments
);
// Add to set of pending segments so that shard specs are computed taking the new id into account
pendingSegments.add(newId);
upgradedSegments.add(
DataSegment.builder(segment)
.interval(newId.getInterval())
.version(newId.getVersion())
.shardSpec(newId.getShardSpec())
.build()
);
}
return upgradedSegments;
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
Handle handle,
String dataSource,
@ -1138,8 +1375,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!overallMaxId.getInterval().equals(interval)
|| overallMaxId.getVersion().compareTo(existingVersion) > 0) {
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
@ -1297,8 +1534,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!overallMaxId.getInterval().equals(interval)
|| overallMaxId.getVersion().compareTo(existingVersion) > 0) {
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
@ -1366,12 +1602,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
/**
* Attempts to insert a single segment to the database. If the segment already exists, will do nothing; although,
* this checking is imperfect and callers must be prepared to retry their entire transaction on exceptions.
*
* @return DataSegment set inserted
*/
private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
final Set<DataSegment> segments,
@ -1396,18 +1626,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload, used_status_last_updated) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload, :used_status_last_updated)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
);
PreparedBatch preparedBatch = handle.prepareBatch(buildSqlToInsertSegments());
for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
String now = DateTimes.nowUtc().toString();
final String now = DateTimes.nowUtc().toString();
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
@ -1444,6 +1666,296 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return toInsertSegments;
}
private Set<DataSegment> getSegmentsToUpgradeOnReplace(
final Handle handle,
final Set<DataSegment> replaceSegments,
final Set<ReplaceTaskLock> locksHeldByReplaceTask
)
{
// If a REPLACE task has locked an interval, it would commit some segments
// (or at least tombstones) in that interval (except in LEGACY_REPLACE ingestion mode)
if (replaceSegments.isEmpty() || locksHeldByReplaceTask.isEmpty()) {
return Collections.emptySet();
}
// For each replace interval, find the number of core partitions and total partitions
final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
final Map<Interval, Integer> intervalToCurrentPartitionNum = new HashMap<>();
for (DataSegment segment : replaceSegments) {
intervalToNumCorePartitions.put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
int partitionNum = segment.getShardSpec().getPartitionNum();
intervalToCurrentPartitionNum.compute(
segment.getInterval(),
(i, value) -> value == null ? partitionNum : Math.max(value, partitionNum)
);
}
// Find the segments that need to be upgraded
final String taskId = locksHeldByReplaceTask.stream()
.map(ReplaceTaskLock::getSupervisorTaskId)
.findFirst().orElse(null);
final Map<String, String> upgradeSegmentToLockVersion
= getAppendSegmentsCommittedDuringTask(handle, taskId);
final List<DataSegment> segmentsToUpgrade
= retrieveSegmentsById(handle, upgradeSegmentToLockVersion.keySet());
if (segmentsToUpgrade.isEmpty()) {
return Collections.emptySet();
}
final Set<Interval> replaceIntervals = intervalToNumCorePartitions.keySet();
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (DataSegment oldSegment : segmentsToUpgrade) {
// Determine interval of the upgraded segment
final Interval oldInterval = oldSegment.getInterval();
Interval newInterval = null;
for (Interval replaceInterval : replaceIntervals) {
if (replaceInterval.contains(oldInterval)) {
newInterval = replaceInterval;
break;
} else if (replaceInterval.overlaps(oldInterval)) {
throw new ISE(
"Incompatible segment intervals for commit: [%s] and [%s].",
oldInterval, replaceInterval
);
}
}
if (newInterval == null) {
// This can happen only if no replace interval contains this segment
// but a (revoked) REPLACE lock covers this segment
newInterval = oldInterval;
}
// Compute shard spec of the upgraded segment
final int partitionNum = intervalToCurrentPartitionNum.compute(
newInterval,
(i, value) -> value == null ? 0 : value + 1
);
final int numCorePartitions = intervalToNumCorePartitions.get(newInterval);
ShardSpec shardSpec = new NumberedShardSpec(partitionNum, numCorePartitions);
// Create upgraded segment with the correct interval, version and shard spec
String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
upgradedSegments.add(
DataSegment.builder(oldSegment)
.interval(newInterval)
.version(lockVersion)
.shardSpec(shardSpec)
.build()
);
}
return upgradedSegments;
}
/**
* Verifies that:
* <ul>
* <li>The set of segments being committed is non-empty.</li>
* <li>All segments belong to the same datasource.</li>
* </ul>
*/
private void verifySegmentsToCommit(Collection<DataSegment> segments)
{
if (segments.isEmpty()) {
throw new IllegalArgumentException("No segment to commit");
}
final String dataSource = segments.iterator().next().getDataSource();
for (DataSegment segment : segments) {
if (!dataSource.equals(segment.getDataSource())) {
throw new IllegalArgumentException("Segments to commit must all belong to the same datasource");
}
}
}
/**
* Inserts the given segments into the DB in batches of size
* {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE} and returns the set of
* segments actually inserted.
* <p>
* This method avoids inserting segment IDs which already exist in the DB.
* Callers of this method might need to retry, as the existence check (SELECT)
* followed by the INSERT can still fail due to races with concurrent commits.
*/
private Set<DataSegment> insertSegments(Handle handle, Set<DataSegment> segments)
throws IOException
{
// Do not insert segment IDs which already exist
Set<String> existingSegmentIds = segmentExistsBatch(handle, segments);
final Set<DataSegment> segmentsToInsert = segments.stream().filter(
s -> !existingSegmentIds.contains(s.getId().toString())
).collect(Collectors.toSet());
// Insert the segments in batches of manageable size
final List<List<DataSegment>> partitionedSegments = Lists.partition(
new ArrayList<>(segmentsToInsert),
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
final PreparedBatch batch = handle.prepareBatch(buildSqlToInsertSegments());
for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
final String now = DateTimes.nowUtc().toString();
batch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", now)
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", now);
}
final int[] affectedRows = batch.execute();
final List<DataSegment> failedInserts = new ArrayList<>();
for (int i = 0; i < partition.size(); ++i) {
if (affectedRows[i] != 1) {
failedInserts.add(partition.get(i));
}
}
if (failedInserts.isEmpty()) {
log.infoSegments(partition, "Published segments to DB");
} else {
throw new ISE(
"Failed to publish segments to DB: %s",
SegmentUtils.commaSeparatedIdentifiers(failedInserts)
);
}
}
return segmentsToInsert;
}
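/*
 * Caller-side sketch (illustrative, not part of this change): because the existence check and the
 * INSERT above can race with concurrent writers, callers typically wrap the whole commit in a
 * retried transaction. Assuming the connector's retryTransaction helper, that might look roughly like:
 *
 *   connector.retryTransaction(
 *       (handle, transactionStatus) -> insertSegments(handle, segments),
 *       quietTries,
 *       maxTries
 *   );
 *
 * The exact helper, parameter names and retry policy here are assumptions for illustration only.
 */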
/**
* Inserts entries into the upgrade_segments table in batches of size
* {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}.
*/
private void insertIntoUpgradeSegmentsTable(
Handle handle,
Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
)
{
if (segmentToReplaceLock.isEmpty()) {
return;
}
final PreparedBatch batch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (task_id, segment_id, lock_version)"
+ " VALUES (:task_id, :segment_id, :lock_version)",
dbTables.getUpgradeSegmentsTable()
)
);
final List<List<Map.Entry<DataSegment, ReplaceTaskLock>>> partitions = Lists.partition(
new ArrayList<>(segmentToReplaceLock.entrySet()),
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
for (List<Map.Entry<DataSegment, ReplaceTaskLock>> partition : partitions) {
for (Map.Entry<DataSegment, ReplaceTaskLock> entry : partition) {
DataSegment segment = entry.getKey();
ReplaceTaskLock lock = entry.getValue();
batch.add()
.bind("task_id", lock.getSupervisorTaskId())
.bind("segment_id", segment.getId().toString())
.bind("lock_version", lock.getVersion());
}
final int[] affectedAppendRows = batch.execute();
final List<DataSegment> failedInserts = new ArrayList<>();
for (int i = 0; i < partition.size(); ++i) {
if (affectedAppendRows[i] != 1) {
failedInserts.add(partition.get(i).getKey());
}
}
if (!failedInserts.isEmpty()) {
throw new ISE(
"Failed to insert upgrade segments in DB: %s",
SegmentUtils.commaSeparatedIdentifiers(failedInserts)
);
}
}
}
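/*
 * For illustration (hypothetical IDs), committing two append segments under a REPLACE lock held by
 * task "replace_wiki_abc" with lock version 2024-01-01 would add rows like:
 *
 *   task_id          | segment_id                              | lock_version
 *   replace_wiki_abc | wiki_2023-01-01_2023-01-02_2023-01-01_1 | 2024-01-01
 *   replace_wiki_abc | wiki_2023-01-02_2023-01-03_2023-01-02_1 | 2024-01-01
 */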
private List<DataSegment> retrieveSegmentsById(Handle handle, Set<String> segmentIds)
{
if (segmentIds.isEmpty()) {
return Collections.emptyList();
}
final String segmentIdCsv = segmentIds.stream()
.map(id -> "'" + id + "'")
.collect(Collectors.joining(","));
ResultIterator<DataSegment> resultIterator = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE id in (%s)",
dbTables.getSegmentsTable(), segmentIdCsv
)
)
.setFetchSize(connector.getStreamingFetchSize())
.map(
(index, r, ctx) ->
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class)
)
.iterator();
return Lists.newArrayList(resultIterator);
}
private String buildSqlToInsertSegments()
{
return StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,"
+ " partitioned, version, used, payload, used_status_last_updated) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end,"
+ " :partitioned, :version, :used, :payload, :used_status_last_updated)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
);
}
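/*
 * For example, with a segments table named "druid_segments" and a double-quote identifier quote
 * string (both assumptions for illustration), the statement above expands to:
 *
 *   INSERT INTO druid_segments (id, dataSource, created_date, start, "end",
 *     partitioned, version, used, payload, used_status_last_updated)
 *   VALUES (:id, :dataSource, :created_date, :start, :end,
 *     :partitioned, :version, :used, :payload, :used_status_last_updated)
 *
 * The "end" column is quoted because END is a reserved word in some SQL dialects.
 */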
/**
* Finds the append segments that were covered by the given task REPLACE locks.
* These append segments must now be upgraded to the same version as the segments
* being committed by this replace task.
*
* @return Map from append Segment ID to REPLACE lock version
*/
private Map<String, String> getAppendSegmentsCommittedDuringTask(
Handle handle,
String taskId
)
{
final String sql = StringUtils.format(
"SELECT segment_id, lock_version FROM %1$s WHERE task_id = :task_id",
dbTables.getUpgradeSegmentsTable()
);
ResultIterator<Pair<String, String>> resultIterator = handle
.createQuery(sql)
.bind("task_id", taskId)
.map(
(index, r, ctx) -> Pair.of(r.getString("segment_id"), r.getString("lock_version"))
)
.iterator();
final Map<String, String> segmentIdToLockVersion = new HashMap<>();
while (resultIterator.hasNext()) {
Pair<String, String> result = resultIterator.next();
segmentIdToLockVersion.put(result.lhs, result.rhs);
}
return segmentIdToLockVersion;
}
private Set<String> segmentExistsBatch(final Handle handle, final Set<DataSegment> segments)
{
Set<String> existedSegments = new HashSet<>();
@ -1512,8 +2024,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
*
* @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
* TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
* {@link #commitSegmentsAndMetadata(Set, DataSourceMetadata, DataSourceMetadata)}
* achieve its own guarantee.
* {@link #commitSegmentsAndMetadata} achieve its own guarantee.
*
* @throws RuntimeException if state is unknown after this call
*/

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import org.joda.time.Interval;
import java.util.Objects;
/**
* Details of a REPLACE lock held by a batch supervisor task.
* <p>
* Replace locks are always held by the supervisor task, i.e. ParallelIndexSupervisorTask
* in case of native batch ingestion and ControllerTask in case of MSQ ingestion.
*/
public class ReplaceTaskLock
{
private final String supervisorTaskId;
private final Interval interval;
private final String version;
public ReplaceTaskLock(String supervisorTaskId, Interval interval, String version)
{
this.supervisorTaskId = supervisorTaskId;
this.interval = interval;
this.version = version;
}
public String getSupervisorTaskId()
{
return supervisorTaskId;
}
public Interval getInterval()
{
return interval;
}
public String getVersion()
{
return version;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReplaceTaskLock that = (ReplaceTaskLock) o;
return Objects.equals(supervisorTaskId, that.supervisorTaskId)
&& Objects.equals(interval, that.interval)
&& Objects.equals(version, that.version);
}
@Override
public int hashCode()
{
return Objects.hash(supervisorTaskId, interval, version);
}
}
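/*
 * Usage sketch (hypothetical values): a replace supervisor task holds one such lock per locked
 * interval, e.g.
 *
 *   ReplaceTaskLock lock = new ReplaceTaskLock(
 *       "index_parallel_wiki_abc",
 *       Intervals.of("2023-01-01/2023-02-01"),
 *       "2024-01-01"
 *   );
 *
 * Append segments committed while this lock is held are later upgraded to lock.getVersion()
 * over lock.getInterval().
 */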

View File

@ -105,13 +105,18 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
}
/**
* Auto-incrementing SQL type to use for IDs
* Must be an integer type, which values will be automatically set by the database
* <p/>
* The resulting string will be interpolated into the table creation statement, e.g.
* <code>CREATE TABLE druid_table ( id <type> NOT NULL, ... )</code>
* Auto-incrementing integer SQL type to use for IDs.
* The returned string is interpolated into the table creation statement as follows:
* <pre>
* CREATE TABLE druid_table (
* id &lt;serial-type&gt; NOT NULL,
* col_2 VARCHAR(255) NOT NULL,
* col_3 VARCHAR(255) NOT NULL
* ...
* )
* </pre>
*
* @return String representing the SQL type and auto-increment statement
* @return String representing auto-incrementing SQL integer type to use for IDs.
*/
public abstract String getSerialType();
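/*
 * For illustration only, a dialect-specific connector might implement this as something like
 * "BIGSERIAL" (PostgreSQL-style) or "BIGINT NOT NULL AUTO_INCREMENT" (MySQL-style); the exact
 * strings depend on the concrete connector and are assumptions here, not part of this change.
 */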
@ -335,6 +340,29 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
);
}
private void createUpgradeSegmentsTable(final String tableName)
{
createTable(
tableName,
ImmutableList.of(
StringUtils.format(
"CREATE TABLE %1$s (\n"
+ " id %2$s NOT NULL,\n"
+ " task_id VARCHAR(255) NOT NULL,\n"
+ " segment_id VARCHAR(255) NOT NULL,\n"
+ " lock_version VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
tableName, getSerialType()
),
StringUtils.format(
"CREATE INDEX idx_%1$s_task ON %1$s(task_id)",
tableName
)
)
);
}
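/*
 * For example, assuming the default base name "druid" and a serial type of BIGSERIAL
 * (both assumptions for illustration), the statements above expand to roughly:
 *
 *   CREATE TABLE druid_upgradeSegments (
 *     id BIGSERIAL NOT NULL,
 *     task_id VARCHAR(255) NOT NULL,
 *     segment_id VARCHAR(255) NOT NULL,
 *     lock_version VARCHAR(255) NOT NULL,
 *     PRIMARY KEY (id)
 *   );
 *   CREATE INDEX idx_druid_upgradeSegments_task ON druid_upgradeSegments(task_id);
 */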
public void createRulesTable(final String tableName)
{
createTable(
@ -664,6 +692,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
validateSegmentsTable();
}
@Override
public void createUpgradeSegmentsTable()
{
if (config.get().isCreateTables()) {
createUpgradeSegmentsTable(tablesConfigSupplier.get().getUpgradeSegmentsTable());
}
}
@Override
public void createRulesTable()
{

View File

@ -17,7 +17,6 @@
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -25,7 +24,6 @@ import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManagerProvider
{
private final ObjectMapper jsonMapper;
@ -60,6 +58,7 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
public void start()
{
connector.createSegmentTable();
connector.createUpgradeSegmentsTable();
}
@Override

View File

@ -33,7 +33,9 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@ -48,6 +50,7 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
@ -57,9 +60,9 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
@ -68,8 +71,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -81,9 +86,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
private final DataSegment defaultSegment = new DataSegment(
@ -330,6 +332,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
derbyConnector.createDataSourceTable();
derbyConnector.createTaskTables();
derbyConnector.createSegmentTable();
derbyConnector.createUpgradeSegmentsTable();
derbyConnector.createPendingSegmentsTable();
metadataUpdateCounter.set(0);
segmentTableDropUpdateCounter.set(0);
@ -403,6 +406,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
private List<DataSegment> retrieveUsedSegments()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
return derbyConnector.retryWithHandle(
handle -> handle.createQuery("SELECT payload FROM " + table + " WHERE used = true ORDER BY id")
.map((index, result, context) -> JacksonUtils.readValue(mapper, result.getBytes(1), DataSegment.class))
.list()
);
}
private List<String> retrieveUnusedSegmentIds()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@ -451,6 +464,208 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
private Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskId)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
return derbyConnector.retryWithHandle(handle -> {
final String sql = StringUtils.format(
"SELECT segment_id, lock_version FROM %1$s WHERE task_id = :task_id",
table
);
ResultIterator<Pair<String, String>> resultIterator = handle
.createQuery(sql)
.bind("task_id", taskId)
.map(
(index, r, ctx) -> Pair.of(r.getString("segment_id"), r.getString("lock_version"))
)
.iterator();
final Map<String, String> segmentIdToLockVersion = new HashMap<>();
while (resultIterator.hasNext()) {
Pair<String, String> result = resultIterator.next();
segmentIdToLockVersion.put(result.lhs, result.rhs);
}
return segmentIdToLockVersion;
});
}
private void insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> segmentToTaskLockMap)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
derbyConnector.retryWithHandle(
handle -> {
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (task_id, segment_id, lock_version) "
+ "VALUES (:task_id, :segment_id, :lock_version)",
table
)
);
for (Map.Entry<DataSegment, ReplaceTaskLock> entry : segmentToTaskLockMap.entrySet()) {
final DataSegment segment = entry.getKey();
final ReplaceTaskLock lock = entry.getValue();
preparedBatch.add()
.bind("task_id", lock.getSupervisorTaskId())
.bind("segment_id", segment.getId().toString())
.bind("lock_version", lock.getVersion());
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (!succeeded) {
throw new ISE("Failed to insert upgrade segments in DB");
}
return true;
}
);
}
@Test
public void testCommitAppendSegments()
{
final String v1 = "2023-01-01";
final String v2 = "2023-01-02";
final String v3 = "2023-01-03";
final String lockVersion = "2024-01-01";
final String replaceTaskId = "replaceTask1";
final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
replaceTaskId,
Intervals.of("2023-01-01/2023-01-03"),
lockVersion
);
final Set<DataSegment> appendSegments = new HashSet<>();
final Set<DataSegment> expectedSegmentsToUpgrade = new HashSet<>();
for (int i = 0; i < 10; i++) {
final DataSegment segment = createSegment(
Intervals.of("2023-01-01/2023-01-02"),
v1,
new LinearShardSpec(i)
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
}
for (int i = 0; i < 10; i++) {
final DataSegment segment = createSegment(
Intervals.of("2023-01-02/2023-01-03"),
v2,
new LinearShardSpec(i)
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
}
for (int i = 0; i < 10; i++) {
final DataSegment segment = createSegment(
Intervals.of("2023-01-03/2023-01-04"),
v3,
new LinearShardSpec(i)
);
appendSegments.add(segment);
}
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= expectedSegmentsToUpgrade.stream()
.collect(Collectors.toMap(s -> s, s -> replaceLock));
// Commit the segment and verify the results
SegmentPublishResult commitResult
= coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock);
Assert.assertTrue(commitResult.isSuccess());
Assert.assertEquals(appendSegments, commitResult.getSegments());
// Verify the segments present in the metadata store
Assert.assertEquals(
appendSegments,
ImmutableSet.copyOf(retrieveUsedSegments())
);
// Verify entries in the segment task lock table
final Set<String> expectedUpgradeSegmentIds
= expectedSegmentsToUpgrade.stream()
.map(s -> s.getId().toString())
.collect(Collectors.toSet());
final Map<String, String> observedSegmentToLock = getSegmentsCommittedDuringReplaceTask(replaceTaskId);
Assert.assertEquals(expectedUpgradeSegmentIds, observedSegmentToLock.keySet());
final Set<String> observedLockVersions = new HashSet<>(observedSegmentToLock.values());
Assert.assertEquals(1, observedLockVersions.size());
Assert.assertEquals(replaceLock.getVersion(), Iterables.getOnlyElement(observedLockVersions));
}
@Test
public void testCommitReplaceSegments()
{
final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>();
for (int i = 1; i < 9; i++) {
final DataSegment segment = new DataSegment(
"foo",
Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
"2023-01-0" + i,
ImmutableMap.of("path", "a-" + i),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
segmentsAppendedWithReplaceLock.add(segment);
appendedSegmentToReplaceLockMap.put(segment, replaceLock);
}
insertUsedSegments(segmentsAppendedWithReplaceLock);
insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap);
final Set<DataSegment> replacingSegments = new HashSet<>();
for (int i = 1; i < 9; i++) {
final DataSegment segment = new DataSegment(
"foo",
Intervals.of("2023-01-01/2023-02-01"),
"2023-02-01",
ImmutableMap.of("path", "b-" + i),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new NumberedShardSpec(i, 9),
9,
100
);
replacingSegments.add(segment);
}
coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock));
Assert.assertEquals(
2L * segmentsAppendedWithReplaceLock.size() + replacingSegments.size(),
retrieveUsedSegmentIds().size()
);
final Set<DataSegment> usedSegments = new HashSet<>(retrieveUsedSegments());
Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
usedSegments.removeAll(segmentsAppendedWithReplaceLock);
Assert.assertTrue(usedSegments.containsAll(replacingSegments));
usedSegments.removeAll(replacingSegments);
Assert.assertEquals(segmentsAppendedWithReplaceLock.size(), usedSegments.size());
for (DataSegment segmentReplicaWithNewVersion : usedSegments) {
boolean hasBeenCarriedForward = false;
for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
if (appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec())) {
hasBeenCarriedForward = true;
break;
}
}
Assert.assertTrue(hasBeenCarriedForward);
}
}
@Test
public void testSimpleAnnounce() throws IOException
{
@ -2338,4 +2553,20 @@ public class IndexerSQLMetadataStorageCoordinatorTest
)
);
}
private static class DS
{
static final String WIKI = "wiki";
}
private DataSegment createSegment(Interval interval, String version, ShardSpec shardSpec)
{
return DataSegment.builder()
.dataSource(DS.WIKI)
.interval(interval)
.version(version)
.shardSpec(shardSpec)
.size(100)
.build();
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class ReplaceTaskLockTest
{
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(ReplaceTaskLock.class)
.usingGetClass()
.withNonnullFields("supervisorTaskId", "interval", "version")
.verify();
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.util.Locale;
public class SqlSegmentsMetadataManagerProviderTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
= new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
@Test
public void testLifecycleStartCreatesSegmentTables() throws Exception
{
final TestDerbyConnector connector = derbyConnectorRule.getConnector();
final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
final Lifecycle lifecycle = new Lifecycle();
SqlSegmentsMetadataManagerProvider provider = new SqlSegmentsMetadataManagerProvider(
jsonMapper,
Suppliers.ofInstance(config),
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
lifecycle
);
SegmentsMetadataManager manager = provider.get();
Assert.assertTrue(manager instanceof SqlSegmentsMetadataManager);
final MetadataStorageTablesConfig storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
final String segmentsTable = storageConfig.getSegmentsTable();
final String upgradeSegmentsTable = storageConfig.getUpgradeSegmentsTable();
// Verify that the tables do not exist yet
Assert.assertFalse(tableExists(segmentsTable, connector));
Assert.assertFalse(tableExists(upgradeSegmentsTable, connector));
lifecycle.start();
// Verify that tables have now been created
Assert.assertTrue(tableExists(segmentsTable, connector));
Assert.assertTrue(tableExists(upgradeSegmentsTable, connector));
lifecycle.stop();
}
private boolean tableExists(String tableName, TestDerbyConnector connector)
{
return connector.retryWithHandle(
handle -> connector.tableExists(handle, tableName.toUpperCase(Locale.ENGLISH))
);
}
}

View File

@ -222,6 +222,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -124,6 +124,7 @@ public class CreateTables extends GuiceRunnable
dbConnector.createDataSourceTable();
dbConnector.createPendingSegmentsTable();
dbConnector.createSegmentTable();
dbConnector.createUpgradeSegmentsTable();
dbConnector.createRulesTable();
dbConnector.createConfigTable();
dbConnector.createTaskTables();

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.inject.Injector;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Locale;
public class CreateTablesTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
= new TestDerbyConnector.DerbyConnectorRule();
private TestDerbyConnector connector;
@Before
public void setup()
{
this.connector = derbyConnectorRule.getConnector();
}
@Test
public void testRunCreatesAllTables()
{
final MetadataStorageTablesConfig config = derbyConnectorRule.metadataTablesConfigSupplier().get();
Assert.assertNotNull(config);
// Verify that tables do not exist before starting
Assert.assertFalse(tableExists(config.getDataSourceTable()));
Assert.assertFalse(tableExists(config.getSegmentsTable()));
Assert.assertFalse(tableExists(config.getPendingSegmentsTable()));
Assert.assertFalse(tableExists(config.getUpgradeSegmentsTable()));
Assert.assertFalse(tableExists(config.getConfigTable()));
Assert.assertFalse(tableExists(config.getRulesTable()));
Assert.assertFalse(tableExists(config.getAuditTable()));
Assert.assertFalse(tableExists(config.getSupervisorTable()));
Assert.assertFalse(tableExists(config.getTaskLockTable()));
// Run CreateTables
CreateTables createTables = new CreateTables()
{
@Override
public Injector makeInjector()
{
Injector injector = Mockito.mock(Injector.class);
Mockito.when(injector.getInstance(MetadataStorageConnector.class)).thenReturn(connector);
return injector;
}
};
createTables.run();
// Verify that tables have now been created
Assert.assertTrue(tableExists(config.getDataSourceTable()));
Assert.assertTrue(tableExists(config.getSegmentsTable()));
Assert.assertTrue(tableExists(config.getPendingSegmentsTable()));
Assert.assertTrue(tableExists(config.getUpgradeSegmentsTable()));
Assert.assertTrue(tableExists(config.getConfigTable()));
Assert.assertTrue(tableExists(config.getRulesTable()));
Assert.assertTrue(tableExists(config.getAuditTable()));
Assert.assertTrue(tableExists(config.getSupervisorTable()));
Assert.assertTrue(tableExists(config.getTaskLockTable()));
}
private boolean tableExists(String tableName)
{
return connector.retryWithHandle(
handle -> connector.tableExists(handle, tableName.toUpperCase(Locale.ENGLISH))
);
}
}