Check for handoff of upgraded segments (#16162)

Changes:
1) Check for handoff of upgraded realtime segments.
2) Drop sink only when all associated realtime segments have been abandoned.
3) Delete pending segments upon commit to prevent unnecessary upgrades and
partition space exhaustion when a concurrent replace happens. This also prevents
potential data duplication.
4) Register pending segment upgrade only on those tasks to which the segment is associated.
This commit is contained in:
AmatyaAvadhanula 2024-04-25 22:03:38 +05:30 committed by GitHub
parent 5061507541
commit 31eee7d51e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 587 additions and 398 deletions

View File

@ -298,12 +298,6 @@ public class MaterializedViewSupervisor implements Supervisor
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}
@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
throw new UnsupportedOperationException();
}
@Override
public int getActiveTaskGroupsCount()
{

View File

@ -207,8 +207,6 @@ public class MaterializedViewSupervisorSpecTest
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount());
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes());
Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()

View File

@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
@ -46,7 +45,7 @@ import java.util.Objects;
import java.util.Set;
@JsonTypeName(MSQWorkerTask.TYPE)
public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
public class MSQWorkerTask extends AbstractTask
{
public static final String TYPE = "query_worker";
@ -126,13 +125,6 @@ public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocat
return ImmutableSet.of();
}
@Override
public String getTaskAllocatorId()
{
return getControllerTaskId();
}
@Override
public boolean isReady(final TaskActionClient taskActionClient)
{

View File

@ -108,12 +108,4 @@ public class MSQWorkerTaskTest
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
/**
 * Verifies that the task allocator id reported by an {@code MSQWorkerTask}
 * equals the controller task id it was constructed with.
 */
@Test
public void testGetTaskAllocatorId()
{
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
}
}

View File

@ -34,13 +34,10 @@ import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -155,7 +152,7 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, publishResult.getUpgradedPendingSegments());
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
@ -168,7 +165,11 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
/**
* Registers upgraded pending segments on the active supervisor, if any
*/
private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
private void registerUpgradedPendingSegmentsOnSupervisor(
Task task,
TaskActionToolbox toolbox,
List<PendingSegmentRecord> upgradedPendingSegments
)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
@ -178,42 +179,10 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
return;
}
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
.getTaskLockbox()
.getAllReplaceLocksForDatasource(task.getDataSource())
.stream()
.filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());
Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
pendingSegments.addAll(
toolbox.getIndexerMetadataStorageCoordinator()
.getPendingSegments(task.getDataSource(), replaceLock.getInterval())
);
}
Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
pendingSegment.getId().asSegmentId().toString(),
pendingSegment.getId()
));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null
&& !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
segmentToParent.put(
pendingSegment.getId(),
idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
);
}
});
segmentToParent.forEach(
(newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
upgradedPendingSegments.forEach(
upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
upgradedPendingSegment
)
);
}

View File

@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
@ -109,7 +108,7 @@ import java.util.stream.Collectors;
* generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
@ -240,12 +239,6 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return subtaskSpecId;
}
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Override
public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{

View File

@ -1242,7 +1242,7 @@ public class TaskLockbox
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId

View File

@ -33,9 +33,9 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import javax.annotation.Nullable;
@ -308,16 +308,19 @@ public class SupervisorManager
* allows the supervisor to include the pending segment in queries fired against
* that segment version.
*/
public boolean registerNewVersionOfPendingSegmentOnSupervisor(
public boolean registerUpgradedPendingSegmentOnSupervisor(
String supervisorId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
PendingSegmentRecord upgradedPendingSegment
)
{
try {
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null");
Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null");
Preconditions.checkNotNull(
upgradedPendingSegment.getUpgradedFromSegmentId(),
"upgradedFromSegmentId cannot be null"
);
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
@ -326,12 +329,12 @@ public class SupervisorManager
}
SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment);
return true;
}
catch (Exception e) {
log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].",
upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId);
}
return false;
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
/**
 * Contains a new version of an existing base pending segment. Used by realtime
 * tasks to serve queries against multiple versions of the same pending segment.
 * <p>
 * Immutable value holder that is (de)serialized as JSON with the properties
 * {@code baseSegment} and {@code newVersion}.
 */
public class PendingSegmentVersions
{
// The existing pending segment that the new version is based on.
private final SegmentIdWithShardSpec baseSegment;
// The new version of the base pending segment.
private final SegmentIdWithShardSpec newVersion;
/**
 * @param baseSegment value of the {@code baseSegment} JSON property; stored as given, without validation
 * @param newVersion  value of the {@code newVersion} JSON property; stored as given, without validation
 */
@JsonCreator
public PendingSegmentVersions(
@JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment,
@JsonProperty("newVersion") SegmentIdWithShardSpec newVersion
)
{
this.baseSegment = baseSegment;
this.newVersion = newVersion;
}
/**
 * @return the base segment passed to the constructor
 */
@JsonProperty
public SegmentIdWithShardSpec getBaseSegment()
{
return baseSegment;
}
/**
 * @return the new version passed to the constructor
 */
@JsonProperty
public SegmentIdWithShardSpec getNewVersion()
{
return newVersion;
}
}

View File

@ -21,8 +21,8 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import java.util.List;
@ -158,15 +158,14 @@ public interface SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetTy
* Update the task state to redirect queries for later versions to the root pending segment.
* The task also announces that it is serving the segments belonging to the subsequent versions.
* The update is processed only if the task is serving the original pending segment.
* @param taskId - task id
* @param basePendingSegment - the pending segment that was originally allocated
* @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated
*
* @param taskId - task id
* @param pendingSegmentRecord - the upgraded pending segment record, which carries the new segment id and the id of the segment it was upgraded from
* @return true if the update succeeds
*/
ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
PendingSegmentRecord pendingSegmentRecord
);
Class<PartitionIdType> getPartitionType();

View File

