Associate pending segments with the tasks that requested them (#16144)

Changes:
- Add column `task_allocator_id` to `pendingSegments` metadata table.
- Add column `upgraded_from_segment_id` to `pendingSegments` metadata table.
- Add interface `PendingSegmentAllocatingTask` and implement it in all tasks that
can allocate pending segments.
- Use `taskAllocatorId` to identify the task (and its sub-tasks or replicas) to which
a pending segment has been allocated.
- Perform active cleanup of pending segments in `TaskLockbox` once there are no
active tasks for the corresponding task allocator id.
- When committing APPEND segments, also commit all upgraded pending segments
corresponding to that task allocator id.
- When committing REPLACE segments, upgrade all overlapping pending segments in
the same transaction.
AmatyaAvadhanula 2024-04-17 09:06:31 +05:30 committed by GitHub
parent a5428e75ff
commit f3d69f30e6
31 changed files with 1779 additions and 576 deletions
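The cleanup described above hinges on grouping active task ids by their task allocator id. A minimal sketch of that bookkeeping, assuming simplified names and standalone state (not the actual TaskLockbox code, which appears in the diff below):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Minimal sketch of the allocator-id bookkeeping this commit adds to TaskLockbox:
 * task ids are grouped by their task allocator id, and the pending segments of a
 * group become eligible for cleanup once its last active task is removed.
 */
public class AllocatorGroupTrackerSketch
{
  private final Map<String, Set<String>> allocatorIdToTaskIds = new HashMap<>();

  /** Called when a task that can allocate pending segments becomes active. */
  public void trackTask(String taskAllocatorId, String taskId)
  {
    allocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, id -> new HashSet<>()).add(taskId);
  }

  /**
   * Called when a task completes. Returns true if it was the last active task
   * for its allocator id, i.e. the group's pending segments can now be deleted.
   */
  public boolean untrackTaskAndCheckCleanup(String taskAllocatorId, String taskId)
  {
    final Set<String> taskIds = allocatorIdToTaskIds.get(taskAllocatorId);
    if (taskIds == null) {
      return false;
    }
    taskIds.remove(taskId);
    if (taskIds.isEmpty()) {
      allocatorIdToTaskIds.remove(taskAllocatorId);
      return true;
    }
    return false;
  }
}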

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -71,7 +72,7 @@ import java.util.Optional;
import java.util.Set;
@JsonTypeName(MSQControllerTask.TYPE)
public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, PendingSegmentAllocatingTask
{
public static final String TYPE = "query_controller";
public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select";
@ -157,6 +158,12 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
return ImmutableSet.of();
}
@Override
public String getTaskAllocatorId()
{
return getId();
}
@JsonProperty("spec")
public MSQSpec getQuerySpec()
{

View File

@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
@ -45,7 +46,7 @@ import java.util.Objects;
import java.util.Set;
@JsonTypeName(MSQWorkerTask.TYPE)
public class MSQWorkerTask extends AbstractTask
public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
{
public static final String TYPE = "query_worker";
@ -125,6 +126,12 @@ public class MSQWorkerTask extends AbstractTask
return ImmutableSet.of();
}
@Override
public String getTaskAllocatorId()
{
return getControllerTaskId();
}
@Override
public boolean isReady(final TaskActionClient taskActionClient)

View File

@ -35,7 +35,7 @@ import java.util.Collections;
public class MSQControllerTaskTest
{
MSQSpec MSQ_SPEC = MSQSpec
private final MSQSpec MSQ_SPEC = MSQSpec
.builder()
.destination(new DataSourceMSQDestination(
"target",
@ -59,7 +59,7 @@ public class MSQControllerTaskTest
@Test
public void testGetInputSourceResources()
{
MSQControllerTask msqWorkerTask = new MSQControllerTask(
MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
@ -67,7 +67,25 @@ public class MSQControllerTaskTest
null,
null,
null,
null);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
null
);
Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
}
@Test
public void testGetTaskAllocatorId()
{
final String taskId = "taskId";
MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
MSQ_SPEC,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId());
}
}

View File

@ -47,7 +47,6 @@ public class MSQWorkerTaskTest
@Test
public void testEquals()
{
Assert.assertNotEquals(msqWorkerTask, 0);
Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
@ -110,4 +109,11 @@ public class MSQWorkerTaskTest
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
@Test
public void testGetTaskAllocatorId()
{
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
}
}

View File

@ -23,8 +23,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
@ -210,6 +212,12 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
final TaskActionToolbox toolbox
)
{
if (!(task instanceof PendingSegmentAllocatingTask)) {
throw DruidException.defensive(
"Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
task.getId(), task.getType()
);
}
int attempt = 0;
while (true) {
attempt++;

View File

@ -22,10 +22,12 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@ -41,8 +43,20 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
*
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
* <pre>
* Pseudo code (for a single interval):
* For an append lock held over an interval:
* transaction {
* commit input segments contained within interval
* if there is an active replace lock over the interval:
* add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
* fetch pending segments with parent contained within the input segments, and commit them
* }
* </pre>
*/
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
@ -114,6 +128,13 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
if (!(task instanceof PendingSegmentAllocatingTask)) {
throw DruidException.defensive(
"Task[%s] of type[%s] cannot append segments as it does not implement PendingSegmentAllocatingTask.",
task.getId(),
task.getType()
);
}
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
@ -132,17 +153,20 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
final CriticalAction.Action<SegmentPublishResult> publishAction;
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
segmentToReplaceLock,
taskAllocatorId
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
endMetadata,
taskAllocatorId
);
}

View File

@ -30,11 +30,14 @@ import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -42,6 +45,20 @@ import java.util.stream.Collectors;
/**
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
* <pre>
* Pseudo code (for a single interval):
* For a replace lock held over an interval:
* transaction {
* commit input segments contained within interval
* upgrade ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
* fetch payload, task_allocator_id for pending segments
* upgrade each such pending segment to the replace lock's version with the corresponding root segment
* }
* For every pending segment with version == replace lock version:
* Fetch payload, group_id of the pending segment and relay them to the supervisor
* The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries
* </pre>
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
@ -123,7 +140,7 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
@ -134,34 +151,55 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
}
/**
* Tries to upgrade any pending segments that overlap with the committed segments.
* Registers upgraded pending segments on the active supervisor, if any
*/
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}
final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
.getTaskLockbox()
.getAllReplaceLocksForDatasource(task.getDataSource())
.stream()
.filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());
upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
pendingSegments.addAll(
toolbox.getIndexerMetadataStorageCoordinator()
.getPendingSegments(task.getDataSource(), replaceLock.getInterval())
);
}
Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
pendingSegment.getId().asSegmentId().toString(),
pendingSegment.getId()
));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null
&& !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
segmentToParent.put(
pendingSegment.getId(),
idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
);
}
});
segmentToParent.forEach(
(newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
);
}

View File

@ -119,7 +119,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Deprecated
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
implements ChatHandler, PendingSegmentAllocatingTask
{
private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
@ -259,6 +260,12 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
return true;
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Override
public TaskStatus runTask(final TaskToolbox toolbox)
{

View File

@ -127,7 +127,7 @@ import java.util.stream.IntStream;
* serialization fields of this class must correspond to those of {@link
* ClientCompactionTaskQuery}.
*/
public class CompactionTask extends AbstractBatchIndexTask
public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();
@ -400,6 +400,12 @@ public class CompactionTask extends AbstractBatchIndexTask
return TYPE;
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Nonnull
@JsonIgnore
@Override

View File

@ -137,7 +137,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
@ -302,6 +302,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Nonnull
@JsonIgnore
@Override

View File

@ -39,7 +39,7 @@ import java.util.UUID;
/**
*/
public class NoopTask extends AbstractTask
public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask
{
private static final int DEFAULT_RUN_TIME = 2500;
@ -111,6 +111,12 @@ public class NoopTask extends AbstractTask
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
public String getTaskAllocatorId()
{
return getId();
}
public static NoopTask create()
{
return forDatasource(null);

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
/**
* An interface to be implemented by every appending task that allocates pending segments.
*/
public interface PendingSegmentAllocatingTask
{
/**
* Unique string used by an appending task (or its sub-tasks and replicas) to allocate pending segments
* and identify pending segments allocated to it.
*/
String getTaskAllocatorId();
}
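
A hypothetical implementation, to show how the interface is typically used (in this commit, batch tasks return getGroupId(), MSQWorkerTask returns its controller task id, and streaming tasks return their availability group):

package org.apache.druid.indexing.common.task;

/**
 * Hypothetical example task (not part of this commit) illustrating a typical
 * implementation: sub-tasks and replicas of the same job return the same
 * allocator id so that they share one set of pending segments.
 */
public class ExampleAppendingTask implements PendingSegmentAllocatingTask
{
  private final String groupId;

  public ExampleAppendingTask(String groupId)
  {
    this.groupId = groupId;
  }

  @Override
  public String getTaskAllocatorId()
  {
    // Mirrors the batch tasks in this commit, which use their group id.
    return groupId;
  }
}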

View File

@ -52,6 +52,7 @@ import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@ -131,7 +132,8 @@ import java.util.stream.Collectors;
*
* @see ParallelIndexTaskRunner
*/
public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler
public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "index_parallel";
@ -476,6 +478,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
);
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Nullable
@Override
public Granularity getSegmentGranularity()

View File

@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
@ -105,7 +106,7 @@ import java.util.stream.Collectors;
* generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
@ -236,6 +237,12 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return subtaskSpecId;
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Override
public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
@ -38,6 +39,7 @@ import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
@ -99,10 +101,20 @@ public class TaskLockbox
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
// Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
// this set should be accessed under the giant lock.
/**
* Set of active tasks. Locks can be granted only to a task present in this set.
* Should be accessed only under the giant lock.
*/
private final Set<String> activeTasks = new HashSet<>();
/**
* Map from a taskAllocatorId to the set of active taskIds using that allocator id.
* Used to clean up pending segments for a taskAllocatorId as soon as the set
* of corresponding active taskIds becomes empty.
*/
@GuardedBy("giant")
private final Map<String, Set<String>> activeAllocatorIdToTaskIds = new HashMap<>();
@Inject
public TaskLockbox(
TaskStorage taskStorage,
@ -213,6 +225,12 @@ public class TaskLockbox
activeTasks.remove(task.getId());
}
}
activeAllocatorIdToTaskIds.clear();
for (Task task : storedActiveTasks) {
if (activeTasks.contains(task.getId())) {
trackAppendingTask(task);
}
}
log.info(
"Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
@ -387,7 +405,7 @@ public class TaskLockbox
if (request instanceof LockRequestForNewSegment) {
final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);
if (newSegmentId == null) {
return LockResult.fail();
}
@ -411,7 +429,12 @@ public class TaskLockbox
newSegmentId
);
}
newSegmentId = allocateSegmentId(lockRequestForNewSegment, posseToUse.getTaskLock().getVersion());
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
newSegmentId = allocateSegmentId(
lockRequestForNewSegment,
posseToUse.getTaskLock().getVersion(),
taskAllocatorId
);
}
}
return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
@ -514,6 +537,7 @@ public class TaskLockbox
}
}
@Nullable
private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
@ -710,7 +734,7 @@ public class TaskLockbox
}
}
private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version)
private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version, String allocatorId)
{
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
@ -719,7 +743,8 @@ public class TaskLockbox
request.getInterval(),
request.getPartialShardSpec(),
version,
request.isSkipSegmentLineageCheck()
request.isSkipSegmentLineageCheck(),
allocatorId
);
}
@ -1159,12 +1184,25 @@ public class TaskLockbox
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
trackAppendingTask(task);
}
finally {
giant.unlock();
}
}
@GuardedBy("giant")
private void trackAppendingTask(Task task)
{
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (taskAllocatorId != null) {
activeAllocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, s -> new HashSet<>())
.add(task.getId());
}
}
}
/**
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
*
@ -1176,13 +1214,35 @@ public class TaskLockbox
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted,
task.getId()
);
try {
// Clean upgrade segments table for entries associated with replacing task
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted, task.getId()
);
}
// Clean pending segments associated with the appending task
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId);
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
);
}
activeAllocatorIdToTaskIds.remove(taskAllocatorId);
}
}
}
catch (Exception e) {
log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
}
unlockAll(task);
}
@ -1771,7 +1831,9 @@ public class TaskLockbox
action.getSequenceName(),
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
action.getPartialShardSpec()
action.getPartialShardSpec(),
null,
((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
);
}

View File

@ -39,7 +39,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -126,15 +125,6 @@ public class SupervisorManager
return Optional.absent();
}
public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
if (supervisors.containsKey(activeSupervisorId)) {
return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
} else {
return Collections.emptySet();
}
}
public Optional<SupervisorSpec> getSupervisorSpec(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
@ -340,7 +330,7 @@ public class SupervisorManager
return true;
}
catch (Exception e) {
log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
}
return false;

View File

@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@ -61,7 +62,7 @@ import java.util.Map;
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
extends AbstractTask implements ChatHandler
extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
@ -269,6 +270,12 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
}
@Override
public String getTaskAllocatorId()
{
return getTaskResource().getAvailabilityGroup();
}
protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
/**

View File

@ -60,6 +60,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
@ -118,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
private SupervisorManager supervisorManager;
protected File reportsFile;
@Before
@ -227,13 +229,14 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
taskStorage,
storageCoordinator,
new NoopServiceEmitter(),
null,
supervisorManager,
objectMapper
);
}
public TaskToolbox createTaskToolbox(TaskConfig config, Task task)
public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorManager supervisorManager)
{
this.supervisorManager = supervisorManager;
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))

View File

@ -36,10 +36,13 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -49,6 +52,7 @@ public class ActionsTestTask extends CommandQueueTask
{
private final TaskActionClient client;
private final AtomicInteger sequenceId = new AtomicInteger(0);
private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = new HashMap<>();
public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory)
{
@ -78,16 +82,25 @@ public class ActionsTestTask extends CommandQueueTask
);
}
public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
{
return announcedSegmentsToParentSegments;
}
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
return runAction(
SegmentPublishResult publishResult = runAction(
SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
);
for (DataSegment segment : publishResult.getSegments()) {
announcedSegmentsToParentSegments.remove(segment.getId());
}
return publishResult;
}
public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Granularity preferredSegmentGranularity)
{
return runAction(
SegmentIdWithShardSpec pendingSegment = runAction(
new SegmentAllocateAction(
getDataSource(),
timestamp,
@ -101,28 +114,8 @@ public class ActionsTestTask extends CommandQueueTask
TaskLockType.APPEND
)
);
}
public SegmentIdWithShardSpec allocateSegmentForTimestamp(
DateTime timestamp,
Granularity preferredSegmentGranularity,
String sequenceName
)
{
return runAction(
new SegmentAllocateAction(
getDataSource(),
timestamp,
Granularities.SECOND,
preferredSegmentGranularity,
getId() + "__" + sequenceName,
null,
false,
NumberedPartialShardSpec.instance(),
LockGranularity.TIME_CHUNK,
TaskLockType.APPEND
)
);
announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId());
return pendingSegment;
}
private <T> T runAction(TaskAction<T> action)

View File

@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@ -40,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Test task that can be given a series of commands to execute in its {@link #runTask} method.
*/
public class CommandQueueTask extends AbstractTask
public class CommandQueueTask extends AbstractTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CommandQueueTask.class);
@ -140,6 +141,12 @@ public class CommandQueueTask extends AbstractTask
}
}
@Override
public String getTaskAllocatorId()
{
return getId();
}
@Override
public TaskStatus runTask(TaskToolbox taskToolbox)
{

View File

@ -728,6 +728,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
// Append segment for Oct-Dec
final DataSegment segmentV02 = asSegment(pendingSegment02);
appendTask2.commitAppendSegments(segmentV02);
appendTask2.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV02);
@ -747,12 +748,14 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
// Append segment for Jan 1st
final DataSegment segmentV01 = asSegment(pendingSegment01);
appendTask.commitAppendSegments(segmentV01);
appendTask.finishRunAndGetStatus();
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02);
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01, segmentV02);
// Replace segment for whole year
final DataSegment segmentV10 = createSegment(YEAR_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
replaceTask.finishRunAndGetStatus();
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
@ -767,6 +770,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
// Append segment for quarter
final DataSegment segmentV03 = asSegment(pendingSegment03);
appendTask3.commitAppendSegments(segmentV03);
appendTask3.finishRunAndGetStatus();
final DataSegment segmentV13 = DataSegment.builder(segmentV03)
.version(v1)
@ -1021,7 +1025,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
@Override
public TaskToolbox build(TaskConfig config, Task task)
{
return createTaskToolbox(config, task);
return createTaskToolbox(config, task, null);
}
};
}

View File

@ -0,0 +1,881 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.concurrent;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Contains tests to verify behaviour of concurrently running REPLACE and APPEND
* tasks on the same interval of a datasource.
* <p>
* The tests verify the interleaving of the following actions:
* <ul>
* <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
* <li>ALLOCATE: Allocation of a pending segment by an append task</li>
* <li>REPLACE: Commit of segments created by a replace task</li>
* <li>APPEND: Commit of segments created by an append task</li>
* </ul>
*/
public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
{
/**
* The version used by append jobs when no previous replace job has run on an interval.
*/
private static final String SEGMENT_V0 = DateTimes.EPOCH.toString();
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
private static final String WIKI = "wiki";
private TaskQueue taskQueue;
private TaskActionClientFactory taskActionClientFactory;
private TaskActionClient dummyTaskActionClient;
private final List<ActionsTestTask> runningTasks = new ArrayList<>();
private ActionsTestTask appendTask;
private ActionsTestTask replaceTask;
private final AtomicInteger groupId = new AtomicInteger(0);
private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class);
private Capture<String> supervisorId;
private Capture<SegmentIdWithShardSpec> oldPendingSegment;
private Capture<SegmentIdWithShardSpec> newPendingSegment;
private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
private Map<SegmentId, Object> parentSegmentToLoadSpec;
@Override
@Before
public void setUpIngestionTestBase() throws IOException
{
EasyMock.reset(supervisorManager);
EasyMock.expect(supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(WIKI))
.andReturn(Optional.of(WIKI)).anyTimes();
super.setUpIngestionTestBase();
final TaskConfig taskConfig = new TaskConfigBuilder().build();
taskActionClientFactory = createActionClientFactory();
dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create());
final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10);
TaskRunner taskRunner = new ThreadingTaskRunner(
createToolboxFactory(taskConfig, taskActionClientFactory),
taskConfig,
workerConfig,
new NoopTaskLogs(),
getObjectMapper(),
new TestAppenderatorsManager(),
new MultipleFileTaskReportFileWriter(),
new DruidNode("middleManager", "host", false, 8091, null, true, false),
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
);
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(0L), null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
taskActionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper(),
new NoopTaskContextEnricher()
);
runningTasks.clear();
taskQueue.start();
groupId.set(0);
appendTask = createAndStartTask();
supervisorId = Capture.newInstance(CaptureType.ALL);
oldPendingSegment = Capture.newInstance(CaptureType.ALL);
newPendingSegment = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
EasyMock.capture(supervisorId),
EasyMock.capture(oldPendingSegment),
EasyMock.capture(newPendingSegment)
)).andReturn(true).anyTimes();
replaceTask = createAndStartTask();
EasyMock.replay(supervisorManager);
versionToIntervalToLoadSpecs = new HashMap<>();
parentSegmentToLoadSpec = new HashMap<>();
}
@After
public void tearDown()
{
verifyVersionIntervalLoadSpecUniqueness();
for (ActionsTestTask task : runningTasks) {
task.finishRunAndGetStatus();
}
}
@Test
public void testLockReplaceAllocateAppend()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion());
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendDayReplaceDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceDayAppendDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceDayAppendDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockAppendDayReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
replaceTask.finishRunAndGetStatus();
// Verify that the segment appended to v0 gets upgraded to v1
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.shardSpec(new NumberedShardSpec(1, 1))
.version(v1).build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateAppendDayLockReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the segment appended to v0 gets fully overshadowed
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
}
@Test
public void testLockReplaceMonthAllocateAppendDay()
{
String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that the allocated segment takes the version and interval of previous replace
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(v1, pendingSegment.getVersion());
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendDayReplaceMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceMonthAppendDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceMonthAppendDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockAppendDayReplaceMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateAppendDayLockReplaceMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that the old segment gets completely replaced
verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
}
@Test
public void testLockReplaceDayAllocateAppendMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
replaceTask.commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that new segment gets allocated with DAY granularity even though preferred was MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(v1, pendingSegment.getVersion());
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
final DataSegment segmentV11 = asSegment(pendingSegment);
commitAppendSegments(segmentV11);
verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateAppendMonthReplaceDay()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01);
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
// Verify that append segment gets upgraded to replace version
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.version(v1)
.interval(segmentV10.getInterval())
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testLockAllocateReplaceDayAppendMonth()
{
final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion();
// Verify that an APPEND lock cannot be acquired on month
TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23);
Assert.assertNull(appendLock);
// Verify that the segment is allocated for DAY granularity instead of MONTH
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
final DataSegment segmentV11 = DataSegment.builder(segmentV01)
.interval(FIRST_OF_JAN_23)
.version(v1)
.shardSpec(new NumberedShardSpec(1, 1))
.build();
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11);
}
@Test
public void testAllocateLockReplaceDayAppendMonth()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
// Verify that replace lock cannot be acquired on MONTH
TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
// Verify that segment cannot be committed since there is no lock
final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0);
final ISE exception = Assert.assertThrows(ISE.class, () -> commitReplaceSegments(segmentV10));
final Throwable throwable = Throwables.getRootCause(exception);
Assert.assertEquals(
StringUtils.format(
"Segments[[%s]] are not covered by locks[[]] for task[%s]",
segmentV10, replaceTask.getId()
),
throwable.getMessage()
);
final DataSegment segmentV01 = asSegment(pendingSegment);
commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
}
@Test
public void testAllocateAppendMonthLockReplaceDay()
{
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
Assert.assertEquals(JAN_23, pendingSegment.getInterval());
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
final DataSegment segmentV01 = asSegment(pendingSegment);
appendTask.commitAppendSegments(segmentV01);
verifyIntervalHasUsedSegments(JAN_23, segmentV01);
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
// Verify that replace lock cannot be acquired on DAY as MONTH is already locked
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
Assert.assertNull(replaceLock);
}
@Test
public void testLockAllocateDayReplaceMonthAllocateAppend()
{
final SegmentIdWithShardSpec pendingSegmentV0
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
final DataSegment segmentV10 = createSegment(JAN_23, v1);
commitReplaceSegments(segmentV10);
verifyIntervalHasUsedSegments(JAN_23, segmentV10);
final SegmentIdWithShardSpec pendingSegmentV1
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV1.getVersion());
final DataSegment segmentV00 = asSegment(pendingSegmentV0);
final DataSegment segmentV11 = asSegment(pendingSegmentV1);
Set<DataSegment> appendSegments = commitAppendSegments(segmentV00, segmentV11)
.getSegments();
Assert.assertEquals(3, appendSegments.size());
// Segment V11 is committed
Assert.assertTrue(appendSegments.remove(segmentV11));
// Segment V00 is also committed
Assert.assertTrue(appendSegments.remove(segmentV00));
// Segment V00 is upgraded to v1 with MONTH granularity at the time of commit as V12
final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
Assert.assertEquals(v1, segmentV12.getVersion());
Assert.assertEquals(JAN_23, segmentV12.getInterval());
Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());
verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, segmentV12);
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
}
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
{
for (DataSegment segment : segments) {
if (version.equals(segment.getVersion())
&& Objects.equals(segment.getLoadSpec(), loadSpec)) {
return segment;
}
}
return null;
}
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
return new DataSegment(
id,
Collections.singletonMap(id.toString(), id.toString()),
Collections.emptyList(),
Collections.emptyList(),
pendingSegment.getShardSpec(),
null,
0,
0
);
}
private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments);
}
private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments)
{
verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments);
}
private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments)
{
try {
Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(interval),
visibility
)
);
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
}
catch (IOException e) {
throw new ISE(e, "Error while fetching used segments in interval[%s]", interval);
}
}
private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
{
try {
final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
Collection<DataSegment> allUsedSegments = taskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
Collections.singletonList(interval)
)
);
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
}
catch (IOException e) {
throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
}
}
private TaskToolboxFactory createToolboxFactory(
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory
)
{
TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder()
.setConfig(taskConfig)
.setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT))
.setTaskActionClientFactory(taskActionClientFactory);
return new TestTaskToolboxFactory(builder)
{
@Override
public TaskToolbox build(TaskConfig config, Task task)
{
return createTaskToolbox(config, task, supervisorManager);
}
};
}
private DataSegment createSegment(Interval interval, String version)
{
SegmentId id = SegmentId.of(WIKI, interval, version, null);
return DataSegment.builder()
.dataSource(WIKI)
.interval(interval)
.version(version)
.loadSpec(Collections.singletonMap(id.toString(), id.toString()))
.size(100)
.build();
}
private ActionsTestTask createAndStartTask()
{
ActionsTestTask task = new ActionsTestTask(WIKI, "test_" + groupId.incrementAndGet(), taskActionClientFactory);
taskQueue.add(task);
runningTasks.add(task);
return task;
}
private void commitReplaceSegments(DataSegment... dataSegments)
{
replaceTask.commitReplaceSegments(dataSegments);
for (int i = 0; i < supervisorId.getValues().size(); i++) {
announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i));
}
supervisorId.reset();
oldPendingSegment.reset();
newPendingSegment.reset();
replaceTask.finishRunAndGetStatus();
}
private SegmentPublishResult commitAppendSegments(DataSegment... dataSegments)
{
SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments);
result.getSegments().forEach(this::unannounceUpgradedPendingSegment);
for (DataSegment segment : dataSegments) {
parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values()));
}
appendTask.finishRunAndGetStatus();
return result;
}
private void announceUpgradedPendingSegment(
SegmentIdWithShardSpec oldPendingSegment,
SegmentIdWithShardSpec newPendingSegment
)
{
appendTask.getAnnouncedSegmentsToParentSegments()
.put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId());
}
private void unannounceUpgradedPendingSegment(
DataSegment segment
)
{
appendTask.getAnnouncedSegmentsToParentSegments()
.remove(segment.getId());
}
private void verifyVersionIntervalLoadSpecUniqueness()
{
for (DataSegment usedSegment : getAllUsedSegments()) {
final String version = usedSegment.getVersion();
final Interval interval = usedSegment.getInterval();
final Object loadSpec = Iterables.getOnlyElement(usedSegment.getLoadSpec().values());
Map<Interval, Set<Object>> intervalToLoadSpecs
= versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
Set<Object> loadSpecs
= intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
Assert.assertFalse(loadSpecs.contains(loadSpec));
loadSpecs.add(loadSpec);
}
for (Map.Entry<SegmentId, SegmentId> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
final String version = entry.getKey().getVersion();
final Interval interval = entry.getKey().getInterval();
final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue());
Map<Interval, Set<Object>> intervalToLoadSpecs
= versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>());
Set<Object> loadSpecs
= intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>());
Assert.assertFalse(loadSpecs.contains(loadSpec));
loadSpecs.add(loadSpec);
}
}
private Collection<DataSegment> getAllUsedSegments()
{
try {
return dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(Intervals.ETERNITY),
Segments.INCLUDING_OVERSHADOWED
)
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1896,14 +1896,17 @@ public class TaskLockboxTest
}
@Test
public void testUpgradeSegmentsCleanupOnUnlock()
public void testCleanupOnUnlock()
{
final Task replaceTask = NoopTask.create();
final Task appendTask = NoopTask.create();
final Task replaceTask = NoopTask.forDatasource("replace");
final Task appendTask = NoopTask.forDatasource("append");
final IndexerSQLMetadataStorageCoordinator coordinator
= EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
// Any task may attempt pending segment clean up
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once();
EasyMock.replay(coordinator);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@ -175,7 +176,8 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
@Override
public SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
String taskGroup
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
@ -186,7 +188,8 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
String taskGroup
)
{
return SegmentPublishResult.ok(commitSegments(appendSegments));
@ -228,7 +231,8 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
boolean skipSegmentLineageCheck
boolean skipSegmentLineageCheck,
String taskAllocatorId
)
{
return new SegmentIdWithShardSpec(
@ -241,8 +245,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeBaseSequenceNames
Set<DataSegment> replaceSegments
)
{
return Collections.emptyMap();
@ -285,6 +288,18 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
throw new UnsupportedOperationException();
}
@Override
public int deletePendingSegmentsForTaskGroup(final String taskGroup)
{
throw new UnsupportedOperationException();
}
@Override
public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
{
throw new UnsupportedOperationException();
}
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@ -238,7 +239,7 @@ public interface IndexerMetadataStorageCoordinator
* identifier may have a version lower than this one, but will not have one higher.
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @param taskAllocatorId The task allocator id with which the pending segment is associated
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdWithShardSpec allocatePendingSegment(
@ -248,7 +249,8 @@ public interface IndexerMetadataStorageCoordinator
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
boolean skipSegmentLineageCheck
boolean skipSegmentLineageCheck,
String taskAllocatorId
);
/**
@ -322,10 +324,12 @@ public interface IndexerMetadataStorageCoordinator
* must be committed in a single transaction.
* @param appendSegmentToReplaceLock Map from append segment to the currently
* active REPLACE lock (if any) covering it
* @param taskAllocatorId allocator id of the task committing the segments to be appended
*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
String taskAllocatorId
);
/**
@ -340,7 +344,8 @@ public interface IndexerMetadataStorageCoordinator
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
String taskGroup
);
/**
@ -373,13 +378,10 @@ public interface IndexerMetadataStorageCoordinator
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
* of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
Set<DataSegment> replaceSegments
);
/**
@ -476,4 +478,19 @@ public interface IndexerMetadataStorageCoordinator
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
/**
* Delete pending segments for a given task group after all the tasks belonging to it have completed.
* @param taskAllocatorId task id / task group / replica group for an appending task
* @return number of pending segments deleted from the metadata store
*/
int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
/**
* Fetches all the pending segments of the datasource that overlap with a given interval.
* @param datasource datasource to be queried
* @param interval interval with which segments overlap
* @return List of pending segment records
*/
List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval);
}
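For illustration only (not part of this change): a minimal sketch of how a cleanup path might use the new coordinator method above once no active tasks remain for an allocator id. The helper name and variables are assumptions made for this example.
// Hypothetical helper; 'coordinator' is any IndexerMetadataStorageCoordinator implementation.
static int cleanupPendingSegmentsFor(IndexerMetadataStorageCoordinator coordinator, String taskAllocatorId)
{
  // Removes every row in the pendingSegments table tagged with this task allocator id
  // and returns the number of deleted entries.
  return coordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
}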

View File

@ -38,18 +38,24 @@ public class SegmentCreateRequest
private final String sequenceName;
private final String previousSegmentId;
private final PartialShardSpec partialShardSpec;
private final String upgradedFromSegmentId;
private final String taskAllocatorId;
public SegmentCreateRequest(
String sequenceName,
String previousSegmentId,
String version,
PartialShardSpec partialShardSpec
PartialShardSpec partialShardSpec,
String upgradedFromSegmentId,
String taskAllocatorId
)
{
this.sequenceName = sequenceName;
this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId;
this.version = version;
this.partialShardSpec = partialShardSpec;
this.upgradedFromSegmentId = upgradedFromSegmentId;
this.taskAllocatorId = taskAllocatorId;
}
public String getSequenceName()
@ -75,4 +81,14 @@ public class SegmentCreateRequest
{
return partialShardSpec;
}
public String getUpgradedFromSegmentId()
{
return upgradedFromSegmentId;
}
public String getTaskAllocatorId()
{
return taskAllocatorId;
}
}
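For illustration only (not part of this diff): constructing a create request that carries the two new fields. The literal values are placeholders; a fresh, non-upgrade allocation would typically pass null for upgradedFromSegmentId.
// Hypothetical values, shown only to illustrate the new constructor arguments.
final SegmentCreateRequest request = new SegmentCreateRequest(
    "wiki_sequence_0",                   // sequenceName
    null,                                // previousSegmentId, stored internally as ""
    "2024-01-01T00:00:00.000Z",          // version
    NumberedPartialShardSpec.instance(), // partialShardSpec
    null,                                // upgradedFromSegmentId: null for a fresh allocation
    "index_parallel_wiki_abc"            // taskAllocatorId shared by a task and its sub-tasks
);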

View File

@ -57,7 +57,6 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
@ -94,7 +93,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -281,99 +279,74 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
/**
* Fetches all the pending segments, whose interval overlaps with the given
* search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
* from the metadata store. Returns a Map from the pending segment ID to the sequence name.
* Fetches all the pending segments whose intervals overlap with the given search interval from the metadata store.
*/
@VisibleForTesting
Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval,
final Set<String> sequenceNamePrefixFilter
) throws IOException
{
if (sequenceNamePrefixFilter.isEmpty()) {
return Collections.emptyMap();
}
final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
final List<String> sequenceNamePrefixConditions = new ArrayList<>();
for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
}
String sql = "SELECT sequence_name, payload"
+ " FROM " + dbTables.getPendingSegmentsTable()
+ " WHERE dataSource = :dataSource"
+ " AND start < :end"
+ StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
+ " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
}
final ResultIterator<PendingSegmentsRecord> dbSegments =
query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
.iterator();
final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
while (dbSegments.hasNext()) {
PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
if (interval.overlaps(identifier.getInterval())) {
pendingSegmentToSequenceName.put(identifier, record.sequenceName);
}
}
dbSegments.close();
return pendingSegmentToSequenceName;
}
private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
)
{
final ResultIterator<PendingSegmentsRecord> dbSegments =
handle.createQuery(
StringUtils.format(
// This query might fail if the year has a different number of digits
// See https://github.com/apache/druid/pull/11582 for a similar issue
// Using long for these timestamps instead of varchar would give correct time comparisons
"SELECT sequence_name, payload FROM %1$s"
+ " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start",
dbTables.getPendingSegmentsTable(), connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
.iterator();
final boolean compareIntervalEndpointsAsStrings = Intervals.canCompareEndpointsAsStrings(interval);
final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
while (dbSegments.hasNext()) {
PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
if (interval.overlaps(identifier.getInterval())) {
pendingSegmentToSequenceName.put(identifier, record.sequenceName);
}
String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+ " FROM " + dbTables.getPendingSegmentsTable()
+ " WHERE dataSource = :dataSource";
if (compareIntervalEndpointsAsStrings) {
sql = sql
+ " AND start < :end"
+ StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString());
}
dbSegments.close();
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource);
if (compareIntervalEndpointsAsStrings) {
query = query.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
}
return pendingSegmentToSequenceName;
final ResultIterator<PendingSegmentRecord> pendingSegmentIterator =
query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
.iterator();
final ImmutableList.Builder<PendingSegmentRecord> pendingSegments = ImmutableList.builder();
while (pendingSegmentIterator.hasNext()) {
final PendingSegmentRecord pendingSegment = pendingSegmentIterator.next();
if (compareIntervalEndpointsAsStrings || pendingSegment.getId().getInterval().overlaps(interval)) {
pendingSegments.add(pendingSegment);
}
}
pendingSegmentIterator.close();
return pendingSegments.build();
}
List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
final Handle handle,
final String dataSource,
final String taskAllocatorId
)
{
String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id"
+ " FROM " + dbTables.getPendingSegmentsTable()
+ " WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id";
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("task_allocator_id", taskAllocatorId);
final ResultIterator<PendingSegmentRecord> pendingSegmentRecords =
query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper))
.iterator();
final List<PendingSegmentRecord> pendingSegments = new ArrayList<>();
while (pendingSegmentRecords.hasNext()) {
pendingSegments.add(pendingSegmentRecords.next());
}
pendingSegmentRecords.close();
return pendingSegments;
}
private SegmentTimeline getTimelineForIntervalsWithHandle(
@ -503,9 +476,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
segmentsToInsert.addAll(
createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask)
);
return SegmentPublishResult.ok(
SegmentPublishResult result = SegmentPublishResult.ok(
insertSegments(handle, segmentsToInsert)
);
upgradePendingSegmentsOverlappingWith(segmentsToInsert);
return result;
},
3,
getSqlMetadataMaxRetry()
@ -519,14 +494,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public SegmentPublishResult commitAppendSegments(
final Set<DataSegment> appendSegments,
final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
final String taskAllocatorId
)
{
return commitAppendSegmentsAndMetadataInTransaction(
appendSegments,
appendSegmentToReplaceLock,
null,
null
null,
taskAllocatorId
);
}
@ -535,14 +512,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
DataSourceMetadata endMetadata,
String taskAllocatorId
)
{
return commitAppendSegmentsAndMetadataInTransaction(
appendSegments,
appendSegmentToReplaceLock,
startMetadata,
endMetadata
endMetadata,
taskAllocatorId
);
}
@ -645,7 +624,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
final boolean skipSegmentLineageCheck
final boolean skipSegmentLineageCheck,
String taskAllocatorId
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@ -677,7 +657,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allocateInterval,
partialShardSpec,
maxVersion,
existingChunks
existingChunks,
taskAllocatorId
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
@ -688,7 +669,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allocateInterval,
partialShardSpec,
maxVersion,
existingChunks
existingChunks,
taskAllocatorId
);
}
}
@ -697,8 +679,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeRealtimeSequencePrefixes
Set<DataSegment> replaceSegments
)
{
if (replaceSegments.isEmpty()) {
@ -717,7 +698,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final String datasource = replaceSegments.iterator().next().getDataSource();
return connector.retryWithHandle(
handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId)
);
}
@ -736,11 +717,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
Handle handle,
String datasource,
Map<Interval, DataSegment> replaceIntervalToMaxId,
Set<String> activeRealtimeSequencePrefixes
) throws IOException
Map<Interval, DataSegment> replaceIntervalToMaxId
) throws JsonProcessingException
{
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<>();
final List<PendingSegmentRecord> upgradedPendingSegments = new ArrayList<>();
final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> pendingSegmentToNewId = new HashMap<>();
for (Map.Entry<Interval, DataSegment> entry : replaceIntervalToMaxId.entrySet()) {
@ -751,15 +731,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions();
int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();
final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
= getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);
final List<PendingSegmentRecord> overlappingPendingSegments
= getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval);
for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
: overlappingPendingSegments.entrySet()) {
final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
final String pendingSegmentSequence = overlappingPendingSegment.getValue();
for (PendingSegmentRecord overlappingPendingSegment : overlappingPendingSegments) {
final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getId();
if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) {
if (shouldUpgradePendingSegment(overlappingPendingSegment, replaceInterval, replaceVersion)) {
// Ensure unique sequence_name_prev_id_sha1 by setting
// sequence_prev_id -> pendingSegmentId
// sequence_name -> prefix + replaceVersion
@ -769,14 +747,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
replaceVersion,
new NumberedShardSpec(++currentPartitionNumber, numCorePartitions)
);
newPendingSegmentVersions.put(
new SegmentCreateRequest(
upgradedPendingSegments.add(
new PendingSegmentRecord(
newId,
UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
pendingSegmentId.toString(),
replaceVersion,
NumberedPartialShardSpec.instance()
),
newId
pendingSegmentId.toString(),
overlappingPendingSegment.getTaskAllocatorId()
)
);
pendingSegmentToNewId.put(pendingSegmentId, newId);
}
@ -787,33 +765,34 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// includes hash of both sequence_name and prev_segment_id
int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore(
handle,
newPendingSegmentVersions,
upgradedPendingSegments,
datasource,
false
);
log.info(
"Inserted total [%d] new versions for [%d] pending segments.",
numInsertedPendingSegments, newPendingSegmentVersions.size()
numInsertedPendingSegments, upgradedPendingSegments.size()
);
return pendingSegmentToNewId;
}
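/**
 * Decides whether an overlapping pending segment should be upgraded by a REPLACE commit.
 * A pending segment qualifies only if it carries a task allocator id, its version is strictly
 * lower than the REPLACE version, its interval lies entirely within the REPLACE interval,
 * and its sequence name does not already carry the upgraded-pending-segment prefix
 * (i.e. it has not been upgraded before).
 */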
private boolean shouldUpgradePendingSegment(
SegmentIdWithShardSpec pendingSegmentId,
String pendingSegmentSequenceName,
PendingSegmentRecord pendingSegment,
Interval replaceInterval,
String replaceVersion
)
{
if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
if (pendingSegment.getTaskAllocatorId() == null) {
return false;
} else if (!replaceInterval.contains(pendingSegmentId.getInterval())) {
} else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
return false;
} else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
return false;
} else {
// Do not upgrade already upgraded pending segment
return pendingSegmentSequenceName == null
|| !pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
return pendingSegment.getSequenceName() == null
|| !pendingSegment.getSequenceName().startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
}
}
@ -826,7 +805,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
final String taskAllocatorId
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
@ -896,7 +876,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
interval,
previousSegmentIdNotNull,
sequenceName,
sequenceNamePrevIdSha1
sequenceNamePrevIdSha1,
taskAllocatorId
);
return newIdentifier;
}
@ -947,7 +928,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
// For each of the remaining requests, create a new segment
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = createNewSegments(
final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = createNewSegments(
handle,
dataSource,
interval,
@ -965,12 +946,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
insertPendingSegmentsIntoMetastore(
handle,
createdSegments,
ImmutableList.copyOf(createdSegments.values()),
dataSource,
skipSegmentLineageCheck
);
allocatedSegmentIds.putAll(createdSegments);
for (Map.Entry<SegmentCreateRequest, PendingSegmentRecord> entry : createdSegments.entrySet()) {
allocatedSegmentIds.put(entry.getKey(), entry.getValue().getId());
}
return allocatedSegmentIds;
}
@ -1009,7 +992,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
final String taskAllocatorId
) throws IOException
{
final String sql = StringUtils.format(
@ -1073,7 +1057,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
// always insert empty previous sequence id
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
taskAllocatorId
);
log.info(
"Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].",
@ -1281,7 +1267,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata
@Nullable DataSourceMetadata endMetadata,
String taskAllocatorId
)
{
verifySegmentsToCommit(appendSegments);
@ -1291,16 +1278,38 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
final String dataSource = appendSegments.iterator().next().getDataSource();
final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction(
final List<PendingSegmentRecord> segmentIdsForNewVersions = connector.retryTransaction(
(handle, transactionStatus)
-> createNewIdsForAppendSegments(handle, dataSource, appendSegments),
-> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, dataSource, taskAllocatorId),
0,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
// Create entries for all required versions of the append segments
final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
allSegmentsToInsert.addAll(segmentIdsForNewVersions);
final Map<String, DataSegment> segmentIdMap = new HashMap<>();
appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
segmentIdsForNewVersions.forEach(
pendingSegment -> {
if (segmentIdMap.containsKey(pendingSegment.getUpgradedFromSegmentId())) {
final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
allSegmentsToInsert.add(
new DataSegment(
pendingSegment.getId().asSegmentId(),
oldSegment.getLoadSpec(),
oldSegment.getDimensions(),
oldSegment.getMetrics(),
pendingSegment.getId().getShardSpec(),
oldSegment.getLastCompactionState(),
oldSegment.getBinaryVersion(),
oldSegment.getSize()
)
);
}
}
);
final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
try {
@ -1341,31 +1350,27 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
private int insertPendingSegmentsIntoMetastore(
@VisibleForTesting
int insertPendingSegmentsIntoMetastore(
Handle handle,
Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
List<PendingSegmentRecord> pendingSegments,
String dataSource,
boolean skipSegmentLineageCheck
) throws JsonProcessingException
{
final PreparedBatch insertBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
));
// Deduplicate the segment ids by inverting the map
Map<SegmentIdWithShardSpec, SegmentCreateRequest> segmentIdToRequest = new HashMap<>();
createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload, task_allocator_id, upgraded_from_segment_id) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload, :task_allocator_id, :upgraded_from_segment_id)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
));
final String now = DateTimes.nowUtc().toString();
for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : segmentIdToRequest.entrySet()) {
final SegmentCreateRequest request = entry.getValue();
final SegmentIdWithShardSpec segmentId = entry.getKey();
for (PendingSegmentRecord pendingSegment : pendingSegments) {
final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
final Interval interval = segmentId.getInterval();
insertBatch.add()
@ -1374,13 +1379,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("created_date", now)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", request.getSequenceName())
.bind("sequence_prev_id", request.getPreviousSegmentId())
.bind("sequence_name", pendingSegment.getSequenceName())
.bind("sequence_prev_id", pendingSegment.getSequencePrevId())
.bind(
"sequence_name_prev_id_sha1",
getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck)
pendingSegment.computeSequenceNamePrevIdSha1(skipSegmentLineageCheck)
)
.bind("payload", jsonMapper.writeValueAsBytes(segmentId));
.bind("payload", jsonMapper.writeValueAsBytes(segmentId))
.bind("task_allocator_id", pendingSegment.getTaskAllocatorId())
.bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId());
}
int[] updated = insertBatch.execute();
return Arrays.stream(updated).sum();
@ -1393,15 +1400,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Interval interval,
String previousSegmentId,
String sequenceName,
String sequenceNamePrevIdSha1
String sequenceNamePrevIdSha1,
String taskAllocatorId
) throws JsonProcessingException
{
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload) "
+ "sequence_name_prev_id_sha1, payload, task_allocator_id) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload)",
+ ":sequence_name_prev_id_sha1, :payload, :task_allocator_id)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
@ -1415,188 +1423,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("sequence_prev_id", previousSegmentId)
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
.bind("task_allocator_id", taskAllocatorId)
.execute();
}
/**
* Creates new IDs for the given append segments if a REPLACE task started and
* finished after these append segments had already been allocated. The newly
* created IDs belong to the same interval and version as the segments committed
* by the REPLACE task.
*/
private Set<DataSegment> createNewIdsForAppendSegments(
Handle handle,
String dataSource,
Set<DataSegment> segmentsToAppend
) throws IOException
{
if (segmentsToAppend.isEmpty()) {
return Collections.emptySet();
}
final Set<Interval> appendIntervals = new HashSet<>();
final TreeMap<String, Set<DataSegment>> appendVersionToSegments = new TreeMap<>();
for (DataSegment segment : segmentsToAppend) {
appendIntervals.add(segment.getInterval());
appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
.add(segment);
}
// Fetch all used segments that overlap with any of the append intervals
final Collection<DataSegment> overlappingSegments = retrieveUsedSegmentsForIntervals(
dataSource,
new ArrayList<>(appendIntervals),
Segments.INCLUDING_OVERSHADOWED
);
final Map<String, Set<Interval>> overlappingVersionToIntervals = new HashMap<>();
final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
for (DataSegment segment : overlappingSegments) {
overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
.add(segment.getInterval());
overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
.add(segment);
}
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : overlappingVersionToIntervals.entrySet()) {
final String upgradeVersion = entry.getKey();
Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
upgradeVersion,
entry.getValue(),
appendVersionToSegments
);
for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : segmentsToUpgrade.entrySet()) {
final Interval upgradeInterval = upgradeEntry.getKey();
final Set<DataSegment> segmentsAlreadyOnVersion
= overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet())
.stream()
.filter(s -> s.getVersion().equals(upgradeVersion))
.collect(Collectors.toSet());
Set<DataSegment> segmentsUpgradedToVersion = createNewIdsForAppendSegmentsWithVersion(
handle,
upgradeVersion,
upgradeInterval,
upgradeEntry.getValue(),
segmentsAlreadyOnVersion
);
log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion);
upgradedSegments.addAll(segmentsUpgradedToVersion);
}
}
return upgradedSegments;
}
/**
* Creates a Map from eligible interval to Set of segments that are fully
* contained in that interval and have a version strictly lower than {@code #cutoffVersion}.
*/
private Map<Interval, Set<DataSegment>> getSegmentsWithVersionLowerThan(
String cutoffVersion,
Set<Interval> eligibleIntervals,
TreeMap<String, Set<DataSegment>> versionToSegments
)
{
final Set<DataSegment> eligibleSegments
= versionToSegments.headMap(cutoffVersion).values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());
final Map<Interval, Set<DataSegment>> eligibleIntervalToSegments = new HashMap<>();
for (DataSegment segment : eligibleSegments) {
final Interval segmentInterval = segment.getInterval();
for (Interval eligibleInterval : eligibleIntervals) {
if (eligibleInterval.contains(segmentInterval)) {
eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet<>())
.add(segment);
break;
} else if (eligibleInterval.overlaps(segmentInterval)) {
// Committed interval overlaps only partially
throw new ISE(
"Committed interval[%s] conflicts with interval[%s] of append segment[%s].",
eligibleInterval, segmentInterval, segment.getId()
);
}
}
}
return eligibleIntervalToSegments;
}
/**
* Computes new segment IDs that belong to the upgradeInterval and upgradeVersion.
*
* @param committedSegments Segments that already exist in the upgradeInterval
* at upgradeVersion.
*/
private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
Handle handle,
String upgradeVersion,
Interval upgradeInterval,
Set<DataSegment> segmentsToUpgrade,
Set<DataSegment> committedSegments
) throws IOException
{
// Find the committed segment with the highest partition number
SegmentIdWithShardSpec committedMaxId = null;
for (DataSegment committedSegment : committedSegments) {
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < committedSegment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment);
}
}
// Get pending segments for the new version to determine the next partition number to allocate
final String dataSource = segmentsToUpgrade.iterator().next().getDataSource();
final Set<SegmentIdWithShardSpec> pendingSegmentIds
= getPendingSegmentsForIntervalWithHandle(handle, dataSource, upgradeInterval).keySet();
final Set<SegmentIdWithShardSpec> allAllocatedIds = new HashSet<>(pendingSegmentIds);
// Create new IDs for each append segment
final Set<DataSegment> newSegmentIds = new HashSet<>();
for (DataSegment segment : segmentsToUpgrade) {
SegmentCreateRequest request = new SegmentCreateRequest(
segment.getId() + "__" + upgradeVersion,
null,
upgradeVersion,
NumberedPartialShardSpec.instance()
);
// Create new segment ID based on committed segments, allocated pending segments
// and new IDs created so far in this method
final SegmentIdWithShardSpec newId = createNewSegment(
request,
dataSource,
upgradeInterval,
upgradeVersion,
committedMaxId,
allAllocatedIds
);
// Update the set so that subsequent segment IDs use a higher partition number
allAllocatedIds.add(newId);
newSegmentIds.add(
DataSegment.builder(segment)
.interval(newId.getInterval())
.version(newId.getVersion())
.shardSpec(newId.getShardSpec())
.build()
);
}
return newSegmentIds;
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
Handle handle,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<TimelineObjectHolder<String, DataSegment>> existingChunks,
List<SegmentCreateRequest> requests
) throws IOException
)
{
if (requests.isEmpty()) {
return Collections.emptyMap();
@ -1637,18 +1475,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendingSegments =
new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
.map(PendingSegmentRecord::getId)
.collect(Collectors.toSet())
);
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
final Map<UniqueAllocateRequest, SegmentIdWithShardSpec> uniqueRequestToSegment = new HashMap<>();
final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = new HashMap<>();
final Map<UniqueAllocateRequest, PendingSegmentRecord> uniqueRequestToSegment = new HashMap<>();
for (SegmentCreateRequest request : requests) {
// Check if the required segment has already been created in this batch
final UniqueAllocateRequest uniqueRequest =
new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck);
final SegmentIdWithShardSpec createdSegment;
final PendingSegmentRecord createdSegment;
if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
createdSegment = uniqueRequestToSegment.get(uniqueRequest);
} else {
@ -1663,9 +1504,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// Add to pendingSegments to consider for partitionId
if (createdSegment != null) {
pendingSegments.add(createdSegment);
pendingSegments.add(createdSegment.getId());
uniqueRequestToSegment.put(uniqueRequest, createdSegment);
log.info("Created new segment[%s]", createdSegment);
log.info("Created new segment[%s]", createdSegment.getId());
}
}
@ -1678,7 +1519,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return createdSegments;
}
private SegmentIdWithShardSpec createNewSegment(
private PendingSegmentRecord createNewSegment(
SegmentCreateRequest request,
String dataSource,
Interval interval,
@ -1731,12 +1572,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
return new SegmentIdWithShardSpec(
SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
return new PendingSegmentRecord(
pendingSegmentId,
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getUpgradedFromSegmentId(),
request.getTaskAllocatorId()
);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
@ -1761,7 +1609,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and query results are incorrect.
return new SegmentIdWithShardSpec(
SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@ -1771,6 +1619,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
return new PendingSegmentRecord(
pendingSegmentId,
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getUpgradedFromSegmentId(),
request.getTaskAllocatorId()
);
}
}
@ -1796,7 +1651,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final PartialShardSpec partialShardSpec,
final String existingVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
)
{
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
@ -1830,7 +1685,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
.map(PendingSegmentRecord::getId)
.collect(Collectors.toSet())
);
if (committedMaxId != null) {
pendings.add(committedMaxId);
@ -2688,6 +2545,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
@Override
public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
dbTables.getPendingSegmentsTable()
)
)
.bind("task_allocator_id", pendingSegmentsGroup)
.execute()
);
}
@Override
public List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval)
{
return connector.retryWithHandle(
handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, interval)
);
}
@Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.sql.ResultSet;
/**
* Representation of a record in the pending segments table. <br/>
* Mapping of column in table to field:
*
* <ul>
* <li> id -> id (Unique identifier for pending segment) </li>
* <li> sequence_name -> sequenceName (sequence name used for segment allocation) </li>
* <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) </li>
* <li> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded) </li>
* <li> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment) </li>
* </ul>
*/
public class PendingSegmentRecord
{
private final SegmentIdWithShardSpec id;
private final String sequenceName;
private final String sequencePrevId;
private final String upgradedFromSegmentId;
private final String taskAllocatorId;
public PendingSegmentRecord(
SegmentIdWithShardSpec id,
String sequenceName,
String sequencePrevId,
@Nullable String upgradedFromSegmentId,
@Nullable String taskAllocatorId
)
{
this.id = id;
this.sequenceName = sequenceName;
this.sequencePrevId = sequencePrevId;
this.upgradedFromSegmentId = upgradedFromSegmentId;
this.taskAllocatorId = taskAllocatorId;
}
public SegmentIdWithShardSpec getId()
{
return id;
}
public String getSequenceName()
{
return sequenceName;
}
public String getSequencePrevId()
{
return sequencePrevId;
}
/**
* The original pending segment from which this upgraded segment was created.
* Can be null for pending segments allocated before this column was added or for segments that have not been upgraded.
*/
@Nullable
public String getUpgradedFromSegmentId()
{
return upgradedFromSegmentId;
}
/**
* Task / task group / replica group of the task that allocated this segment.
* Can be null for pending segments allocated before this column was added.
*/
@Nullable
public String getTaskAllocatorId()
{
return taskAllocatorId;
}
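/**
 * Computes the SHA-1 stored in the sequence_name_prev_id_sha1 column: the sequence name,
 * then either the interval endpoints (when lineage checks are skipped) or the previous
 * segment id, then the version, with 0xff separator bytes, encoded as upper-case hex.
 */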
@SuppressWarnings("UnstableApiUsage")
public String computeSequenceNamePrevIdSha1(boolean skipSegmentLineageCheck)
{
final Hasher hasher = Hashing.sha1().newHasher()
.putBytes(StringUtils.toUtf8(getSequenceName()))
.putByte((byte) 0xff);
if (skipSegmentLineageCheck) {
final Interval interval = getId().getInterval();
hasher
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis());
} else {
hasher
.putBytes(StringUtils.toUtf8(getSequencePrevId()));
}
hasher.putByte((byte) 0xff);
hasher.putBytes(StringUtils.toUtf8(getId().getVersion()));
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
public static PendingSegmentRecord fromResultSet(ResultSet resultSet, ObjectMapper jsonMapper)
{
try {
final byte[] payload = resultSet.getBytes("payload");
return new PendingSegmentRecord(
jsonMapper.readValue(payload, SegmentIdWithShardSpec.class),
resultSet.getString("sequence_name"),
resultSet.getString("sequence_prev_id"),
resultSet.getString("upgraded_from_segment_id"),
resultSet.getString("task_allocator_id")
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
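A minimal usage sketch, not taken from this change: building a record for a freshly allocated pending segment and computing the hash that backs the unique sequence_name_prev_id_sha1 key. All identifiers and literal values below are placeholders.
// Hypothetical example; in practice the id comes from segment allocation.
SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
    "wiki",
    Intervals.of("2024-01-01/2024-01-02"),
    "2024-01-01T00:00:00.000Z",
    new NumberedShardSpec(0, 0)
);
PendingSegmentRecord record = new PendingSegmentRecord(
    pendingSegmentId,
    "wiki_sequence_0",         // sequenceName
    "",                        // sequencePrevId: empty when lineage checks are skipped
    null,                      // upgradedFromSegmentId: null, this is not an upgraded segment
    "index_parallel_wiki_abc"  // taskAllocatorId
);
// Hashes sequence name + interval endpoints + version since skipSegmentLineageCheck is true.
String sha1 = record.computeSequenceNamePrevIdSha1(true);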

View File

@ -288,6 +288,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
)
)
);
alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
}
public void createDataSourceTable(final String tableName)
@ -460,6 +461,26 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
}
}
private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
{
List<String> statements = new ArrayList<>();
if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
log.info("Table[%s] already has column[upgraded_from_segment_id].", tableName);
} else {
log.info("Adding column[upgraded_from_segment_id] to table[%s].", tableName);
statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN upgraded_from_segment_id VARCHAR(255)", tableName));
}
if (tableHasColumn(tableName, "task_allocator_id")) {
log.info("Table[%s] already has column[task_allocator_id].", tableName);
} else {
log.info("Adding column[task_allocator_id] to table[%s].", tableName);
statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_allocator_id VARCHAR(255)", tableName));
}
if (!statements.isEmpty()) {
alterTable(tableName, statements);
}
}
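For reference, assuming the default table name druid_pendingSegments and that neither column exists yet, the statements rendered by the format strings above would be:
ALTER TABLE druid_pendingSegments ADD COLUMN upgraded_from_segment_id VARCHAR(255)
ALTER TABLE druid_pendingSegments ADD COLUMN task_allocator_id VARCHAR(255)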
public void createLogTable(final String tableName, final String entryTypeName)
{
createTable(

View File

@ -35,7 +35,9 @@ public class SegmentCreateRequestTest
"sequence",
null,
"version",
partialShardSpec
partialShardSpec,
null,
null
);
Assert.assertEquals("sequence", request.getSequenceName());
Assert.assertEquals("", request.getPreviousSegmentId());

View File

@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@ -471,44 +469,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
private Boolean insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String> pendingSegmentSequenceName)
{
final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs;
final String sequenceName = pendingSegmentSequenceName.rhs;
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
return derbyConnector.retryWithHandle(
handle -> {
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload)",
table,
derbyConnector.getQuoteString()
)
)
.bind("id", pendingSegment.toString())
.bind("dataSource", pendingSegment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", pendingSegment.getInterval().getStart().toString())
.bind("end", pendingSegment.getInterval().getEnd().toString())
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", pendingSegment.toString())
.bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putLong((long) pendingSegment.hashCode() * sequenceName.hashCode())
.hash()
.asBytes()
))
.bind("payload", mapper.writeValueAsBytes(pendingSegment))
.execute();
return true;
}
);
}
private Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskId)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@ -620,7 +580,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// Commit the segment and verify the results
SegmentPublishResult commitResult
= coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock);
= coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append");
Assert.assertTrue(commitResult.isSuccess());
Assert.assertEquals(appendSegments, commitResult.getSegments());
@ -649,6 +609,30 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>();
final PendingSegmentRecord pendingSegmentInInterval = new PendingSegmentRecord(
new SegmentIdWithShardSpec(
"foo",
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-02",
new NumberedShardSpec(100, 0)
),
"",
"",
null,
"append"
);
final PendingSegmentRecord pendingSegmentOutsideInterval = new PendingSegmentRecord(
new SegmentIdWithShardSpec(
"foo",
Intervals.of("2023-04-01/2023-04-02"),
"2023-01-02",
new NumberedShardSpec(100, 0)
),
"",
"",
null,
"append"
);
for (int i = 1; i < 9; i++) {
final DataSegment segment = new DataSegment(
"foo",
@ -665,6 +649,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
appendedSegmentToReplaceLockMap.put(segment, replaceLock);
}
insertUsedSegments(segmentsAppendedWithReplaceLock);
derbyConnector.retryWithHandle(
handle -> coordinator.insertPendingSegmentsIntoMetastore(
handle,
ImmutableList.of(pendingSegmentInInterval, pendingSegmentOutsideInterval),
"foo",
true
)
);
insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap);
final Set<DataSegment> replacingSegments = new HashSet<>();
@ -709,6 +701,25 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
Assert.assertTrue(hasBeenCarriedForward);
}
List<PendingSegmentRecord> pendingSegmentsInInterval =
coordinator.getPendingSegments("foo", Intervals.of("2023-01-01/2023-02-01"));
Assert.assertEquals(2, pendingSegmentsInInterval.size());
final SegmentId rootPendingSegmentId = pendingSegmentInInterval.getId().asSegmentId();
if (pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId() == null) {
Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(0).getId().asSegmentId());
Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(1).getUpgradedFromSegmentId());
} else {
Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(1).getId().asSegmentId());
Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId());
}
List<PendingSegmentRecord> pendingSegmentsOutsideInterval =
coordinator.getPendingSegments("foo", Intervals.of("2023-04-01/2023-05-01"));
Assert.assertEquals(1, pendingSegmentsOutsideInterval.size());
Assert.assertEquals(
pendingSegmentOutsideInterval.getId().asSegmentId(), pendingSegmentsOutsideInterval.get(0).getId().asSegmentId()
);
}
@Test
@ -2416,7 +2427,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
false
false,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
@ -2428,7 +2440,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
identifier.getVersion(),
false
false,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
@ -2440,7 +2453,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
identifier1.getVersion(),
false
false,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
@ -2452,7 +2466,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
identifier1.getVersion(),
false
false,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString());
@ -2465,7 +2480,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
false
false,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString());
@ -2501,7 +2517,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
// Since there are no used core partitions yet
@ -2515,7 +2532,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
// Since there are no used core partitions yet
@ -2529,7 +2547,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
// Since there are no used core partitions yet
@ -2559,7 +2578,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());
// Used segment set has 1 core partition
@ -2577,7 +2597,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
// Since all core partitions have been dropped
@ -2610,7 +2631,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"A",
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString());
// Assume it publishes; create its corresponding segment
@ -2638,7 +2660,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString());
// Assume it publishes; create its corresponding segment
@ -2666,7 +2689,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString());
// Assume it publishes; create its corresponding segment
@ -2720,7 +2744,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString());
// no corresponding segment, pending aborted
@ -2754,7 +2779,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
// maxid = B_1 -> new partno = 2
// versionofexistingchunk=A
@ -2791,7 +2817,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec);
final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
dataSource,
interval,
@ -2802,7 +2828,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
final SegmentCreateRequest request1 =
new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec);
new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
dataSource,
interval,
@ -2813,7 +2839,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
final SegmentCreateRequest request2 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
dataSource,
interval,
@ -2824,7 +2850,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
final SegmentCreateRequest request3 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
dataSource,
interval,
@ -2836,7 +2862,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
new SegmentCreateRequest("seq1", null, "v1", partialShardSpec);
new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
dataSource,
interval,
@ -2880,7 +2906,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
maxVersion,
true
true,
null
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
@ -2905,7 +2932,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
false
false,
null
);
prevSegmentId = identifier.toString();
}
@ -2920,7 +2948,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
false
false,
null
);
prevSegmentId = identifier.toString();
}
@ -2947,7 +2976,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
new NumberedOverwritePartialShardSpec(0, 1, (short) (i + 1)),
"version",
false
false,
null
);
Assert.assertEquals(
StringUtils.format(
@ -3015,7 +3045,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
true
true,
null
);
HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@ -3046,7 +3077,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
partialShardSpec,
"version",
true
true,
null
);
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@ -3077,7 +3109,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
new HashBasedNumberedPartialShardSpec(null, 2, 3, null),
"version",
true
true,
null
);
shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec();
@ -3124,7 +3157,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
NumberedPartialShardSpec.instance(),
version,
false
false,
null
);
Assert.assertNull(id);
}
@ -3169,7 +3203,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
NumberedPartialShardSpec.instance(),
version,
false
false,
null
);
Assert.assertNull(id);
}
@ -3329,64 +3364,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
@Test
public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
{
Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
"validLOL"
);
insertPendingSegmentAndSequenceName(validIntervalValidSequence);
Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence = Pair.of(
SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
"invalidRandom"
);
insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
Pair<SegmentIdWithShardSpec, String> invalidIntervalvalidSequence = Pair.of(
SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
"validStuff"
);
insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence);
Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence = Pair.of(
new SegmentIdWithShardSpec(
existingSegment1.getDataSource(),
Intervals.of("2015/2016"),
"1970-01-01",
new NumberedShardSpec(1, 0)
),
"alsoValidAgain"
);
insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence = Pair.of(
new SegmentIdWithShardSpec(
existingSegment1.getDataSource(),
Intervals.of("2015/2016"),
"1970-01-01",
new NumberedShardSpec(2, 0)
),
"definitelyInvalid"
);
insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs);
expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs);
final Map<SegmentIdWithShardSpec, String> actual =
derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle(
handle,
defaultSegment.getDataSource(),
defaultSegment.getInterval(),
ImmutableSet.of("valid", "alsoValid")
));
Assert.assertEquals(expected, actual);
}
@Test
public void testRetrieveUsedSegmentsAndCreatedDates()
{
@ -3471,7 +3448,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
NumberedPartialShardSpec.instance(),
"version",
false
false,
null
);
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
@ -3525,7 +3503,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
interval,
NumberedPartialShardSpec.instance(),
"version",
false
false,
null
);
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());