From 0fe6a2af68d8bd7d729bc2d2402c223809dca7e0 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sun, 23 Jun 2024 21:26:04 -0700
Subject: [PATCH] Fix replica task failures with metadata inconsistency while
 running concurrent append replace (#16614)

Changes:
- Add new task action `RetrieveSegmentsByIdAction`
- Use the new task action to retrieve segments irrespective of their visibility
- During rolling upgrades, this task action would fail as the Overlord would be
  on an older version
- If the new action fails, fall back to fetching used segments as before
---
 .../ActionBasedUsedSegmentChecker.java        |  76 +++++---
 .../actions/RetrieveSegmentsByIdAction.java   | 110 +++++++++++
 .../indexing/common/actions/TaskAction.java   |   3 +-
 .../ActionBasedUsedSegmentCheckerTest.java    | 184 +++++++++---
 ...TestIndexerMetadataStorageCoordinator.java |   6 +
 .../IndexerMetadataStorageCoordinator.java    |   6 +
 .../IndexerSQLMetadataStorageCoordinator.java |  12 ++
 .../appenderator/BaseAppenderatorDriver.java  |  37 ++--
 .../SegmentsAndCommitMetadata.java            |   9 -
 .../appenderator/UsedSegmentChecker.java      |   7 +-
 .../StreamAppenderatorDriverFailTest.java     |   3 +-
 .../appenderator/TestUsedSegmentChecker.java  |   9 +-
 12 files changed, 309 insertions(+), 153 deletions(-)
 create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index 3a33bc80d68..2a7d5610cca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -20,10 +20,13 @@
 package org.apache.druid.indexing.appenderator;
 
 import com.google.common.collect.Iterables;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsByIdAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -31,14 +34,16 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
 {
+  private static final Logger log = new Logger(ActionBasedUsedSegmentChecker.class);
+
   private final TaskActionClient taskActionClient;
 
   public ActionBasedUsedSegmentChecker(TaskActionClient taskActionClient)
@@ -47,33 +52,54 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
   }
 
   @Override
-  public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> segmentIds) throws IOException
+  public Set<DataSegment> findPublishedSegments(Set<SegmentId> segmentIds) throws IOException
   {
-    // Group by dataSource
-    final Map<String, Set<SegmentId>> idsByDataSource = new TreeMap<>();
-    for (SegmentIdWithShardSpec segmentId : segmentIds) {
-      idsByDataSource.computeIfAbsent(segmentId.getDataSource(), i -> new HashSet<>()).add(segmentId.asSegmentId());
+    if (segmentIds == null || segmentIds.isEmpty()) {
+      return Collections.emptySet();
     }
 
-    final Set<DataSegment> usedSegments = new HashSet<>();
-
-    for (Map.Entry<String, Set<SegmentId>> entry : idsByDataSource.entrySet()) {
-      String dataSource = entry.getKey();
-      Set<SegmentId> segmentIdsInDataSource = entry.getValue();
-      final List<Interval> intervals = JodaUtils.condenseIntervals(
-          Iterables.transform(segmentIdsInDataSource, SegmentId::getInterval)
-      );
-
-      final Collection<DataSegment> usedSegmentsForIntervals = taskActionClient
-          .submit(new RetrieveUsedSegmentsAction(dataSource, intervals));
-
-      for (DataSegment segment : usedSegmentsForIntervals) {
-        if (segmentIdsInDataSource.contains(segment.getId())) {
-          usedSegments.add(segment);
-        }
+    // Validate that all segments belong to the same datasource
+    final String dataSource = segmentIds.iterator().next().getDataSource();
+    for (SegmentId segmentId : segmentIds) {
+      if (!segmentId.getDataSource().equals(dataSource)) {
+        throw InvalidInput.exception(
+            "Published segment IDs to find cannot belong to multiple datasources[%s, %s].",
+            dataSource, segmentId.getDataSource()
+        );
       }
     }
 
-    return usedSegments;
+    // Try to retrieve segments using the new task action
+    final Set<String> serializedSegmentIds = segmentIds.stream()
+                                                       .map(SegmentId::toString)
+                                                       .collect(Collectors.toSet());
+    try {
+      return taskActionClient.submit(new RetrieveSegmentsByIdAction(dataSource, serializedSegmentIds));
+    }
+    catch (Exception e) {
+      log.warn(
+          e,
+          "Could not retrieve published segment IDs[%s] using task action[segmentListById]."
+          + " Overlord may be on an older version, retrying with action[segmentListUsed]."
+          + " This task may fail to publish segments if there is a concurrent replace happening.",
+          serializedSegmentIds
+      );
+    }
+
+    // Fall back to using the old task action if the Overlord is still on an older version
+    final Set<DataSegment> publishedSegments = new HashSet<>();
+    final List<Interval> usedSearchIntervals = JodaUtils.condenseIntervals(
+        Iterables.transform(segmentIds, SegmentId::getInterval)
+    );
+    final Collection<DataSegment> foundUsedSegments = taskActionClient.submit(
+        new RetrieveUsedSegmentsAction(dataSource, null, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED)
+    );
+    for (DataSegment segment : foundUsedSegments) {
+      if (segmentIds.contains(segment.getId())) {
+        publishedSegments.add(segment);
+      }
+    }
+
+    return publishedSegments;
   }
 }
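The reworked checker above prefers the new `segmentListById` action and falls back to the interval-based `segmentListUsed` lookup (including overshadowed segments) when the Overlord rejects the unknown action type. A minimal sketch of a task-side caller, assuming `taskActionClient` is supplied by the task runtime; the segment ID is illustrative:

    // Sketch: verify that this replica's segments are already present in the
    // metadata store, whatever their current visibility.
    UsedSegmentChecker checker = new ActionBasedUsedSegmentChecker(taskActionClient);
    Set<SegmentId> ids = ImmutableSet.of(
        SegmentId.of("wiki", Intervals.of("2013-01-01/2013-01-02"), "v1", 0)
    );
    // On an upgraded Overlord this resolves via segmentListById and also matches
    // overshadowed or unused segments; against an older Overlord it retries with
    // segmentListUsed and Segments.INCLUDING_OVERSHADOWED.
    Set<DataSegment> published = checker.findPublishedSegments(ids);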
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java
new file mode 100644
index 00000000000..88d3703f4b0
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Task action to retrieve segments from the metadata store. Matching segments
+ * are returned regardless of their visibility, i.e. visible, overshadowed or unused.
+ */
+public class RetrieveSegmentsByIdAction implements TaskAction<Set<DataSegment>>
+{
+  private final String dataSource;
+  private final Set<String> segmentIds;
+
+  @JsonCreator
+  public RetrieveSegmentsByIdAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("segmentIds") Set<String> segmentIds
+  )
+  {
+    this.dataSource = dataSource;
+    this.segmentIds = segmentIds;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public Set<String> getSegmentIds()
+  {
+    return segmentIds;
+  }
+
+  @Override
+  public TypeReference<Set<DataSegment>> getReturnTypeReference()
+  {
+    return new TypeReference<Set<DataSegment>>()
+    {
+    };
+  }
+
+  @Override
+  public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    return toolbox.getIndexerMetadataStorageCoordinator()
+                  .retrieveSegmentsById(dataSource, segmentIds);
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RetrieveSegmentsByIdAction that = (RetrieveSegmentsByIdAction) o;
+    return Objects.equals(dataSource, that.dataSource) && Objects.equals(segmentIds, that.segmentIds);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dataSource, segmentIds);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "dataSource='" + dataSource + '\'' +
+           ", segmentIds=" + segmentIds +
+           '}';
+  }
+}
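As the `TaskAction` registration below shows, this action serializes under the wire name `segmentListById`. A sketch of the round trip, assuming an ObjectMapper that sees the `@JsonSubTypes` declarations on `TaskAction` (e.g. Druid's `DefaultObjectMapper`); the segment ID string is illustrative:

    ObjectMapper mapper = new DefaultObjectMapper();
    TaskAction<Set<DataSegment>> action = new RetrieveSegmentsByIdAction(
        "wiki",
        ImmutableSet.of("wiki_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_v1")
    );
    // writeValueAsString may throw JsonProcessingException; handled by the caller here.
    String json = mapper.writeValueAsString(action);
    // Expected to resemble:
    // {"type":"segmentListById","dataSource":"wiki","segmentIds":["wiki_..._v1"]}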
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 171d53b9cdd..7ab0b946cd8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,9 +38,8 @@ import java.util.concurrent.Future;
     @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
-    // Type name doesn't correspond to the name of the class for backward compatibility.
+    @JsonSubTypes.Type(name = "segmentListById", value = RetrieveSegmentsByIdAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
-    // Type name doesn't correspond to the name of the class for backward compatibility.
     @JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
     @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
     @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
index c339a103b2d..160176c8841 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java
@@ -19,120 +19,124 @@
 package org.apache.druid.indexing.appenderator;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsByIdAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.timeline.SegmentId;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class ActionBasedUsedSegmentCheckerTest
 {
-  @Test
-  public void testBasic() throws IOException
+  private TaskActionClient taskActionClient;
+  private ActionBasedUsedSegmentChecker segmentRetriever;
+
+  @Before
+  public void setup()
   {
-    final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+    taskActionClient = EasyMock.createMock(TaskActionClient.class);
+    segmentRetriever = new ActionBasedUsedSegmentChecker(taskActionClient);
+  }
+
+  @Test
+  public void testRetrieveSegmentsById() throws IOException
+  {
+    final List<DataSegment> segments =
+        CreateDataSegments.ofDatasource("wiki")
+                          .forIntervals(3, Granularities.DAY)
+                          .startingAt("2013-01-01")
+                          .eachOfSizeInMb(400);
+
     EasyMock.expect(
         taskActionClient.submit(
-            new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D")))
-        )
-    ).andReturn(
-        ImmutableList.of(
-            DataSegment.builder()
-                       .dataSource("bar")
-                       .interval(Intervals.of("2002/P1D"))
-                       .shardSpec(new LinearShardSpec(0))
-                       .version("b")
-                       .size(0)
-                       .build(),
-            DataSegment.builder()
-                       .dataSource("bar")
-                       .interval(Intervals.of("2002/P1D"))
-                       .shardSpec(new LinearShardSpec(1))
-                       .version("b")
-                       .size(0)
-                       .build()
-        )
-    );
-    EasyMock.expect(
-        taskActionClient.submit(
-            new RetrieveUsedSegmentsAction(
-                "foo",
-                ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D"))
+            new RetrieveSegmentsByIdAction(
+                "wiki",
+                segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toSet())
             )
         )
-    ).andReturn(
-        ImmutableList.of(
-            DataSegment.builder()
-                       .dataSource("foo")
-                       .interval(Intervals.of("2000/P1D"))
-                       .shardSpec(new LinearShardSpec(0))
-                       .version("a")
-                       .size(0)
-                       .build(),
-            DataSegment.builder()
-                       .dataSource("foo")
-                       .interval(Intervals.of("2000/P1D"))
-                       .shardSpec(new LinearShardSpec(1))
-                       .version("a")
-                       .size(0)
-                       .build(),
-            DataSegment.builder()
-                       .dataSource("foo")
-                       .interval(Intervals.of("2001/P1D"))
-                       .shardSpec(new LinearShardSpec(1))
-                       .version("b")
-                       .size(0)
-                       .build(),
-            DataSegment.builder()
-                       .dataSource("foo")
-                       .interval(Intervals.of("2002/P1D"))
-                       .shardSpec(new LinearShardSpec(1))
-                       .version("b")
-                       .size(0)
-                       .build()
-        )
-    );
+    ).andReturn(new HashSet<>(segments)).once();
     EasyMock.replay(taskActionClient);
 
-    final UsedSegmentChecker checker = new ActionBasedUsedSegmentChecker(taskActionClient);
-    final Set<DataSegment> segments = checker.findUsedSegments(
-        ImmutableSet.of(
-            new SegmentIdWithShardSpec("foo", Intervals.of("2000/P1D"), "a", new LinearShardSpec(1)),
-            new SegmentIdWithShardSpec("foo", Intervals.of("2001/P1D"), "b", new LinearShardSpec(0)),
-            new SegmentIdWithShardSpec("bar", Intervals.of("2002/P1D"), "b", new LinearShardSpec(0))
-        )
-    );
-
+    final Set<SegmentId> searchSegmentIds = segments.stream()
+                                                    .map(DataSegment::getId)
+                                                    .collect(Collectors.toSet());
     Assert.assertEquals(
-        ImmutableSet.of(
-            DataSegment.builder()
-                       .dataSource("foo")
-                       .interval(Intervals.of("2000/P1D"))
-                       .shardSpec(new LinearShardSpec(1))
-                       .version("a")
-                       .size(0)
-                       .build(),
-            DataSegment.builder()
-                       .dataSource("bar")
-                       .interval(Intervals.of("2002/P1D"))
-                       .shardSpec(new LinearShardSpec(0))
-                       .version("b")
-                       .size(0)
-                       .build()
-        ),
-        segments
+        new HashSet<>(segments),
+        segmentRetriever.findPublishedSegments(searchSegmentIds)
     );
 
     EasyMock.verify(taskActionClient);
   }
+
+  @Test
+  public void testRetrieveUsedSegmentsIfNotFoundById() throws IOException
+  {
+    final List<DataSegment> segments =
+        CreateDataSegments.ofDatasource("wiki")
+                          .forIntervals(3, Granularities.DAY)
+                          .startingAt("2013-01-01")
+                          .eachOfSizeInMb(400);
+
+    EasyMock.expect(
+        taskActionClient.submit(
+            new RetrieveSegmentsByIdAction("wiki", EasyMock.anyObject())
+        )
+    ).andThrow(InvalidInput.exception("task action not supported yet")).once();
+    EasyMock.expect(
+        taskActionClient.submit(
+            new RetrieveUsedSegmentsAction(
+                "wiki",
+                null,
+                Collections.singletonList(Intervals.of("2013-01-01/P3D")),
+                Segments.INCLUDING_OVERSHADOWED
+            )
+        )
+    ).andReturn(segments).once();
+    EasyMock.replay(taskActionClient);
+
+    final Set<SegmentId> searchSegmentIds = segments.stream()
+                                                    .map(DataSegment::getId)
+                                                    .collect(Collectors.toSet());
+    Assert.assertEquals(
+        new HashSet<>(segments),
+        segmentRetriever.findPublishedSegments(searchSegmentIds)
+    );
+
+    EasyMock.verify(taskActionClient);
+  }
+
+  @Test
+  public void testSegmentsForMultipleDatasourcesThrowsException()
+  {
+    DruidException exception = Assert.assertThrows(
+        DruidException.class,
+        () -> segmentRetriever.findPublishedSegments(
+            ImmutableSet.of(
+                SegmentId.of("wiki", Intervals.ETERNITY, "v1", 0),
+                SegmentId.of("koala", Intervals.ETERNITY, "v1", 0)
+            )
+        )
+    );
+    Assert.assertEquals(
+        "Published segment IDs to find cannot belong to multiple datasources[wiki, koala].",
+        exception.getMessage()
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 3aceae494c6..31a40277b8e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -136,6 +136,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
     }
   }
 
+  @Override
+  public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds)
+  {
+    return Collections.emptySet();
+  }
+
   @Override
   public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
   {
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index da6dd9ffd95..c02bba6082c 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -179,6 +179,12 @@ public interface IndexerMetadataStorageCoordinator
       @Nullable DateTime maxUsedStatusLastUpdatedTime
   );
 
+  /**
+   * Retrieves segments for the given IDs, regardless of their visibility
+   * (visible, overshadowed or unused).
+   */
+  Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds);
+
   /**
    * Mark as unused segments which include ONLY data within the given interval.
    *
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 2b9f328a097..72367330232 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -320,6 +320,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     return matchingSegments;
   }
 
+  @Override
+  public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> segmentIds)
+  {
+    return connector.inReadOnlyTransaction(
+        (handle, transactionStatus) ->
+            retrieveSegmentsById(handle, dataSource, segmentIds)
+                .stream()
+                .map(DataSegmentPlus::getDataSegment)
+                .collect(Collectors.toSet())
+    );
+  }
+
   @Override
   public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
   {
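The new coordinator method is an ID-based point lookup that ignores visibility, in contrast to the interval-scoped retrieval used elsewhere in this interface. A hypothetical direct invocation (in production, the Overlord's `segmentListById` handler is the caller); the ID string is illustrative:

    // 'coordinator' is an IndexerMetadataStorageCoordinator wired to the metadata store.
    Set<DataSegment> found = coordinator.retrieveSegmentsById(
        "wiki",
        ImmutableSet.of("wiki_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_v1")
    );
    // Returns the segment even if it has since been overshadowed by a
    // concurrent replace or marked unused.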
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 65df4f56761..a192111db4a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -46,6 +46,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -274,7 +275,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
   {
     this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
     this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
-    this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
+    this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "segmentRetriever");
     this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller");
     this.executor = MoreExecutors.listeningDecorator(
         Execs.singleThreaded("[" + StringUtils.encodeForFormat(appenderator.getId()) + "]-publish")
@@ -622,7 +623,6 @@ public abstract class BaseAppenderatorDriver implements Closeable
     return RetryUtils.retry(
         () -> {
           try {
-            final Set<DataSegment> upgradedSegments = new HashSet<>();
             final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(pushedAndTombstones);
             final SegmentPublishResult publishResult = publisher.publishSegments(
                 segmentsToBeOverwritten,
@@ -633,22 +633,25 @@ public abstract class BaseAppenderatorDriver implements Closeable
             );
             if (publishResult.isSuccess()) {
               log.info(
-                  "Published [%s] segments with commit metadata [%s]",
+                  "Published [%d] segments with commit metadata[%s].",
                   segmentsAndCommitMetadata.getSegments().size(),
                   callerMetadata
               );
               log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments");
-              // This set must contain only those segments that were upgraded as a result of a concurrent replace.
-              upgradedSegments.addAll(publishResult.getSegments());
+
+              // Log segments upgraded as a result of a concurrent replace
+              final Set<DataSegment> upgradedSegments = new HashSet<>(publishResult.getSegments());
               segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove);
               if (!upgradedSegments.isEmpty()) {
                 log.info("Published [%d] upgraded segments.", upgradedSegments.size());
                 log.infoSegments(upgradedSegments, "Upgraded segments");
               }
-              log.info("Published segment schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
+
+              log.info("Published segment schemas[%s].", segmentsAndCommitMetadata.getSegmentSchemaMapping());
+
+              return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
             } else {
-              // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
-              // now after all, for two possible reasons:
+              // Publishing didn't affirmatively succeed. However, segments
+              // with these IDs may have already been published:
               //
               // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
               //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
@@ -656,29 +659,28 @@ public abstract class BaseAppenderatorDriver implements Closeable
               //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
               //    now live!
 
-              final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndCommitMetadata
+              final Set<SegmentId> segmentIds = segmentsAndCommitMetadata
                   .getSegments()
                   .stream()
-                  .map(SegmentIdWithShardSpec::fromDataSegment)
+                  .map(DataSegment::getId)
                   .collect(Collectors.toSet());
 
-              final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
-
-              if (activeSegments.equals(ourSegments)) {
+              final Set<DataSegment> publishedSegments = usedSegmentChecker.findPublishedSegments(segmentIds);
+              if (publishedSegments.equals(ourSegments)) {
                 log.info(
-                    "Could not publish [%s] segments, but checked and found them already published; continuing.",
+                    "Could not publish [%d] segments, but they have already been published by another task.",
                     ourSegments.size()
                 );
                 log.infoSegments(
                     segmentsAndCommitMetadata.getSegments(),
                     "Could not publish segments"
                 );
-                log.info("Could not publish segment and schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping());
+                log.info("Could not publish segment schemas[%s].", segmentsAndCommitMetadata.getSegmentSchemaMapping());
 
                 // Clean up pushed segments if they are physically disjoint from the published ones (this means
                 // they were probably pushed by a replica, and with the unique paths option).
                 final boolean physicallyDisjoint = Sets.intersection(
-                    activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                    publishedSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
                     ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
                 ).isEmpty();
 
@@ -698,8 +700,9 @@ public abstract class BaseAppenderatorDriver implements Closeable
               }
               throw new ISE("Failed to publish segments");
             }
+
+            return segmentsAndCommitMetadata;
           }
-          return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
-        }
           catch (Exception e) {
             // Must not remove segments here, we aren't sure if our transaction succeeded or not.
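The failure branch above distinguishes "a replica already published identical segments" from "the transaction genuinely failed", and then compares load specs to decide whether this task's pushed files are redundant copies. A condensed restatement as a hypothetical helper:

    // Hypothetical helper mirroring the logic above: once the published set
    // matches ours ID-for-ID, our pushed files are safe to delete only if they
    // are physically distinct objects on deep storage (unique-path pushes by a
    // replica produce different loadSpecs for the same segment ID).
    static boolean shouldKillOurCopies(Set<DataSegment> published, Set<DataSegment> ours)
    {
      final Set<Map<String, Object>> publishedSpecs =
          published.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet());
      final Set<Map<String, Object>> ourSpecs =
          ours.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet());
      return Sets.intersection(publishedSpecs, ourSpecs).isEmpty();
    }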
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index 72187688057..dfbfa621a23 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -26,16 +26,12 @@ import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
 public class SegmentsAndCommitMetadata
 {
-  private static final SegmentsAndCommitMetadata NIL
-      = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, null);
-
   private final Object commitMetadata;
   private final ImmutableList<DataSegment> segments;
   private final SegmentSchemaMapping segmentSchemaMapping;
@@ -139,9 +135,4 @@ public class SegmentsAndCommitMetadata
            ", segmentSchemaMapping=" + segmentSchemaMapping +
            '}';
   }
-
-  public static SegmentsAndCommitMetadata nil()
-  {
-    return NIL;
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java
index 29fd6028f82..3ab966009b0 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
 import java.io.IOException;
 import java.util.Set;
@@ -28,10 +29,6 @@ public interface UsedSegmentChecker
 {
   /**
    * For any identifiers that exist and are actually used, returns the corresponding DataSegment objects.
-   *
-   * @param identifiers identifiers to search for
-   *
-   * @return used DataSegments
    */
-  Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers) throws IOException;
+  Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers) throws IOException;
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 5a21a4331fe..e3ed8fd49d7 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -326,7 +327,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
   private static class NoopUsedSegmentChecker implements UsedSegmentChecker
   {
     @Override
-    public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
+    public Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers)
     {
       return ImmutableSet.of();
     }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
index 91ee44d1ce1..37612b11776 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -38,14 +39,14 @@ public class TestUsedSegmentChecker implements UsedSegmentChecker
   }
 
   @Override
-  public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
+  public Set<DataSegment> findPublishedSegments(Set<SegmentId> identifiers)
   {
     final SegmentTimeline timeline = SegmentTimeline.forSegments(pushedSegments);
     final Set<DataSegment> retVal = new HashSet<>();
-    for (SegmentIdWithShardSpec identifier : identifiers) {
-      for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(identifier.getInterval())) {
+    for (SegmentId segmentId : identifiers) {
+      for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(segmentId.getInterval())) {
         for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
-          if (identifiers.contains(SegmentIdWithShardSpec.fromDataSegment(chunk.getObject()))) {
+          if (identifiers.contains(chunk.getObject().getId())) {
             retVal.add(chunk.getObject());
           }
         }
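A hypothetical usage of the reworked test helper, assuming it is constructed with the list of segments pushed so far (`CreateDataSegments` as in the test above):

    List<DataSegment> pushed = CreateDataSegments.ofDatasource("wiki")
                                                 .forIntervals(1, Granularities.DAY)
                                                 .startingAt("2013-01-01")
                                                 .eachOfSizeInMb(400);
    UsedSegmentChecker checker = new TestUsedSegmentChecker(pushed);
    // The segment is found because its ID appears in the helper's timeline of
    // pushed segments; IDs that were never pushed simply come back absent.
    Set<DataSegment> found =
        checker.findPublishedSegments(ImmutableSet.of(pushed.get(0).getId()));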