@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
@ -57,7 +58,6 @@ import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@ -197,13 +197,12 @@ public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
@Override
public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
PendingSegmentRecord pendingSegmentRecord
)
{
final RequestBuilder requestBuilder
= new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
.jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
.jsonContent(jsonMapper, pendingSegmentRecord);
return makeRequest(taskId, requestBuilder)
.handler(IgnoreHttpResponseHandler.INSTANCE)

View File

@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@ -1575,18 +1576,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Path("/pendingSegmentVersion")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response registerNewVersionOfPendingSegment(
PendingSegmentVersions pendingSegmentVersions,
public Response registerUpgradedPendingSegment(
PendingSegmentRecord upgradedPendingSegment,
// this field is only for internal purposes and usually shouldn't be set by users
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.WRITE);
try {
((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
pendingSegmentVersions.getBaseSegment(),
pendingSegmentVersions.getNewVersion()
);
((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment);
return Response.ok().build();
}
catch (DruidException e) {
@ -1598,8 +1596,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
catch (Exception e) {
log.error(
e,
"Could not register new version[%s] of pending segment[%s]",
pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment()
"Could not register pending segment[%s] upgraded from[%s]",
upgradedPendingSegment.getId().asSegmentId(), upgradedPendingSegment.getUpgradedFromSegmentId()
);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}

View File

@ -89,12 +89,12 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import javax.annotation.Nonnull;
@ -178,7 +178,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [activelyReadingTaskGroups]
* map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
private class TaskGroup
@VisibleForTesting
public class TaskGroup
{
final int groupId;
@ -265,6 +266,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return tasks.keySet();
}
@VisibleForTesting
public String getBaseSequenceName()
{
return baseSequenceName;
}
}
private class TaskData
@ -1096,42 +1102,23 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
/**
 * The base sequence name of a seekable stream task group is used as a prefix of the sequence names
 * of pending segments published by it.
 * This method can be used to identify the active pending segments for a datasource
 * by checking if the sequence name begins with any of the active realtime sequence prefixes returned by this method.
 *
 * @return the set of base sequence names of both active and pending completion task groups.
 */
@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
final Set<String> activeBaseSequences = new HashSet<>();
// Base sequence names of the task groups that are actively reading.
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
// Pending-completion groups are kept as a list per map entry, so every list is walked.
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
activeBaseSequences.add(taskGroup.baseSequenceName);
}
}
return activeBaseSequences;
}
public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
PendingSegmentRecord pendingSegmentRecord
)
{
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion);
if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
}
}
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion);
if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
for (String taskId : taskGroup.taskIds()) {
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord);
}
}
}
}
@ -1548,7 +1535,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@VisibleForTesting
public void addTaskGroupToActivelyReadingTaskGroup(
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
Optional<DateTime> minMsgTime,
@ -1572,10 +1559,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroupId
);
}
return group;
}
@VisibleForTesting
public void addTaskGroupToPendingCompletionTaskGroup(
public TaskGroup addTaskGroupToPendingCompletionTaskGroup(
int taskGroupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
Optional<DateTime> minMsgTime,
@ -1595,6 +1583,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>())
.add(group);
return group;
}
@VisibleForTesting
@ -3202,9 +3191,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// If we received invalid endOffset values, we clear the known offset to refetch the last committed offset
// from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure.
if (!endOffsetsAreInvalid) {
for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
partitionOffsets.put(entry.getKey(), entry.getValue());
}
partitionOffsets.putAll(endOffsets);
} else {
for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
partitionOffsets.put(entry.getKey(), getNotSetMarker());

View File

@ -52,7 +52,7 @@ public class ActionsTestTask extends CommandQueueTask
{
private final TaskActionClient client;
private final AtomicInteger sequenceId = new AtomicInteger(0);
private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = new HashMap<>();
private final Map<SegmentId, String> announcedSegmentsToParentSegments = new HashMap<>();
public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory)
{
@ -82,7 +82,7 @@ public class ActionsTestTask extends CommandQueueTask
);
}
public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
public Map<SegmentId, String> getAnnouncedSegmentsToParentSegments()
{
return announcedSegmentsToParentSegments;
}
@ -114,7 +114,7 @@ public class ActionsTestTask extends CommandQueueTask
TaskLockType.APPEND
)
);
announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId());
announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId().toString());
return pendingSegment;
}

View File

@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@ -74,7 +75,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -83,7 +83,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -122,10 +121,9 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
private final AtomicInteger groupId = new AtomicInteger(0);
private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class);
private Capture<String> supervisorId;
private Capture<SegmentIdWithShardSpec> oldPendingSegment;
private Capture<SegmentIdWithShardSpec> newPendingSegment;
private Capture<PendingSegmentRecord> pendingSegment;
private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
private Map<SegmentId, Object> parentSegmentToLoadSpec;
private Map<String, Object> parentSegmentToLoadSpec;
@Override
@Before
@ -169,12 +167,10 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
groupId.set(0);
appendTask = createAndStartTask();
supervisorId = Capture.newInstance(CaptureType.ALL);
oldPendingSegment = Capture.newInstance(CaptureType.ALL);
newPendingSegment = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
pendingSegment = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
EasyMock.capture(supervisorId),
EasyMock.capture(oldPendingSegment),
EasyMock.capture(newPendingSegment)
EasyMock.capture(pendingSegment)
)).andReturn(true).anyTimes();
replaceTask = createAndStartTask();
EasyMock.replay(supervisorManager);
@ -682,20 +678,6 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12);
}
/**
 * Finds the first segment in {@code segments} whose version equals {@code version}
 * and whose load spec equals {@code loadSpec}.
 *
 * @param version  version to match; must not be null since {@code equals} is invoked on it
 * @param loadSpec load spec to match, compared with {@link Objects#equals}
 * @param segments candidate segments to search
 * @return the first matching segment in iteration order, or null if none matches
 */
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
{
for (DataSegment segment : segments) {
if (version.equals(segment.getVersion())
&& Objects.equals(segment.getLoadSpec(), loadSpec)) {
return segment;
}
}
return null;
}
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
@ -739,23 +721,6 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
}
}
/**
 * Asserts that the used segments retrieved for {@code interval} of the {@code WIKI} datasource,
 * using an action client created for {@code task}, are exactly {@code expectedSegments}.
 * Both sides are compared as sets, so order and duplicates are ignored.
 *
 * @param task             task for which the action client is created
 * @param interval         interval whose used segments are retrieved
 * @param expectedSegments segments expected to be returned
 * @throws ISE if retrieving the segments fails with an {@link IOException}
 */
