Fix replica task failures with metadata inconsistency while running concurrent append and replace (#16614)

Changes:
- Add new task action `RetrieveSegmentsByIdAction`
- Use the new task action to retrieve segments irrespective of their visibility
- During rolling upgrades, this task action would fail because the Overlord would still be on an older version
- If the new action fails, fall back to fetching used segments as before (see the sketch below)
Kashif Faraz 2024-06-23 21:26:04 -07:00 committed by GitHub
parent 1a883ba1f7
commit 0fe6a2af68
12 changed files with 309 additions and 153 deletions
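
For reference, here is a minimal sketch of the retrieve-then-fall-back flow described in the change list above. It mirrors the updated `ActionBasedUsedSegmentChecker` in the diff below; the wrapper class and method names in this sketch are illustrative, not part of the PR.

```java
import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.actions.RetrieveSegmentsByIdAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative helper (not part of this PR) that mirrors ActionBasedUsedSegmentChecker.
class PublishedSegmentLookup
{
  private final TaskActionClient taskActionClient;

  PublishedSegmentLookup(TaskActionClient taskActionClient)
  {
    this.taskActionClient = taskActionClient;
  }

  Set<DataSegment> findPublishedSegments(String dataSource, Set<SegmentId> segmentIds) throws IOException
  {
    final Set<String> serializedIds =
        segmentIds.stream().map(SegmentId::toString).collect(Collectors.toSet());
    try {
      // New action: returns the matching segments regardless of visibility.
      return taskActionClient.submit(new RetrieveSegmentsByIdAction(dataSource, serializedIds));
    }
    catch (Exception e) {
      // The Overlord may still be on an older version; fall back to the old action and
      // filter the used (including overshadowed) segments down to the requested IDs.
      final List<Interval> intervals = JodaUtils.condenseIntervals(
          Iterables.transform(segmentIds, SegmentId::getInterval)
      );
      final Collection<DataSegment> usedSegments = taskActionClient.submit(
          new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.INCLUDING_OVERSHADOWED)
      );
      final Set<DataSegment> published = new HashSet<>();
      for (DataSegment segment : usedSegments) {
        if (segmentIds.contains(segment.getId())) {
          published.add(segment);
        }
      }
      return published;
    }
  }
}
```

The design point is that a failure of the new `segmentListById` action is treated as a sign that the Overlord is still on an older version, so the task degrades to the pre-upgrade behavior instead of failing outright.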


@@ -20,10 +20,13 @@
package org.apache.druid.indexing.appenderator;
import com.google.common.collect.Iterables;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.actions.RetrieveSegmentsByIdAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -31,14 +34,16 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
{
private static final Logger log = new Logger(ActionBasedUsedSegmentChecker.class);
private final TaskActionClient taskActionClient;
public ActionBasedUsedSegmentChecker(TaskActionClient taskActionClient)
@@ -47,33 +52,54 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
}
@Override
public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> segmentIds) throws IOException
public Set<DataSegment> findPublishedSegments(Set<SegmentId> segmentIds) throws IOException
{
// Group by dataSource
final Map<String, Set<SegmentId>> idsByDataSource = new TreeMap<>();
for (SegmentIdWithShardSpec segmentId : segmentIds) {
idsByDataSource.computeIfAbsent(segmentId.getDataSource(), i -> new HashSet<>()).add(segmentId.asSegmentId());
if (segmentIds == null || segmentIds.isEmpty()) {
return Collections.emptySet();
}
final Set<DataSegment> usedSegments = new HashSet<>();
for (Map.Entry<String, Set<SegmentId>> entry : idsByDataSource.entrySet()) {
String dataSource = entry.getKey();
Set<SegmentId> segmentIdsInDataSource = entry.getValue();
final List<Interval> intervals = JodaUtils.condenseIntervals(
Iterables.transform(segmentIdsInDataSource, SegmentId::getInterval)
);
final Collection<DataSegment> usedSegmentsForIntervals = taskActionClient
.submit(new RetrieveUsedSegmentsAction(dataSource, intervals));
for (DataSegment segment : usedSegmentsForIntervals) {
if (segmentIdsInDataSource.contains(segment.getId())) {
usedSegments.add(segment);
}
// Validate that all segments belong to the same datasource
final String dataSource = segmentIds.iterator().next().getDataSource();
for (SegmentId segmentId : segmentIds) {
if (!segmentId.getDataSource().equals(dataSource)) {
throw InvalidInput.exception(
"Published segment IDs to find cannot belong to multiple datasources[%s, %s].",
dataSource, segmentId.getDataSource()
);
}
}
return usedSegments;
// Try to retrieve segments using new task action
final Set<String> serializedSegmentIds = segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet());
try {
return taskActionClient.submit(new RetrieveSegmentsByIdAction(dataSource, serializedSegmentIds));
}
catch (Exception e) {
log.warn(
e,
"Could not retrieve published segment IDs[%s] using task action[segmentListById]."
+ " Overlord maybe on an older version, retrying with action[segmentListUsed]."
+ " This task may fail to publish segments if there is a concurrent replace happening.",
serializedSegmentIds
);
}
// Fall back to using old task action if Overlord is still on an older version
final Set<DataSegment> publishedSegments = new HashSet<>();
final List<Interval> usedSearchIntervals = JodaUtils.condenseIntervals(
Iterables.transform(segmentIds, SegmentId::getInterval)
);
final Collection<DataSegment> foundUsedSegments = taskActionClient.submit(
new RetrieveUsedSegmentsAction(dataSource, null, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED)
);
for (DataSegment segment : foundUsedSegments) {
if (segmentIds.contains(segment.getId())) {
publishedSegments.add(segment);
}
}
return publishedSegments;
}
}


@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
import java.util.Objects;
import java.util.Set;
/**
* Task action to retrieve segments from the metadata store. Matching segments
* are returned regardless of their visibility, i.e. visible, overshadowed or unused.
*/
public class RetrieveSegmentsByIdAction implements TaskAction<Set<DataSegment>>
{
private final String dataSource;
private final Set<String> segmentIds;
@JsonCreator
public RetrieveSegmentsByIdAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentIds") Set<String> segmentIds
)
{
this.dataSource = dataSource;
this.segmentIds = segmentIds;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Set<String> getSegmentIds()
{
return segmentIds;
}
@Override
public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Set<DataSegment>>()
{
};
}
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveSegmentsById(dataSource, segmentIds);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RetrieveSegmentsByIdAction that = (RetrieveSegmentsByIdAction) o;
return Objects.equals(dataSource, that.dataSource) && Objects.equals(segmentIds, that.segmentIds);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, segmentIds);
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", segmentIds=" + segmentIds +
'}';
}
}


@@ -38,9 +38,8 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListById", value = RetrieveSegmentsByIdAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),


@@ -19,120 +19,124 @@
package org.apache.druid.indexing.appenderator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.actions.RetrieveSegmentsByIdAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class ActionBasedUsedSegmentCheckerTest
{
@Test
public void testBasic() throws IOException
private TaskActionClient taskActionClient;
private ActionBasedUsedSegmentChecker segmentRetriever;
@Before
public void setup()
{
final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
taskActionClient = EasyMock.createMock(TaskActionClient.class);
segmentRetriever = new ActionBasedUsedSegmentChecker(taskActionClient);
}
@Test
public void testRetrieveSegmentsById() throws IOException
{
final List<DataSegment> segments =
CreateDataSegments.ofDatasource("wiki")
.forIntervals(3, Granularities.DAY)
.startingAt("2013-01-01")
.eachOfSizeInMb(400);
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D")))
)
).andReturn(
ImmutableList.of(
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(0))
.version("b")
.size(0)
.build(),
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build()
)
);
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction(
"foo",
ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D"))
new RetrieveSegmentsByIdAction(
"wiki",
segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toSet())
)
)
).andReturn(
ImmutableList.of(
DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2000/P1D"))
.shardSpec(new LinearShardSpec(0))
.version("a")
.size(0)
.build(),
DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2000/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("a")
.size(0)
.build(),
DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2001/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build(),
DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build()
)
);
).andReturn(new HashSet<>(segments)).once();
EasyMock.replay(taskActionClient);
final UsedSegmentChecker checker = new ActionBasedUsedSegmentChecker(taskActionClient);
final Set<DataSegment> segments = checker.findUsedSegments(
ImmutableSet.of(
new SegmentIdWithShardSpec("foo", Intervals.of("2000/P1D"), "a", new LinearShardSpec(1)),
new SegmentIdWithShardSpec("foo", Intervals.of("2001/P1D"), "b", new LinearShardSpec(0)),
new SegmentIdWithShardSpec("bar", Intervals.of("2002/P1D"), "b", new LinearShardSpec(0))
)
);
final Set<SegmentId> searchSegmentIds = segments.stream()
.map(DataSegment::getId)
.collect(Collectors.toSet());
Assert.assertEquals(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2000/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("a")
.size(0)
.build(),
DataSegment.builder()
.dataSource("bar")
.interval(Intervals.of("2002/P1D"))
.shardSpec(new LinearShardSpec(0))
.version("b")
.size(0)
.build()
),
segments
new HashSet<>(segments),
segmentRetriever.findPublishedSegments(searchSegmentIds)
);
EasyMock.verify(taskActionClient);
}
@Test
public void testRetrieveUsedSegmentsIfNotFoundById() throws IOException
{
final List<DataSegment> segments =
CreateDataSegments.ofDatasource("wiki")
.forIntervals(3, Granularities.DAY)
.startingAt("2013-01-01")
.eachOfSizeInMb(400);
EasyMock.expect(
taskActionClient.submit(
new RetrieveSegmentsByIdAction("wiki", EasyMock.anyObject())
)
).andThrow(InvalidInput.exception("task action not supported yet")).once();
EasyMock.expect(
taskActionClient.submit(
new RetrieveUsedSegmentsAction(
"wiki",
null,
Collections.singletonList(Intervals.of("2013-01-01/P3D")),
Segments.INCLUDING_OVERSHADOWED
)
)
).andReturn(segments).once();
EasyMock.replay(taskActionClient);
final Set<SegmentId> searchSegmentIds = segments.stream()
.map(DataSegment::getId)
.collect(Collectors.toSet());
Assert.assertEquals(
new HashSet<>(segments),
segmentRetriever.findPublishedSegments(searchSegmentIds)
);
EasyMock.verify(taskActionClient);
}
@Test
public void testSegmentsForMultipleDatasourcesThrowsException()
{
DruidException exception = Assert.assertThrows(
DruidException.class,
() -> segmentRetriever.findPublishedSegments(
ImmutableSet.of(
SegmentId.of("wiki", Intervals.ETERNITY, "v1", 0),
SegmentId.of("koala", Intervals.ETERNITY, "v1", 0)
)
)
);
Assert.assertEquals(
"Published segment IDs to find cannot belong to multiple datasources[wiki, koala].",
exception.getMessage()
);
}
}


@@ -136,6 +136,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
}
}
@Override
public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds)
{
return Collections.emptySet();
}
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{


@@ -179,6 +179,12 @@ public interface IndexerMetadataStorageCoordinator
@Nullable DateTime maxUsedStatusLastUpdatedTime
);
/**
* Retrieves segments for the given IDs, regardless of their visibility
* (visible, overshadowed or unused).
*/
Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds);
/**
* Mark as unused segments which include ONLY data within the given interval.
*


@@ -320,6 +320,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return matchingSegments;
}
@Override
public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds)
{
return connector.inReadOnlyTransaction(
(handle, transactionStatus) ->
retrieveSegmentsById(handle, dataSource, segmentIds)
.stream()
.map(DataSegmentPlus::getDataSegment)
.collect(Collectors.toSet())
);
}
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{


@@ -46,6 +46,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -274,7 +275,7 @@
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "segmentRetriever");
this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller");
this.executor = MoreExecutors.listeningDecorator(
Execs.singleThreaded("[" + StringUtils.encodeForFormat(appenderator.getId()) + "]-publish")
@@ -622,7 +623,6 @@ public abstract class BaseAppenderatorDriver implements Closeable
return RetryUtils.retry(
() -> {
try {
final Set<DataSegment> upgradedSegments = new HashSet<>();
final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
final SegmentPublishResult publishResult = publisher.publishSegments(
segmentsToBeOverwritten,
@@ -633,22 +633,25 @@
);
if (publishResult.isSuccess()) {
log.info(
"Published [%s] segments with commit metadata [%s]",
"Published [%d] segments with commit metadata[%s].",
segmentsAndCommitMetadata.getSegments().size(),
callerMetadata
);
log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
// This set must contain only those segments that were upgraded as a result of a concurrent replace.
upgradedSegments.addAll(publishResult.getSegments());
// Log segments upgraded as a result of a concurrent replace
final Set<DataSegment> upgradedSegments = new HashSet<>(publishResult.getSegments());
segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove);
if (!upgradedSegments.isEmpty()) {
log.info("Published [%d] upgraded segments.", upgradedSegments.size());
log.infoSegments(upgradedSegments, "Upgraded segments");
}
log.info("Published segment schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
log.info("Published segment schemas[%s].", segmentsAndCommitMetadata.getSegmentSchemaMapping());
return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
} else {
// Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
// now after all, for two possible reasons:
// Publishing didn't affirmatively succeed. However, segments
// with these IDs may have already been published:
//
// 1) A replica may have beat us to publishing these segments. In this case we want to delete the
// segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
@@ -656,29 +659,28 @@
// from the overlord. In this case we do not want to delete the segments we pushed, since they are
// now live!
final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
final Set<SegmentId> segmentIds = segmentsAndCommitMetadata
.getSegments()
.stream()
.map(SegmentIdWithShardSpec::fromDataSegment)
.map(DataSegment::getId)
.collect(Collectors.toSet());
final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
if (activeSegments.equals(ourSegments)) {
final Set<DataSegment> publishedSegments = usedSegmentChecker.findPublishedSegments(segmentIds);
if (publishedSegments.equals(ourSegments)) {
log.info(
"Could not publish [%s] segments, but checked and found them already published; continuing.",
"Could not publish [%d] segments, but they have already been published by another task.",
ourSegments.size()
);
log.infoSegments(
segmentsAndCommitMetadata.getSegments(),
"Could not publish segments"
);
log.info("Could not publish segment and schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
log.info("Could not publish segment schemas[%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
// Clean up pushed segments if they are physically disjoint from the published ones (this means
// they were probably pushed by a replica, and with the unique paths option).
final boolean physicallyDisjoint = Sets.intersection(
activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
publishedSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
).isEmpty();
@@ -698,8 +700,9 @@ public abstract class BaseAppenderatorDriver implements Closeable
}
throw new ISE("Failed to publish segments");
}
return segmentsAndCommitMetadata;
}
return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
}
catch (Exception e) {
// Must not remove segments here, we aren't sure if our transaction succeeded or not.


@@ -26,16 +26,12 @@ import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class SegmentsAndCommitMetadata
{
private static final SegmentsAndCommitMetadata NIL
= new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, null);
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
private final SegmentSchemaMapping segmentSchemaMapping;
@@ -139,9 +135,4 @@
", segmentSchemaMapping=" + segmentSchemaMapping +
'}';
}
public static SegmentsAndCommitMetadata nil()
{
return NIL;
}
}


@@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Set;
@@ -28,10 +29,6 @@ public interface UsedSegmentChecker
{
/**
* For any identifiers that exist and are actually used, returns the corresponding DataSegment objects.
*
* @param identifiers identifiers to search for
*
* @return used DataSegments
*/
Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers) throws IOException;
Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers) throws IOException;
}


@@ -46,6 +46,7 @@ import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTe
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -326,7 +327,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
private static class NoopUsedSegmentChecker implements UsedSegmentChecker
{
@Override
public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
public Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers)
{
return ImmutableSet.of();
}


@@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -38,14 +39,14 @@ public class TestUsedSegmentChecker implements UsedSegmentChecker
}
@Override
public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
public Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers)
{
final SegmentTimeline timeline = SegmentTimeline.forSegments(pushedSegments);
final Set<DataSegment> retVal = new HashSet<>();
for (SegmentIdWithShardSpec identifier : identifiers) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(identifier.getInterval())) {
for (SegmentId segmentId : identifiers) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(segmentId.getInterval())) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
if (identifiers.contains(SegmentIdWithShardSpec.fromDataSegment(chunk.getObject()))) {
if (identifiers.contains(chunk.getObject().getId())) {
retVal.add(chunk.getObject());
}
}