From d9bd02256ab3a9c393e97d2da709f7395293547f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 25 Jun 2024 22:18:59 -0700 Subject: [PATCH] Refactor: Rename UsedSegmentChecker and cleanup task actions (#16644) Changes: - Rename `UsedSegmentChecker` to `PublishedSegmentsRetriever` - Remove deprecated single `Interval` argument from `RetrieveUsedSegmentsAction` as it is now unused and has been deprecated since #1988 - Return `Set` of segments instead of a `Collection` from `IndexerMetadataStorageCoordinator.retrieveUsedSegments()` --- ...ActionBasedPublishedSegmentRetriever.java} | 10 +- .../actions/RetrieveUsedSegmentsAction.java | 50 +++----- .../indexing/common/actions/TaskAction.java | 2 +- .../common/task/BatchAppenderators.java | 4 +- .../common/task/KillUnusedSegmentsTask.java | 1 - ...rlordActionBasedUsedSegmentsRetriever.java | 2 +- .../SeekableStreamIndexTask.java | 4 +- .../SeekableStreamIndexTaskRunner.java | 2 +- .../seekablestream/SequenceMetadata.java | 16 +-- ...onBasedPublishedSegmentRetrieverTest.java} | 7 +- .../RetrieveUsedSegmentsActionSerdeTest.java | 11 +- .../SegmentTransactionalInsertActionTest.java | 25 ++-- .../ConcurrentReplaceAndAppendTest.java | 1 - ...ncurrentReplaceAndStreamingAppendTest.java | 2 - ...TestIndexerMetadataStorageCoordinator.java | 8 +- .../druid/common/config/ConfigManager.java | 6 +- .../IndexerMetadataStorageCoordinator.java | 68 +++-------- .../IndexerSQLMetadataStorageCoordinator.java | 107 ++++++------------ .../druid/metadata/SQLMetadataConnector.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 8 +- .../appenderator/BatchAppenderatorDriver.java | 10 +- ...er.java => PublishedSegmentRetriever.java} | 2 +- .../StreamAppenderatorDriver.java | 8 +- ...exerSQLMetadataStorageCoordinatorTest.java | 4 +- ...mentsSinksBatchAppenderatorDriverTest.java | 2 +- ...edSegmentsBatchAppenderatorDriverTest.java | 2 +- .../StreamAppenderatorDriverFailTest.java | 15 +-- .../StreamAppenderatorDriverTest.java | 2 +- ...ava => TestPublishedSegmentRetriever.java} | 4 +- 29 files changed, 137 insertions(+), 248 deletions(-) rename indexing-service/src/main/java/org/apache/druid/indexing/appenderator/{ActionBasedUsedSegmentChecker.java => ActionBasedPublishedSegmentRetriever.java} (89%) rename indexing-service/src/test/java/org/apache/druid/indexing/appenderator/{ActionBasedUsedSegmentCheckerTest.java => ActionBasedPublishedSegmentRetrieverTest.java} (96%) rename server/src/main/java/org/apache/druid/segment/realtime/appenderator/{UsedSegmentChecker.java => PublishedSegmentRetriever.java} (96%) rename server/src/test/java/org/apache/druid/segment/realtime/appenderator/{TestUsedSegmentChecker.java => TestPublishedSegmentRetriever.java} (92%) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java similarity index 89% rename from indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java rename to indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java index 2a7d5610cca..ba5cf923b12 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java @@ -27,7 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; +import org.apache.druid.segment.realtime.appenderator.PublishedSegmentRetriever; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; @@ -40,13 +40,13 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker +public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRetriever { - private static final Logger log = new Logger(ActionBasedUsedSegmentChecker.class); + private static final Logger log = new Logger(ActionBasedPublishedSegmentRetriever.class); private final TaskActionClient taskActionClient; - public ActionBasedUsedSegmentChecker(TaskActionClient taskActionClient) + public ActionBasedPublishedSegmentRetriever(TaskActionClient taskActionClient) { this.taskActionClient = taskActionClient; } @@ -92,7 +92,7 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker Iterables.transform(segmentIds, SegmentId::getInterval) ); final Collection foundUsedSegments = taskActionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED) + new RetrieveUsedSegmentsAction(dataSource, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED) ); for (DataSegment segment : foundUsedSegments) { if (segmentIds.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index 4e5f134c521..a107795864c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -22,8 +22,8 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.Configs; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask; import org.apache.druid.indexing.overlord.Segments; @@ -35,6 +35,7 @@ import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -48,19 +49,13 @@ import java.util.Set; import java.util.stream.Collectors; /** - * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as - * used. + * Task action to retrieve a collection of segments which have data within the + * specified intervals and are marked as used. + *

* If the task holds REPLACE locks and is writing back to the same datasource, - * only segments that were created before the REPLACE lock was acquired are returned for an interval. - * This ensures that the input set of segments for this replace task remains consistent - * even when new data is appended by other concurrent tasks. - * - * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in - * the collection only once. - * - * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link - * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns - * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection. + * only segments that were created before the REPLACE lock was acquired are + * returned for an interval. This ensures that the input set of segments for this + * replace task remains consistent even when new data is appended by other concurrent tasks. */ public class RetrieveUsedSegmentsAction implements TaskAction> { @@ -73,35 +68,22 @@ public class RetrieveUsedSegmentsAction implements TaskAction intervals, - // When JSON object is deserialized, this parameter is optional for backward compatibility. - // Otherwise, it shouldn't be considered optional. @JsonProperty("visibility") @Nullable Segments visibility ) { - this.dataSource = dataSource; - - Preconditions.checkArgument( - interval == null || intervals == null, - "please specify intervals only" - ); - - List theIntervals = null; - if (interval != null) { - theIntervals = ImmutableList.of(interval); - } else if (intervals != null && intervals.size() > 0) { - theIntervals = JodaUtils.condenseIntervals(intervals); + if (CollectionUtils.isNullOrEmpty(intervals)) { + throw InvalidInput.exception("No interval specified for retrieving used segments"); } - this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found"); - // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility - this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; + this.dataSource = dataSource; + this.intervals = JodaUtils.condenseIntervals(intervals); + this.visibility = Configs.valueOrDefault(visibility, Segments.ONLY_VISIBLE); } public RetrieveUsedSegmentsAction(String dataSource, Collection intervals) { - this(dataSource, null, intervals, Segments.ONLY_VISIBLE); + this(dataSource, intervals, Segments.ONLY_VISIBLE); } @JsonProperty @@ -198,7 +180,7 @@ public class RetrieveUsedSegmentsAction implements TaskAction retrieveUsedSegments(TaskActionToolbox toolbox) + private Set retrieveUsedSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() .retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 7ab0b946cd8..4606bd597a8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -38,7 +38,7 @@ import java.util.concurrent.Future; @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), - @JsonSubTypes.Type(name = "segmentListById", value = RetrieveSegmentsByIdAction.class), + @JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class), @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), @JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class), @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java index 3a8f70d6746..087464b48ac 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.common.task; -import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.IAE; @@ -134,7 +134,7 @@ public final class BatchAppenderators return new BatchAppenderatorDriver( appenderator, segmentAllocator, - new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()), toolbox.getDataSegmentKiller() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 6a1f9e95a06..fe49569a3bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -198,7 +198,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction( getDataSource(), - null, ImmutableList.of(getInterval()), Segments.INCLUDING_OVERSHADOWED ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java index 73bc411fb01..e6e9ad7170b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java @@ -53,6 +53,6 @@ public class OverlordActionBasedUsedSegmentsRetriever implements UsedSegmentsRet { return toolbox .getTaskActionClient() - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, visibility)); + .submit(new RetrieveUsedSegmentsAction(dataSource, intervals, visibility)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index f8e78bb711d..ee4eed572df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -27,8 +27,8 @@ import com.google.common.base.Suppliers; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; -import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; @@ -237,7 +237,7 @@ public abstract class SeekableStreamIndexTask) publishedSegmentsAndMetadata -> { + publishedSegmentsAndMetadata -> { if (publishedSegmentsAndMetadata == null) { throw new ISE( "Transaction failure publishing segments for sequence [%s]", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index c3832391be8..2da858f80cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -393,15 +393,13 @@ public class SequenceMetadata // if we created no segments and didn't change any offsets, just do nothing and return. log.info( "With empty segment set, start offsets [%s] and end offsets [%s] are the same, skipping metadata commit.", - startPartitions, - finalPartitions + startPartitions, finalPartitions ); return SegmentPublishResult.ok(segmentsToPush); } else { log.info( "With empty segment set, start offsets [%s] and end offsets [%s] changed, committing new metadata.", - startPartitions, - finalPartitions + startPartitions, finalPartitions ); action = SegmentTransactionalInsertAction.commitMetadataOnlyAction( runner.getAppenderator().getDataSource(), @@ -419,12 +417,10 @@ public class SequenceMetadata ); final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions); action = taskLockType == TaskLockType.APPEND - ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, - segmentSchemaMapping - ) - : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata, - segmentSchemaMapping - ); + ? SegmentTransactionalAppendAction + .forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, segmentSchemaMapping) + : SegmentTransactionalInsertAction + .appendAction(segmentsToPush, startMetadata, endMetadata, segmentSchemaMapping); } else { action = taskLockType == TaskLockType.APPEND ? SegmentTransactionalAppendAction.forSegments(segmentsToPush, segmentSchemaMapping) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetrieverTest.java similarity index 96% rename from indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetrieverTest.java index 160176c8841..dd1cd5c59ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetrieverTest.java @@ -43,16 +43,16 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -public class ActionBasedUsedSegmentCheckerTest +public class ActionBasedPublishedSegmentRetrieverTest { private TaskActionClient taskActionClient; - private ActionBasedUsedSegmentChecker segmentRetriever; + private ActionBasedPublishedSegmentRetriever segmentRetriever; @Before public void setup() { taskActionClient = EasyMock.createMock(TaskActionClient.class); - segmentRetriever = new ActionBasedUsedSegmentChecker(taskActionClient); + segmentRetriever = new ActionBasedPublishedSegmentRetriever(taskActionClient); } @Test @@ -103,7 +103,6 @@ public class ActionBasedUsedSegmentCheckerTest taskActionClient.submit( new RetrieveUsedSegmentsAction( "wiki", - null, Collections.singletonList(Intervals.of("2013-01-01/P3D")), Segments.INCLUDING_OVERSHADOWED ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java index 99675fd57bb..6ae9fc80e3c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java @@ -28,6 +28,7 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.List; /** @@ -42,7 +43,7 @@ public class RetrieveUsedSegmentsActionSerdeTest Interval interval = Intervals.of("2014/2015"); RetrieveUsedSegmentsAction expected = - new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE); + new RetrieveUsedSegmentsAction("dataSource", Collections.singletonList(interval), Segments.ONLY_VISIBLE); RetrieveUsedSegmentsAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class); @@ -68,11 +69,15 @@ public class RetrieveUsedSegmentsActionSerdeTest @Test public void testOldJsonDeserialization() throws Exception { - String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"interval\": \"2014/2015\"}"; + String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"intervals\": [\"2014/2015\"]}"; RetrieveUsedSegmentsAction actual = (RetrieveUsedSegmentsAction) MAPPER.readValue(jsonStr, TaskAction.class); Assert.assertEquals( - new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE), + new RetrieveUsedSegmentsAction( + "test", + Collections.singletonList(Intervals.of("2014/2015")), + Segments.ONLY_VISIBLE + ), actual ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index f158ef1980c..44ce60b5ceb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -35,18 +35,13 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.assertj.core.api.Assertions; -import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; public class SegmentTransactionalInsertActionTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public TaskActionTestKit actionTestKit = new TaskActionTestKit(); @@ -157,8 +152,8 @@ public class SegmentTransactionalInsertActionTest Assert.assertEquals( SegmentPublishResult.fail( InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end" - + " state[null]. Try resetting the supervisor." + "The new start metadata state[ObjectMetadata{theObject=[1]}] is" + + " ahead of the last committed end state[null]. Try resetting the supervisor." ).toString() ), result @@ -169,17 +164,15 @@ public class SegmentTransactionalInsertActionTest public void testFailBadVersion() throws Exception { final Task task = NoopTask.create(); - final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction( - null, - ImmutableSet.of(SEGMENT3), - null - ); + final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction + .overwriteAction(null, ImmutableSet.of(SEGMENT3), null); actionTestKit.getTaskLockbox().add(task); acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); - thrown.expect(IllegalStateException.class); - thrown.expectMessage(CoreMatchers.containsString("are not covered by locks")); - SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox()); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT3)), result); + IllegalStateException exception = Assert.assertThrows( + IllegalStateException.class, + () -> action.perform(task, actionTestKit.getTaskActionToolbox()) + ); + Assert.assertTrue(exception.getMessage().contains("are not covered by locks")); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index a62d477ef09..f179e4707ec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -984,7 +984,6 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase Collection allUsedSegments = dummyTaskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - null, ImmutableList.of(interval), visibility ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java index 4f599e24493..a8ce15538d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -709,7 +709,6 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase Collection allUsedSegments = dummyTaskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - null, ImmutableList.of(interval), visibility ) @@ -829,7 +828,6 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase return dummyTaskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - null, ImmutableList.of(Intervals.ETERNITY), Segments.INCLUDING_OVERSHADOWED ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 31a40277b8e..61a57e94842 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -85,9 +85,9 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto } @Override - public List retrieveAllUsedSegments(String dataSource, Segments visibility) + public Set retrieveAllUsedSegments(String dataSource, Segments visibility) { - return ImmutableList.of(); + return ImmutableSet.of(); } @Override @@ -97,13 +97,13 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto } @Override - public List retrieveUsedSegmentsForIntervals( + public Set retrieveUsedSegmentsForIntervals( String dataSource, List intervals, Segments visibility ) { - return ImmutableList.of(); + return ImmutableSet.of(); } @Override diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java index a9be4c6bf2d..7ba1481e97d 100644 --- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java @@ -61,7 +61,11 @@ public class ConfigManager private volatile PollingCallable poller; @Inject - public ConfigManager(MetadataStorageConnector dbConnector, Supplier dbTables, Supplier config) + public ConfigManager( + MetadataStorageConnector dbConnector, + Supplier dbTables, + Supplier config + ) { this.dbConnector = dbConnector; this.config = config; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 9452d19485f..c055a8d9e9f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -39,32 +39,15 @@ import java.util.Map; import java.util.Set; /** + * Handles metadata transactions performed by the Overlord. */ public interface IndexerMetadataStorageCoordinator { /** - * Retrieve all published segments which may include any data in the interval and are marked as used from the - * metadata store. - * - * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in - * the collection only once. - * - * @param dataSource The data source to query - * @param interval The interval for which all applicable and used segmented are requested. - * @param visibility Whether only visible or visible as well as overshadowed segments should be returned. The - * visibility is considered within the specified interval: that is, a segment which is visible - * outside of the specified interval, but overshadowed within the specified interval will not be - * returned if {@link Segments#ONLY_VISIBLE} is passed. See more precise description in the doc for - * {@link Segments}. - * @return The DataSegments which include data in the requested interval. These segments may contain data outside the - * requested interval. - * - * @implNote This method doesn't return a {@link Set} because there may be an expectation that {@code Set.contains()} - * is O(1) operation, while it's not the case for the returned collection unless it copies all segments into a new - * {@link java.util.HashSet} or {@link com.google.common.collect.ImmutableSet} which may in turn be unnecessary in - * other use cases. So clients should perform such copy themselves if they need {@link Set} semantics. + * Retrieves all published segments that have partial or complete overlap with + * the given interval and are marked as used. */ - default Collection retrieveUsedSegmentsForInterval( + default Set retrieveUsedSegmentsForInterval( String dataSource, Interval interval, Segments visibility @@ -74,21 +57,16 @@ public interface IndexerMetadataStorageCoordinator } /** - * Retrieve all published used segments in the data source from the metadata store. + * Retrieves all published used segments for the given data source. * - * @param dataSource The data source to query - * - * @return all segments belonging to the given data source - * @see #retrieveUsedSegmentsForInterval(String, Interval, Segments) similar to this method but also accepts data - * interval. + * @see #retrieveUsedSegmentsForInterval(String, Interval, Segments) */ - Collection retrieveAllUsedSegments(String dataSource, Segments visibility); + Set retrieveAllUsedSegments(String dataSource, Segments visibility); /** - * * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the * given data source and list of intervals from the metadata store. - * + *

* Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility" * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link * Segments#INCLUDING_OVERSHADOWED} was passed. It's the responsibility of the caller to filter out overshadowed ones @@ -99,32 +77,16 @@ public interface IndexerMetadataStorageCoordinator * * @return The DataSegments and the related created_date of segments */ - Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals); + Collection> retrieveUsedSegmentsAndCreatedDates( + String dataSource, + List intervals + ); /** - * Retrieve all published segments which may include any data in the given intervals and are marked as used from the - * metadata store. - *

- * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in - * the collection only once. - *

- * - * @param dataSource The data source to query - * @param intervals The intervals for which all applicable and used segments are requested. - * @param visibility Whether only visible or visible as well as overshadowed segments should be returned. The - * visibility is considered within the specified intervals: that is, a segment which is visible - * outside of the specified intervals, but overshadowed on the specified intervals will not be - * returned if {@link Segments#ONLY_VISIBLE} is passed. See more precise description in the doc for - * {@link Segments}. - * @return The DataSegments which include data in the requested intervals. These segments may contain data outside the - * requested intervals. - * - * @implNote This method doesn't return a {@link Set} because there may be an expectation that {@code Set.contains()} - * is O(1) operation, while it's not the case for the returned collection unless it copies all segments into a new - * {@link java.util.HashSet} or {@link com.google.common.collect.ImmutableSet} which may in turn be unnecessary in - * other use cases. So clients should perform such copy themselves if they need {@link Set} semantics. + * Retrieves all published segments that have partial or complete overlap with + * the given intervals and are marked as used. */ - Collection retrieveUsedSegmentsForIntervals( + Set retrieveUsedSegmentsForIntervals( String dataSource, List intervals, Segments visibility diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 75d296923fb..fd637728908 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -152,7 +152,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } @Override - public Collection retrieveUsedSegmentsForIntervals( + public Set retrieveUsedSegmentsForIntervals( final String dataSource, final List intervals, final Segments visibility @@ -165,7 +165,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } @Override - public Collection retrieveAllUsedSegments(String dataSource, Segments visibility) + public Set retrieveAllUsedSegments(String dataSource, Segments visibility) { return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility); } @@ -173,7 +173,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor /** * @param intervals empty list means unrestricted interval. */ - private Collection doRetrieveUsedSegments( + private Set doRetrieveUsedSegments( final String dataSource, final List intervals, final Segments visibility @@ -431,7 +431,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } } - private Collection retrieveAllUsedSegmentsForIntervalsWithHandle( + private Set retrieveAllUsedSegmentsForIntervalsWithHandle( final Handle handle, final String dataSource, final List intervals @@ -440,7 +440,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) .retrieveUsedSegments(dataSource, intervals)) { - final List retVal = new ArrayList<>(); + final Set retVal = new HashSet<>(); iterator.forEachRemaining(retVal::add); return retVal; } @@ -2564,8 +2564,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor * oldCommitMetadata when this function is called (based on T.equals). This method is idempotent in that if * the metadata already equals newCommitMetadata, it will return true. * - * @param handle database handle - * @param dataSource druid dataSource * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to * {@link DataSourceMetadata#matches(DataSourceMetadata)} * @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with @@ -2627,15 +2625,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. - return new DataStoreMetadataUpdateResult(true, false, - "The new start metadata state[%s] is ahead of the last commited" - + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb + return DataStoreMetadataUpdateResult.failure( + "The new start metadata state[%s] is ahead of the last committed" + + " end state[%s]. Try resetting the supervisor.", + startMetadata, oldCommitMetadataFromDb ); } if (!startMetadataMatchesExisting) { // Not in the desired start state. - return new DataStoreMetadataUpdateResult(true, false, + return DataStoreMetadataUpdateResult.failure( "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", oldCommitMetadataFromDb, startMetadata ); @@ -2668,11 +2667,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS - : new DataStoreMetadataUpdateResult( - true, - true, - "Failed to insert metadata for datasource [%s]", - dataSource); + : DataStoreMetadataUpdateResult.retryableFailure("Failed to insert metadata for datasource[%s]", dataSource); } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( @@ -2692,11 +2687,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS - : new DataStoreMetadataUpdateResult( - true, - true, - "Failed to update metadata for datasource [%s]", - dataSource); + : DataStoreMetadataUpdateResult.retryableFailure("Failed to update metadata for datasource[%s]", dataSource); } if (retVal.isSuccess()) { @@ -2712,19 +2703,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor public boolean deleteDataSourceMetadata(final String dataSource) { return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) - { - int rows = handle.createStatement( - StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) - ) - .bind("dataSource", dataSource) - .execute(); + handle -> { + int rows = handle.createStatement( + StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ).bind("dataSource", dataSource).execute(); - return rows > 0; - } + return rows > 0; } ); } @@ -2767,17 +2751,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor public void updateSegmentMetadata(final Set segments) { connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - for (final DataSegment segment : segments) { - updatePayload(handle, segment); - } - - return null; + (handle, transactionStatus) -> { + for (final DataSegment segment : segments) { + updatePayload(handle, segment); } + + return 0; } ); } @@ -2990,10 +2969,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor { private final boolean failed; private final boolean canRetry; - @Nullable private final String errorMsg; + @Nullable + private final String errorMsg; public static final DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null); + public static DataStoreMetadataUpdateResult failure(String errorMsgFormat, Object... messageArgs) + { + return new DataStoreMetadataUpdateResult(true, false, errorMsgFormat, messageArgs); + } + + public static DataStoreMetadataUpdateResult retryableFailure(String errorMsgFormat, Object... messageArgs) + { + return new DataStoreMetadataUpdateResult(true, true, errorMsgFormat, messageArgs); + } + DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable String errorMsg, Object... errorFormatArgs) { this.failed = failed; @@ -3022,34 +3012,5 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor { return errorMsg; } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DataStoreMetadataUpdateResult that = (DataStoreMetadataUpdateResult) o; - return failed == that.failed && canRetry == that.canRetry && Objects.equals(errorMsg, that.errorMsg); - } - - @Override - public int hashCode() - { - return Objects.hash(failed, canRetry, errorMsg); - } - - @Override - public String toString() - { - return "DataStoreMetadataUpdateResult{" + - "failed=" + failed + - ", canRetry=" + canRetry + - ", errorMsg='" + errorMsg + '\'' + - '}'; - } } } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index cd927b2fef8..2d315d19fc8 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -318,7 +318,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector public void createSegmentTable(final String tableName) { - List columns = new ArrayList<>(); + final List columns = new ArrayList<>(); columns.add("id VARCHAR(255) NOT NULL"); columns.add("dataSource VARCHAR(255) %4$s NOT NULL"); columns.add("created_date VARCHAR(255) NOT NULL"); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index a192111db4a..fa7d037c92c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -254,7 +254,7 @@ public abstract class BaseAppenderatorDriver implements Closeable private static final Logger log = new Logger(BaseAppenderatorDriver.class); private final SegmentAllocator segmentAllocator; - private final UsedSegmentChecker usedSegmentChecker; + private final PublishedSegmentRetriever publishedSegmentRetriever; private final DataSegmentKiller dataSegmentKiller; protected final Appenderator appenderator; @@ -269,13 +269,13 @@ public abstract class BaseAppenderatorDriver implements Closeable BaseAppenderatorDriver( Appenderator appenderator, SegmentAllocator segmentAllocator, - UsedSegmentChecker usedSegmentChecker, + PublishedSegmentRetriever segmentRetriever, DataSegmentKiller dataSegmentKiller ) { this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator"); this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); - this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "segmentRetriever"); + this.publishedSegmentRetriever = Preconditions.checkNotNull(segmentRetriever, "segmentRetriever"); this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller"); this.executor = MoreExecutors.listeningDecorator( Execs.singleThreaded("[" + StringUtils.encodeForFormat(appenderator.getId()) + "]-publish") @@ -665,7 +665,7 @@ public abstract class BaseAppenderatorDriver implements Closeable .map(DataSegment::getId) .collect(Collectors.toSet()); - final Set publishedSegments = usedSegmentChecker.findPublishedSegments(segmentIds); + final Set publishedSegments = publishedSegmentRetriever.findPublishedSegments(segmentIds); if (publishedSegments.equals(ourSegments)) { log.info( "Could not publish [%d] segments, but they have already been published by another task.", diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index e08fcf601df..81a02ab1eec 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -61,20 +61,16 @@ import java.util.stream.Collectors; public class BatchAppenderatorDriver extends BaseAppenderatorDriver { /** - * Create a driver. - * - * @param appenderator appenderator - * @param segmentAllocator segment allocator - * @param usedSegmentChecker used segment checker + * Creates a driver for batch ingestion. */ public BatchAppenderatorDriver( Appenderator appenderator, SegmentAllocator segmentAllocator, - UsedSegmentChecker usedSegmentChecker, + PublishedSegmentRetriever segmentRetriever, DataSegmentKiller dataSegmentKiller ) { - super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller); + super(appenderator, segmentAllocator, segmentRetriever, dataSegmentKiller); } @Nullable diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PublishedSegmentRetriever.java similarity index 96% rename from server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java rename to server/src/main/java/org/apache/druid/segment/realtime/appenderator/PublishedSegmentRetriever.java index 3ab966009b0..3f2d239f3be 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UsedSegmentChecker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PublishedSegmentRetriever.java @@ -25,7 +25,7 @@ import org.apache.druid.timeline.SegmentId; import java.io.IOException; import java.util.Set; -public interface UsedSegmentChecker +public interface PublishedSegmentRetriever { /** * For any identifiers that exist and are actually used, returns the corresponding DataSegment objects. diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 89115874916..68599dbea11 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -91,7 +91,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver * @param appenderator appenderator * @param segmentAllocator segment allocator * @param handoffNotifierFactory handoff notifier factory - * @param usedSegmentChecker used segment checker + * @param segmentRetriever used segment checker * @param objectMapper object mapper, used for serde of commit metadata * @param metrics Firedepartment metrics */ @@ -99,13 +99,13 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver Appenderator appenderator, SegmentAllocator segmentAllocator, SegmentHandoffNotifierFactory handoffNotifierFactory, - UsedSegmentChecker usedSegmentChecker, + PublishedSegmentRetriever segmentRetriever, DataSegmentKiller dataSegmentKiller, ObjectMapper objectMapper, SegmentGenerationMetrics metrics ) { - super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller); + super(appenderator, segmentAllocator, segmentRetriever, dataSegmentKiller); this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") .createSegmentHandoffNotifier(appenderator.getDataSource()); @@ -416,7 +416,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver { return Futures.transformAsync( publish(publisher, committer, sequenceNames), - (AsyncFunction) this::registerHandoff, + this::registerHandoff, MoreExecutors.directExecutor() ); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index a782a72e646..222c1ece89f 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -523,7 +523,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata { metadataUpdateCounter.getAndIncrement(); if (attemptCounter.getAndIncrement() == 0) { - return new DataStoreMetadataUpdateResult(true, true, null); + return DataStoreMetadataUpdateResult.retryableFailure(null); } else { return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); } @@ -593,7 +593,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata Assert.assertEquals( SegmentPublishResult.fail( InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited" + "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" + " end state[null]. Try resetting the supervisor." ).toString()), result1 diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java index cc5a7f282eb..269aeaca7c4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java @@ -95,7 +95,7 @@ public class ClosedSegmentsSinksBatchAppenderatorDriverTest extends EasyMockSupp driver = new BatchAppenderatorDriver( appenderatorTester.getAppenderator(), allocator, - new TestUsedSegmentChecker(appenderatorTester.getPushedSegments()), + new TestPublishedSegmentRetriever(appenderatorTester.getPushedSegments()), dataSegmentKiller ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java index ba8f097b669..0c6fb552a4d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java @@ -93,7 +93,7 @@ public class OpenAndClosedSegmentsBatchAppenderatorDriverTest extends EasyMockSu driver = new BatchAppenderatorDriver( openAndClosedSegmentsAppenderatorTester.getAppenderator(), allocator, - new TestUsedSegmentChecker(openAndClosedSegmentsAppenderatorTester.getPushedSegments()), + new TestPublishedSegmentRetriever(openAndClosedSegmentsAppenderatorTester.getPushedSegments()), dataSegmentKiller ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index bdc6f64c90a..ed3fe97d6cc 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -133,7 +133,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport createPersistFailAppenderator(), allocator, segmentHandoffNotifierFactory, - new NoopUsedSegmentChecker(), + new NoopPublishedSegmentRetriever(), dataSegmentKiller, OBJECT_MAPPER, new SegmentGenerationMetrics() @@ -171,7 +171,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport createPushFailAppenderator(), allocator, segmentHandoffNotifierFactory, - new NoopUsedSegmentChecker(), + new NoopPublishedSegmentRetriever(), dataSegmentKiller, OBJECT_MAPPER, new SegmentGenerationMetrics() @@ -209,7 +209,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport createDropFailAppenderator(), allocator, segmentHandoffNotifierFactory, - new NoopUsedSegmentChecker(), + new NoopPublishedSegmentRetriever(), dataSegmentKiller, OBJECT_MAPPER, new SegmentGenerationMetrics() @@ -260,7 +260,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport new FailableAppenderator(), allocator, segmentHandoffNotifierFactory, - new NoopUsedSegmentChecker(), + new NoopPublishedSegmentRetriever(), dataSegmentKiller, OBJECT_MAPPER, new SegmentGenerationMetrics() @@ -324,7 +324,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport } } - private static class NoopUsedSegmentChecker implements UsedSegmentChecker + private static class NoopPublishedSegmentRetriever implements PublishedSegmentRetriever { @Override public Set findPublishedSegments(Set identifiers) @@ -338,11 +338,6 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport return new FailableAppenderator().disablePush(); } - static Appenderator createPushInterruptAppenderator() - { - return new FailableAppenderator().interruptPush(); - } - static Appenderator createPersistFailAppenderator() { return new FailableAppenderator().disablePersist(); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index eae1148027d..69528b339ff 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -127,7 +127,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport streamAppenderatorTester.getAppenderator(), allocator, segmentHandoffNotifierFactory, - new TestUsedSegmentChecker(streamAppenderatorTester.getPushedSegments()), + new TestPublishedSegmentRetriever(streamAppenderatorTester.getPushedSegments()), dataSegmentKiller, OBJECT_MAPPER, new SegmentGenerationMetrics() diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestPublishedSegmentRetriever.java similarity index 92% rename from server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java rename to server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestPublishedSegmentRetriever.java index 37612b11776..8042c798a87 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestPublishedSegmentRetriever.java @@ -29,11 +29,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -public class TestUsedSegmentChecker implements UsedSegmentChecker +public class TestPublishedSegmentRetriever implements PublishedSegmentRetriever { private final List pushedSegments; - public TestUsedSegmentChecker(List pushedSegments) + public TestPublishedSegmentRetriever(List pushedSegments) { this.pushedSegments = pushedSegments; }