private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
{
try {
final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
Collection<DataSegment> allUsedSegments = taskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
Collections.singletonList(interval)
)
);
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
}
catch (IOException e) {
throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
}
}
private TaskToolboxFactory createToolboxFactory(
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory
@ -799,11 +764,10 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
{
replaceTask.commitReplaceSegments(dataSegments);
for (int i = 0; i < supervisorId.getValues().size(); i++) {
announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i));
announceUpgradedPendingSegment(pendingSegment.getValues().get(i));
}
supervisorId.reset();
oldPendingSegment.reset();
newPendingSegment.reset();
pendingSegment.reset();
replaceTask.finishRunAndGetStatus();
}
@ -812,19 +776,16 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments);
result.getSegments().forEach(this::unannounceUpgradedPendingSegment);
for (DataSegment segment : dataSegments) {
parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values()));
parentSegmentToLoadSpec.put(segment.getId().toString(), Iterables.getOnlyElement(segment.getLoadSpec().values()));
}
appendTask.finishRunAndGetStatus();
return result;
}
private void announceUpgradedPendingSegment(
SegmentIdWithShardSpec oldPendingSegment,
SegmentIdWithShardSpec newPendingSegment
)
private void announceUpgradedPendingSegment(PendingSegmentRecord pendingSegment)
{
appendTask.getAnnouncedSegmentsToParentSegments()
.put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId());
.put(pendingSegment.getId().asSegmentId(), pendingSegment.getUpgradedFromSegmentId());
}
private void unannounceUpgradedPendingSegment(
@ -849,7 +810,7 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase
loadSpecs.add(loadSpec);
}
for (Map.Entry<SegmentId, SegmentId> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
for (Map.Entry<SegmentId, String> entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
final String version = entry.getKey().getVersion();
final Interval interval = entry.getKey().getInterval();
final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue());

View File

@ -1948,8 +1948,8 @@ public class TaskLockboxTest
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
// Any task may attempt pending segment clean up
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
EasyMock.replay(coordinator);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);

View File

@ -31,7 +31,11 @@ import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMeta
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@ -544,6 +548,53 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
/**
 * Verifies the result of {@code registerUpgradedPendingSegmentOnSupervisor} for two supervisors:
 * it returns false for the supervisor created from a {@code NoopSupervisorSpec} and true for a
 * mocked {@code SeekableStreamSupervisor}, on which the registration is expected exactly once.
 */
@Test
public void testRegisterUpgradedPendingSegmentOnSupervisor()
{
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS"));
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec streamingSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
// The streaming supervisor must receive the upgraded pending segment exactly once.
streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes();
EasyMock.expect(streamingSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(streamingSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes();
EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes();
EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes();
EasyMock.replay(streamingSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
EasyMock.expectLastCall().once();
replayAll();
// Both upgradedFromSegmentId and taskAllocatorId are set to non-null placeholder values.
final PendingSegmentRecord pendingSegment = new PendingSegmentRecord(
new SegmentIdWithShardSpec(
"DS",
Intervals.ETERNITY,
"version",
new NumberedShardSpec(0, 0)
),
"sequenceName",
"prevSegmentId",
"upgradedFromSegmentId",
"taskAllocatorId"
);
manager.start();
manager.createOrUpdateAndStartSupervisor(noopSpec);
// NOTE(review): presumably false because the noop supervisor is not a SeekableStreamSupervisor — confirm against SupervisorManager.
Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", pendingSegment));
manager.createOrUpdateAndStartSupervisor(streamingSpec);
Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", pendingSegment));
verifyAll();
}
private static class TestSupervisorSpec implements SupervisorSpec
{
private final String id;

View File

@ -74,11 +74,13 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -86,6 +88,10 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.MatcherAssert;
@ -1548,10 +1554,19 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testGetActiveRealtimeSequencePrefixes()
public void testRegisterNewVersionOfPendingSegment()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);
Capture<PendingSegmentRecord> captured0 = Capture.newInstance(CaptureType.FIRST);
Capture<PendingSegmentRecord> captured1 = Capture.newInstance(CaptureType.FIRST);
EasyMock.expect(
indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.capture(captured0))
).andReturn(Futures.immediateFuture(true));
EasyMock.expect(
indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), EasyMock.capture(captured1))
).andReturn(Futures.immediateFuture(true));
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
@ -1559,34 +1574,63 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
// Spin off two active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task0"),
ImmutableSet.of()
);
final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord(
new SegmentIdWithShardSpec(
"DS",
Intervals.of("2024/2025"),
"2024",
new NumberedShardSpec(1, 0)
),
taskGroup0.getBaseSequenceName(),
"prevId0",
"someAppendedSegment0",
taskGroup0.getBaseSequenceName()
);
final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord(
new SegmentIdWithShardSpec(
"DS",
Intervals.of("2024/2025"),
"2024",
new NumberedShardSpec(2, 0)
),
taskGroup2.getBaseSequenceName(),
"prevId1",
"someAppendedSegment1",
taskGroup2.getBaseSequenceName()
);
Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size());
supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0);
supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1);
Assert.assertEquals(pendingSegmentRecord0, captured0.getValue());
Assert.assertEquals(pendingSegmentRecord1, captured1.getValue());
verifyAll();
}
@Test

View File

@ -252,11 +252,11 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
}
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
public List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
)
{
return Collections.emptyMap();
return Collections.emptyList();
}
@Override
@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
}
@Override
public int deletePendingSegmentsForTaskGroup(final String taskGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
{
throw new UnsupportedOperationException();
}

View File

@ -389,9 +389,9 @@ public interface IndexerMetadataStorageCoordinator
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @return Map from originally allocated pending segment to its new upgraded ID.
* @return List of inserted pending segment records
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
);
@ -495,7 +495,7 @@ public interface IndexerMetadataStorageCoordinator
* @param taskAllocatorId task id / task group / replica group for an appending task
* @return number of pending segments deleted from the metadata store
*/
int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
/**
* Fetches all the pending segments of the datasource that overlap with a given interval.

View File

@ -23,10 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -46,12 +49,19 @@ public class SegmentPublishResult
private final boolean success;
@Nullable
private final String errorMsg;
@Nullable
private final List<PendingSegmentRecord> upgradedPendingSegments;
/**
 * Creates a successful publish result for the given segments, with no error message.
 *
 * @param segments segments to report in the result
 * @return a result whose success flag is true and whose error message is null
 */
public static SegmentPublishResult ok(Set<DataSegment> segments)
{
return new SegmentPublishResult(segments, true, null);
}
/**
 * Creates a successful publish result for the given segments that also carries a list of upgraded pending
 * segment records, with no error message.
 *
 * @param segments                segments to report in the result
 * @param upgradedPendingSegments pending segment records to expose via the result
 * @return a result whose success flag is true and whose error message is null
 */
public static SegmentPublishResult ok(Set<DataSegment> segments, List<PendingSegmentRecord> upgradedPendingSegments)
{
return new SegmentPublishResult(segments, true, null, upgradedPendingSegments);
}
public static SegmentPublishResult fail(String errorMsg)
{
return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
@ -63,13 +73,28 @@ public class SegmentPublishResult
@JsonProperty("success") boolean success,
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
this(segments, success, errorMsg, null);
}
/**
 * Creates a publish result, validating that an unsuccessful result carries neither segments nor upgraded
 * pending segments.
 *
 * @param segments                segments of the result; must not be null and must be empty when
 *                                {@code success} is false
 * @param success                 whether the publish succeeded
 * @param errorMsg                error message, may be null
 * @param upgradedPendingSegments upgraded pending segment records, may be null; must be null or empty when
 *                                {@code success} is false
 * @throws NullPointerException     if {@code segments} is null
 * @throws IllegalArgumentException if {@code success} is false and either collection is non-empty
 */
private SegmentPublishResult(
    Set<DataSegment> segments,
    boolean success,
    @Nullable String errorMsg,
    @Nullable List<PendingSegmentRecord> upgradedPendingSegments
)
{
  this.segments = Preconditions.checkNotNull(segments, "segments");
  this.success = success;
  this.errorMsg = errorMsg;
  this.upgradedPendingSegments = upgradedPendingSegments;

  if (!success) {
    Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes");
    Preconditions.checkArgument(
        CollectionUtils.isNullOrEmpty(upgradedPendingSegments),
        "upgraded pending segments must be null or empty for unsuccessful publishes"
    );
  }
}
@ -92,6 +117,12 @@ public class SegmentPublishResult
return errorMsg;
}
/**
 * @return the upgraded pending segment records this result was created with, or null if none were supplied
 */
@Nullable
public List<PendingSegmentRecord> getUpgradedPendingSegments()
{
return upgradedPendingSegments;
}
@Override
public boolean equals(Object o)
{

View File

@ -31,7 +31,6 @@ import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -186,12 +185,6 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
return -1;
}
@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
return Collections.emptySet();
}
};
}

View File

@ -29,7 +29,6 @@ import org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface Supervisor
{
@ -103,9 +102,4 @@ public interface Supervisor
}
int getActiveTaskGroupsCount();
/**
* @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor
*/
Set<String> getActiveRealtimeSequencePrefixes();
}

View File

@ -79,6 +79,7 @@ import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
@ -530,9 +531,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
segmentSchemaMapping,
upgradeSegmentMetadata,
Collections.emptyMap()
)
),
upgradePendingSegmentsOverlappingWith(segmentsToInsert)
);
upgradePendingSegmentsOverlappingWith(segmentsToInsert);
return result;
},
3,
@ -735,12 +736,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
public List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments
)
{
if (replaceSegments.isEmpty()) {
return Collections.emptyMap();
return Collections.emptyList();
}
// Any replace interval has exactly one version of segments
@ -769,16 +770,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
* those versions.</li>
* </ul>
*
* @return Map from original pending segment to the new upgraded ID.
* @return Inserted pending segment records
*/
private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
private List<PendingSegmentRecord> upgradePendingSegments(
Handle handle,
String datasource,
Map<Interval, DataSegment> replaceIntervalToMaxId
) throws JsonProcessingException
{
final List<PendingSegmentRecord> upgradedPendingSegments = new ArrayList<>();
final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> pendingSegmentToNewId = new HashMap<>();
for (Map.Entry<Interval, DataSegment> entry : replaceIntervalToMaxId.entrySet()) {
final Interval replaceInterval = entry.getKey();
@ -813,7 +813,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
overlappingPendingSegment.getTaskAllocatorId()
)
);
pendingSegmentToNewId.put(pendingSegmentId, newId);
}
}
}
@ -831,7 +830,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
numInsertedPendingSegments, upgradedPendingSegments.size()
);
return pendingSegmentToNewId;
return upgradedPendingSegments;
}
private boolean shouldUpgradePendingSegment(
@ -1114,8 +1113,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
// always insert empty previous sequence id
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
taskAllocatorId
insertPendingSegmentIntoMetastore(
handle,
newIdentifier,
dataSource,
interval,
"",
sequenceName,
sequenceNamePrevIdSha1,
taskAllocatorId
);
log.info(
@ -1320,6 +1326,39 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
/**
 * Binds every entry of {@code values} on the given statement under the parameter name formed by
 * {@code columnName} followed by the entry's position in the list. Does nothing if {@code values} is null.
 *
 * @param columnName prefix of the bound parameter names
 * @param values     values to bind, may be null
 * @param query      statement that receives the bindings
 */
private static void bindColumnValuesToQueryWithInCondition(
    final String columnName,
    final List<String> values,
    final Update query
)
{
  if (values != null) {
    int position = 0;
    for (final String value : values) {
      final String parameterName = StringUtils.format("%s%d", columnName, position);
      query.bind(parameterName, value);
      position++;
    }
  }
}
/**
 * Deletes the rows of the pending segments table that belong to the given datasource and whose id matches
 * one of the given ids.
 *
 * @param handle            handle used to create and run the statement
 * @param datasource        value bound to the {@code dataSource} parameter
 * @param pendingSegmentIds ids bound to the condition on the {@code id} column; nothing is executed when empty
 * @return the value returned by executing the DELETE statement, or 0 if {@code pendingSegmentIds} is empty
 */
private int deletePendingSegmentsById(Handle handle, String datasource, List<String> pendingSegmentIds)
{
  if (pendingSegmentIds.isEmpty()) {
    return 0;
  }

  final String pendingSegmentsTable = dbTables.getPendingSegmentsTable();
  final String idCondition =
      SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", pendingSegmentIds);
  final String sql = StringUtils.format(
      "DELETE FROM %s WHERE dataSource = :dataSource %s",
      pendingSegmentsTable,
      idCondition
  );

  final Update deleteStatement = handle.createStatement(sql).bind("dataSource", datasource);
  bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, deleteStatement);
  return deleteStatement.execute();
}
private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@ -1383,7 +1422,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (metadataUpdateResult.isFailed()) {
transactionStatus.setRollbackOnly();
metadataNotUpdated.set(true);
if (metadataUpdateResult.canRetry()) {
throw new RetryTransactionException(metadataUpdateResult.getErrorMsg());
} else {
@ -1393,6 +1431,20 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
// Delete the pending segments to be committed in this transaction in batches of at most 100
final List<List<String>> pendingSegmentIdBatches = Lists.partition(
allSegmentsToInsert.stream()
.map(pendingSegment -> pendingSegment.getId().toString())
.collect(Collectors.toList()),
100
);
int numDeletedPendingSegments = 0;
for (List<String> pendingSegmentIdBatch : pendingSegmentIdBatches) {
numDeletedPendingSegments += deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch);
}
log.info("Deleted [%d] entries from pending segments table upon commit.", numDeletedPendingSegments);
return SegmentPublishResult.ok(
insertSegments(
handle,
@ -2761,7 +2813,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
@Override
public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle

View File

@ -19,6 +19,8 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@ -50,12 +52,13 @@ public class PendingSegmentRecord
private final String upgradedFromSegmentId;
private final String taskAllocatorId;
@JsonCreator
public PendingSegmentRecord(
SegmentIdWithShardSpec id,
String sequenceName,
String sequencePrevId,
@Nullable String upgradedFromSegmentId,
@Nullable String taskAllocatorId
@JsonProperty("id") SegmentIdWithShardSpec id,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("sequencePrevId") String sequencePrevId,
@JsonProperty("upgradedFromSegmentId") @Nullable String upgradedFromSegmentId,
@JsonProperty("taskAllocatorId") @Nullable String taskAllocatorId
)
{
this.id = id;
@ -65,16 +68,19 @@ public class PendingSegmentRecord
this.taskAllocatorId = taskAllocatorId;
}
/**
 * @return the segment id with shard spec held by this record; exposed as a JSON property
 */
@JsonProperty
public SegmentIdWithShardSpec getId()
{
return id;
}
/**
 * @return the sequence name held by this record; exposed as a JSON property
 */
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public String getSequencePrevId()
{
return sequencePrevId;
@ -85,6 +91,7 @@ public class PendingSegmentRecord
* Can be null for pending segments allocated before this column was added or for segments that have not been upgraded.
*/
@Nullable
@JsonProperty
public String getUpgradedFromSegmentId()
{
return upgradedFromSegmentId;
@ -95,6 +102,7 @@ public class PendingSegmentRecord
* Can be null for pending segments allocated before this column was added.
*/
@Nullable
@JsonProperty
public String getTaskAllocatorId()
{
return taskAllocatorId;

View File

@ -955,7 +955,7 @@ public class SqlSegmentsMetadataQuery
*
* @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
*/
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{
if (values == null) {
return "";

View File

@ -563,7 +563,8 @@ public abstract class BaseAppenderatorDriver implements Closeable
return new SegmentsAndCommitMetadata(
segmentsAndCommitMetadata.getSegments(),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping()
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
);
},
MoreExecutors.directExecutor()
@ -618,9 +619,10 @@ public abstract class BaseAppenderatorDriver implements Closeable
return executor.submit(
() -> {
try {
RetryUtils.retry(
return RetryUtils.retry(
() -> {
try {
final Set<DataSegment> upgradedSegments = new HashSet<>();
final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
final SegmentPublishResult publishResult = publisher.publishSegments(
segmentsToBeOverwritten,
@ -629,7 +631,6 @@ public abstract class BaseAppenderatorDriver implements Closeable
callerMetadata,
segmentsAndCommitMetadata.getSegmentSchemaMapping()
);
if (publishResult.isSuccess()) {
log.info(
"Published [%s] segments with commit metadata [%s]",
@ -637,6 +638,13 @@ public abstract class BaseAppenderatorDriver implements Closeable
callerMetadata
);
log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
// This set must contain only those segments that were upgraded as a result of a concurrent replace.
upgradedSegments.addAll(publishResult.getSegments());
segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove);
if (!upgradedSegments.isEmpty()) {
log.info("Published [%d] upgraded segments.", upgradedSegments.size());
log.infoSegments(upgradedSegments, "Upgraded segments");
}
log.info("Published segment schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
} else {
// Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
@ -691,6 +699,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
throw new ISE("Failed to publish segments");
}
}
return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
}
catch (Exception e) {
// Must not remove segments here, we aren't sure if our transaction succeeded or not.
@ -703,9 +712,10 @@ public abstract class BaseAppenderatorDriver implements Closeable
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
return segmentsAndCommitMetadata;
},
e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. The new start metadata is ahead of last commited end state.")),
e -> (e != null && e.getMessage() != null
&& e.getMessage().contains("Failed to update the metadata Store."
+ " The new start metadata is ahead of last commited end state.")),
RetryUtils.DEFAULT_MAX_TRIES
);
}
@ -717,7 +727,6 @@ public abstract class BaseAppenderatorDriver implements Closeable
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
return segmentsAndCommitMetadata;
}
);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@ -28,26 +29,59 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class SegmentsAndCommitMetadata
{
private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null);
private static final SegmentsAndCommitMetadata NIL
= new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, null);
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
private final SegmentSchemaMapping segmentSchemaMapping;
private final ImmutableSet<DataSegment> upgradedSegments;
/**
 * Creates an instance with the given segments and commit metadata, passing null for both the segment schema
 * mapping and the upgraded segments.
 *
 * @param segments       segments of this instance
 * @param commitMetadata commit metadata of this instance
 */
public SegmentsAndCommitMetadata(
List<DataSegment> segments,
Object commitMetadata
)
{
this(segments, commitMetadata, null, null);
}
/**
 * Creates an instance with the given segments, commit metadata and segment schema mapping, passing null for
 * the upgraded segments.
 *
 * @param segments             segments of this instance
 * @param commitMetadata       commit metadata of this instance
 * @param segmentSchemaMapping segment schema mapping of this instance
 */
public SegmentsAndCommitMetadata(
List<DataSegment> segments,
Object commitMetadata,
SegmentSchemaMapping segmentSchemaMapping
)
{
this(segments, commitMetadata, segmentSchemaMapping, null);
}
public SegmentsAndCommitMetadata(
List<DataSegment> segments,
@Nullable Object commitMetadata,
@Nullable SegmentSchemaMapping segmentSchemaMapping
@Nullable SegmentSchemaMapping segmentSchemaMapping,
@Nullable Set<DataSegment> upgradedSegments
)
{
this.segments = ImmutableList.copyOf(segments);
this.commitMetadata = commitMetadata;
this.upgradedSegments = upgradedSegments == null ? null : ImmutableSet.copyOf(upgradedSegments);
this.segmentSchemaMapping = segmentSchemaMapping;
}
/**
 * Returns a new instance that keeps this instance's segments, commit metadata and segment schema mapping
 * but uses the given upgraded segments.
 *
 * @param upgradedSegments upgraded segments for the new instance
 * @return a new {@code SegmentsAndCommitMetadata}; this instance is not modified
 */
public SegmentsAndCommitMetadata withUpgradedSegments(Set<DataSegment> upgradedSegments)
{
  final SegmentsAndCommitMetadata copy = new SegmentsAndCommitMetadata(
      segments,
      commitMetadata,
      segmentSchemaMapping,
      upgradedSegments
  );
  return copy;
}
@Nullable
public Object getCommitMetadata()
{
@ -59,6 +93,15 @@ public class SegmentsAndCommitMetadata
return segments;
}
/**
 * @return the set of extra upgraded segments committed due to a concurrent replace, or null if this
 * instance was created without upgraded segments.
 */
@Nullable
public Set<DataSegment> getUpgradedSegments()
{
return upgradedSegments;
}
public SegmentSchemaMapping getSegmentSchemaMapping()
{
return segmentSchemaMapping;
@ -75,13 +118,15 @@ public class SegmentsAndCommitMetadata
}
SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o;
return Objects.equals(commitMetadata, that.commitMetadata) &&
Objects.equals(upgradedSegments, that.upgradedSegments) &&
Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) &&
Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
return Objects.hash(commitMetadata, segments);
return Objects.hash(commitMetadata, segments, upgradedSegments, segmentSchemaMapping);
}
@Override
@ -90,6 +135,8 @@ public class SegmentsAndCommitMetadata
return getClass().getSimpleName() + "{" +
"commitMetadata=" + commitMetadata +
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", upgradedSegments=" + SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) +
", segmentSchemaMapping=" + segmentSchemaMapping +
'}';
}

View File

@ -356,13 +356,13 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
}
}
public void registerNewVersionOfPendingSegment(
public void registerUpgradedPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
SegmentIdWithShardSpec upgradedPendingSegment
)
{
newIdToBasePendingSegment.put(
newSegmentVersion.asSegmentId().toDescriptor(),
upgradedPendingSegment.asSegmentId().toDescriptor(),
basePendingSegment.asSegmentId().toDescriptor()
);
}

View File

@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
@ -147,6 +148,7 @@ public class StreamAppenderator implements Appenderator
* of any thread from {@link #drop}.
*/
private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SegmentIdWithShardSpec> idToPendingSegment = new ConcurrentHashMap<>();
private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
private final long maxBytesTuningConfig;
@ -166,8 +168,25 @@ public class StreamAppenderator implements Appenderator
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
/**
* Map from base segment identifier of a sink to the set of all the segment ids associated with it.
* The set contains the base segment itself and its upgraded versions announced as a result of a concurrent replace.
* The map contains all the available sinks' identifiers in its keyset.
*/
private final ConcurrentMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
= new ConcurrentHashMap<>();
/**
* Map from the id of an upgraded pending segment to the segment corresponding to its upgradedFromSegmentId.
*/
private final ConcurrentMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedSegmentToBaseSegment
= new ConcurrentHashMap<>();
/**
* Set of all segment identifiers that have been marked to be abandoned.
* This is used to determine if all the segments corresponding to a sink have been abandoned and it can be dropped.
*/
private final ConcurrentHashMap.KeySetView<SegmentIdWithShardSpec, Boolean> abandonedSegments
= ConcurrentHashMap.newKeySet();
private final SinkSchemaAnnouncer sinkSchemaAnnouncer;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
@ -527,9 +546,7 @@ public class StreamAppenderator implements Appenderator
.emit();
}
sinks.put(identifier, retVal);
metrics.setSinkCount(sinks.size());
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal));
addSink(identifier, retVal);
}
return retVal;
@ -1058,14 +1075,7 @@ public class StreamAppenderator implements Appenderator
log.debug("Shutting down immediately...");
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
try {
unannounceAllVersionsOfSegment(entry.getValue().getSegment());
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource())
.addData("identifier", entry.getKey().toString())
.emit();
}
unannounceAllVersionsOfSegment(entry.getValue().getSegment(), entry.getValue());
}
try {
shutdownExecutors();
@ -1098,61 +1108,78 @@ public class StreamAppenderator implements Appenderator
/**
* Unannounces the given base segment and all its upgraded versions.
*/
private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException
private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
{
segmentAnnouncer.unannounceSegment(baseSegment);
synchronized (sink) {
final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment);
if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
return;
}
final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
= baseSegmentToUpgradedVersions.remove(baseSegment.getId());
if (upgradedVersionsOfSegment == null || upgradedVersionsOfSegment.isEmpty()) {
return;
}
for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
final DataSegment newSegment = new DataSegment(
newId.getDataSource(),
newId.getInterval(),
newId.getVersion(),
baseSegment.getLoadSpec(),
baseSegment.getDimensions(),
baseSegment.getMetrics(),
newId.getShardSpec(),
baseSegment.getBinaryVersion(),
baseSegment.getSize()
);
segmentAnnouncer.unannounceSegment(newSegment);
final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId);
for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
final DataSegment newSegment = new DataSegment(
newId.getDataSource(),
newId.getInterval(),
newId.getVersion(),
baseSegment.getLoadSpec(),
baseSegment.getDimensions(),
baseSegment.getMetrics(),
newId.getShardSpec(),
baseSegment.getBinaryVersion(),
baseSegment.getSize()
);
unannounceSegment(newSegment);
upgradedSegmentToBaseSegment.remove(newId);
}
}
}
public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
) throws IOException
/**
 * Unannounces the given segment through the segment announcer. Any exception raised while doing so is
 * reported as an alert carrying the segment id and is not propagated to the caller.
 *
 * @param segment segment to unannounce
 */
private void unannounceSegment(DataSegment segment)
{
  try {
    segmentAnnouncer.unannounceSegment(segment);
  }
  catch (Exception unannounceFailure) {
    final String dataSource = schema.getDataSource();
    log.makeAlert(unannounceFailure, "Failed to unannounce segment[%s]", dataSource)
       .addData("identifier", segment.getId().toString())
       .emit();
  }
}
public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException
{
SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId());
SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId();
if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) {
return;
}
// Update query mapping with SinkQuerySegmentWalker
((SinkQuerySegmentWalker) texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment);
// Announce segments
final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment);
final DataSegment newSegment = new DataSegment(
newSegmentVersion.getDataSource(),
newSegmentVersion.getInterval(),
newSegmentVersion.getVersion(),
segmentAnnouncer.announceSegment(newSegment);
baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment);
upgradedSegmentToBaseSegment.put(upgradedPendingSegment, basePendingSegment);
}
private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion)
{
return new DataSegment(
upgradedVersion.getDataSource(),
upgradedVersion.getInterval(),
upgradedVersion.getVersion(),
baseSegment.getLoadSpec(),
baseSegment.getDimensions(),
baseSegment.getMetrics(),
newSegmentVersion.getShardSpec(),
upgradedVersion.getShardSpec(),
baseSegment.getBinaryVersion(),
baseSegment.getSize()
);
segmentAnnouncer.announceSegment(newSegment);
baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), id -> new HashSet<>())
.add(newSegmentVersion);
}
private void lockBasePersistDirectory()
@ -1367,13 +1394,8 @@ public class StreamAppenderator implements Appenderator
hydrants
);
rowsSoFar += currSink.getNumRows();
sinks.put(identifier, currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
identifier.getShardSpec().createChunk(currSink)
);
addSink(identifier, currSink);
segmentAnnouncer.announceSegment(currSink.getSegment());
}
catch (IOException e) {
@ -1396,12 +1418,49 @@ public class StreamAppenderator implements Appenderator
return committed.getMetadata();
}
/**
 * Registers a sink with the appenderator's bookkeeping structures: the sink map,
 * the string-id lookup of pending segments, the base-to-upgraded segment mapping
 * and the sink timeline.
 *
 * @param identifier sink identifier
 * @param sink       sink to be added
 */
private void addSink(SegmentIdWithShardSpec identifier, Sink sink)
{
  sinks.put(identifier, sink);

  // Associate the base segment of a sink with its string identifier.
  // Needed to get the base segment using upgradedFromSegmentId of a pending segment.
  final String baseSegmentIdText = identifier.asSegmentId().toString();
  idToPendingSegment.put(baseSegmentIdText, identifier);

  // The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink.
  final Set<SegmentIdWithShardSpec> idsOfSink = new HashSet<>();
  baseSegmentToUpgradedSegments.put(identifier, idsOfSink);
  idsOfSink.add(identifier);

  sinkTimeline.add(
      sink.getInterval(),
      sink.getVersion(),
      identifier.getShardSpec().createChunk(sink)
  );
}
private ListenableFuture<?> abandonSegment(
final SegmentIdWithShardSpec identifier,
final Sink sink,
final boolean removeOnDiskData
)
{
abandonedSegments.add(identifier);
final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
synchronized (sink) {
if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
Set<SegmentIdWithShardSpec> relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
relevantSegments.removeAll(abandonedSegments);
// If there are unabandoned segments associated with the sink, return early
// This may be the case if segments have been upgraded as the result of a concurrent replace
if (!relevantSegments.isEmpty()) {
return Futures.immediateFuture(null);
}
}
}
// Ensure no future writes will be made to this sink.
if (sink.finishWriting()) {
// Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement,
@ -1419,7 +1478,7 @@ public class StreamAppenderator implements Appenderator
}
// Mark this identifier as dropping, so no future push tasks will pick it up.
droppingSinks.add(identifier);
droppingSinks.add(baseIdentifier);
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(
@ -1430,8 +1489,8 @@ public class StreamAppenderator implements Appenderator
@Override
public Void apply(@Nullable Object input)
{
if (!sinks.remove(identifier, sink)) {
log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier);
if (!sinks.remove(baseIdentifier, sink)) {
log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier);
return null;
}
@ -1439,17 +1498,17 @@ public class StreamAppenderator implements Appenderator
if (removeOnDiskData) {
// Remove this segment from the committed list. This must be done from the persist thread.
log.debug("Removing commit metadata for segment[%s].", identifier);
log.debug("Removing commit metadata for segment[%s].", baseIdentifier);
try {
commitLock.lock();
final Committed oldCommit = readCommit();
if (oldCommit != null) {
writeCommit(oldCommit.without(identifier.toString()));
writeCommit(oldCommit.without(baseIdentifier.toString()));
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource())
.addData("identifier", identifier.toString())
.addData("identifier", baseIdentifier.toString())
.emit();
throw new RuntimeException(e);
}
@ -1458,22 +1517,14 @@ public class StreamAppenderator implements Appenderator
}
}
// Unannounce the segment.
try {
unannounceAllVersionsOfSegment(sink.getSegment());
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource())
.addData("identifier", identifier.toString())
.emit();
}
unannounceAllVersionsOfSegment(sink.getSegment(), sink);
Runnable removeRunnable = () -> {
droppingSinks.remove(identifier);
droppingSinks.remove(baseIdentifier);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
identifier.getShardSpec().createChunk(sink)
baseIdentifier.getShardSpec().createChunk(sink)
);
for (FireHydrant hydrant : sink) {
if (cache != null) {
@ -1483,7 +1534,7 @@ public class StreamAppenderator implements Appenderator
}
if (removeOnDiskData) {
removeDirectory(computePersistDir(identifier));
removeDirectory(computePersistDir(baseIdentifier));
}
log.info("Dropped segment[%s].", identifier);

View File

@ -51,10 +51,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -322,10 +324,14 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
return Futures.immediateFuture(null);
} else {
final List<SegmentIdWithShardSpec> waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream()
.map(
SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toList());
final Set<DataSegment> segmentsToBeHandedOff = new HashSet<>(segmentsAndCommitMetadata.getSegments());
if (segmentsAndCommitMetadata.getUpgradedSegments() != null) {
segmentsToBeHandedOff.addAll(segmentsAndCommitMetadata.getUpgradedSegments());
}
final List<SegmentIdWithShardSpec> waitingSegmentIdList =
segmentsToBeHandedOff.stream()
.map(SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toList());
final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata");
if (waitingSegmentIdList.isEmpty()) {
@ -333,7 +339,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
new SegmentsAndCommitMetadata(
segmentsAndCommitMetadata.getSegments(),
((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping()
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
)
);
}
@ -365,8 +372,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
public void onSuccess(Object result)
{
if (numRemainingHandoffSegments.decrementAndGet() == 0) {
List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
log.info("Successfully handed off [%d] segments.", segments.size());
log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size());
final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime;
metrics.reportMaxSegmentHandoffTime(handoffTotalTime);
if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) {
@ -375,9 +381,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
}
resultFuture.set(
new SegmentsAndCommitMetadata(
segments,
segmentsAndCommitMetadata.getSegments(),
((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
segmentsAndCommitMetadata.getSegmentSchemaMapping()
segmentsAndCommitMetadata.getSegmentSchemaMapping(),
segmentsAndCommitMetadata.getUpgradedSegments()
)
);
}

View File

@ -201,7 +201,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
"Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
"Fail test while dropping segment"
);
driver = new StreamAppenderatorDriver(
@ -221,10 +221,8 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
Assert.assertNull(driver.startJob(null));
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
committerSupplier.setMetadata(1);
Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk());
final SegmentsAndCommitMetadata published = driver.publish(
StreamAppenderatorDriverTest.makeOkPublisher(),

View File

@ -43,6 +43,7 @@ import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@ -58,6 +59,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -72,6 +74,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123";
private static final String UPGRADED_VERSION = "xyz456";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private static final int MAX_ROWS_IN_MEMORY = 100;
private static final int MAX_ROWS_PER_SEGMENT = 3;
@ -246,6 +249,44 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
/**
 * Verifies that handoff is awaited for the upgraded segments reported by the publisher
 * as well as for the originally published segments.
 */
@Test
public void testHandoffUpgradedSegments()
    throws IOException, InterruptedException, TimeoutException, ExecutionException
{
  final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();

  Assert.assertNull(driver.startJob(null));

  // Commit metadata is the 1-based count of rows added so far.
  for (int rowCount = 1; rowCount <= ROWS.size(); rowCount++) {
    committerSupplier.setMetadata(rowCount);
    Assert.assertTrue(driver.add(ROWS.get(rowCount - 1), "dummy", committerSupplier, false, true).isOk());
  }

  driver.persist(committerSupplier.get());

  // Publish through a publisher that reports one upgraded segment per published segment,
  // then wait for the handoff to complete.
  final SegmentsAndCommitMetadata handedOff = driver.publishAndRegisterHandoff(
      makeUpgradingPublisher(),
      committerSupplier.get(),
      ImmutableList.of("dummy")
  ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

  Assert.assertNotNull(handedOff.getUpgradedSegments());
  Assert.assertEquals(
      handedOff.getSegments().size(),
      handedOff.getUpgradedSegments().size()
  );

  // Both the original and the upgraded segments must have been handed off.
  final Set<SegmentDescriptor> expectedDescriptors = new HashSet<>();
  handedOff.getSegments().forEach(segment -> expectedDescriptors.add(segment.toDescriptor()));
  handedOff.getUpgradedSegments().forEach(segment -> expectedDescriptors.add(segment.toDescriptor()));

  Assert.assertEquals(expectedDescriptors, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors());
}
@Test
public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
@ -379,6 +420,29 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
SegmentPublishResult.ok(Collections.emptySet());
}
/**
 * Creates a publisher that succeeds and reports, for every segment to publish, an additional
 * "upgraded" segment with version {@code UPGRADED_VERSION} over {@code Intervals.ETERNITY}.
 * Upgraded segments get consecutive partition numbers starting from 0 and reuse the load spec,
 * dimensions, metrics, binary version and size of the segment they were derived from.
 */
private TransactionalSegmentPublisher makeUpgradingPublisher()
{
  return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> {
    final Set<DataSegment> publishedAndUpgraded = new HashSet<>(segmentsToPublish);
    int nextPartitionNum = 0;
    for (DataSegment published : segmentsToPublish) {
      final int partitionNum = nextPartitionNum++;
      publishedAndUpgraded.add(
          new DataSegment(
              SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, partitionNum),
              published.getLoadSpec(),
              published.getDimensions(),
              published.getMetrics(),
              new NumberedShardSpec(partitionNum, 0),
              null,
              published.getBinaryVersion(),
              published.getSize()
          )
      );
    }
    return SegmentPublishResult.ok(publishedAndUpgraded);
  };
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
{
return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> {
@ -459,6 +523,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
{
private boolean handoffEnabled = true;
private long handoffDelay;
private final Set<SegmentDescriptor> handedOffSegmentDescriptors = new HashSet<>();
public void disableHandoff()
{
@ -470,6 +535,13 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
handoffDelay = delay;
}
/**
 * Returns an immutable snapshot of the segment descriptors recorded so far.
 * The copy is taken while holding the monitor of the backing set.
 */
public Set<SegmentDescriptor> getHandedOffSegmentDescriptors()
{
  final Set<SegmentDescriptor> snapshot;
  synchronized (handedOffSegmentDescriptors) {
    snapshot = ImmutableSet.copyOf(handedOffSegmentDescriptors);
  }
  return snapshot;
}
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
{
@ -494,6 +566,9 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
}
exec.execute(handOffRunnable);
synchronized (handedOffSegmentDescriptors) {
handedOffSegmentDescriptors.add(descriptor);
}
}
return true;
}