From a0188192de9804e272fd83fa86fec8a7034caa4b Mon Sep 17 00:00:00 2001 From: Karan Kumar Date: Sat, 25 Nov 2023 13:50:29 +0530 Subject: [PATCH] Fixing failing compaction/parallel index jobs during upgrade due to new actions not available on the overlord. (#15430) * Fixing failing compaction/parallel index jobs during upgrade due to new actions not available on the overlord. * Fixing build * Removing extra space. * Fixing json getter. * Review comments. --- .../MaterializedViewSupervisor.java | 4 ++- .../RetrieveSegmentsToReplaceAction.java | 31 +++++++------------ .../indexing/common/config/TaskConfig.java | 27 +++++++++++++--- .../indexing/input/DruidInputSource.java | 4 +-- .../common/config/TaskConfigBuilder.java | 16 +++++++++- .../ConcurrentReplaceAndAppendTest.java | 2 +- .../indexing/overlord/TaskLifecycleTest.java | 1 + ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 6 ++-- .../IndexerSQLMetadataStorageCoordinator.java | 14 +++++---- ...exerSQLMetadataStorageCoordinatorTest.java | 16 +++++----- 11 files changed, 77 insertions(+), 46 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index af5c0fbe95a..33386878015 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -57,6 +57,7 @@ import org.joda.time.Interval; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -371,7 +372,8 @@ 
public class MaterializedViewSupervisor implements Supervisor // Pair max(created_date), interval -> list> Pair, Map>> baseSegmentsSnapshot = getMaxCreateDateAndBaseSegments( - metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY) + metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), + Collections.singletonList(Intervals.ETERNITY)) ); // baseSegments are used to create HadoopIndexTask Map> baseSegments = baseSegmentsSnapshot.rhs; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java index 78e6ada5c1e..7fec3369a82 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; @@ -38,6 +37,7 @@ import org.joda.time.Interval; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -63,20 +63,18 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction intervals; @JsonCreator public RetrieveSegmentsToReplaceAction( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("intervals") List intervals ) { this.dataSource = dataSource; - this.interval = interval; + this.intervals = intervals; } @JsonProperty @@ -86,9 +84,9 @@ 
public class RetrieveSegmentsToReplaceAction implements TaskAction getIntervals() { - return interval; + return intervals; } @Override @@ -128,7 +126,7 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction>> intervalToCreatedToSegments = new HashMap<>(); for (Pair segmentAndCreatedDate : - toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) { + toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { final DataSegment segment = segmentAndCreatedDate.lhs; final String created = segmentAndCreatedDate.rhs; intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) @@ -165,7 +163,7 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction retrieveAllVisibleSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE); + .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); } @Override @@ -185,25 +183,20 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction usedSegments; - if (toolbox == null) { + if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) { usedSegments = FutureUtils.getUnchecked( coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), true @@ -554,7 +554,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI } else { try { usedSegments = toolbox.getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval)); + .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval))); } catch (IOException e) { LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index af920ebbeb7..8b488fff809 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -41,6 +41,7 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; + private Boolean enableConcurrentAppendAndReplace; public TaskConfigBuilder setBaseDir(String baseDir) { @@ -132,6 +133,18 @@ public class TaskConfigBuilder return this; } + public TaskConfigBuilder enableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = true; + return this; + } + + public TaskConfigBuilder disableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = false; + return this; + } + public TaskConfig build() { return new TaskConfig( @@ -149,7 +162,8 @@ public class TaskConfigBuilder batchProcessingMode, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 1c4b6809c38..7f83a8f0233 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -957,7 +957,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase Collection allUsedSegments = taskActionClient.submit( new RetrieveSegmentsToReplaceAction( WIKI, - interval + Collections.singletonList(interval) ) ); 
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 572364a56a2..627c161863b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -615,6 +615,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest .setDefaultRowFlushBoundary(50000) .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()) .setTmpStorageBytesPerTask(-1L) + .enableConcurrentAppendAndReplace() .build(); return new TaskToolboxFactory( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 143a74c72cb..bca79a559af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -89,7 +89,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals) { return ImmutableList.of(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 34a55574dce..3cea2e6dd58 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -84,7 +84,7 @@ public interface IndexerMetadataStorageCoordinator /** * * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the - given data source and interval from the metadata store. + given data source and list of intervals from the metadata store. * * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility" * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link @@ -92,11 +92,11 @@ public interface IndexerMetadataStorageCoordinator * if needed. * * @param dataSource The data source to query - * @param interval The interval to query + * @param intervals The list of intervals to query * * @return The DataSegments and the related created_date of segments */ - Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval); + Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals); /** * Retrieve all published segments which may include any data in the given intervals and are marked as used from the diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 612f712c1bb..62f55f96c47 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -174,21 +174,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals) { StringBuilder queryBuilder = new StringBuilder( "SELECT 
created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true" ); - final List intervals = new ArrayList<>(); - // Do not need an interval condition if the interval is ETERNITY - if (!Intervals.isEternity(interval)) { - intervals.add(interval); + boolean hasEternityInterval = false; + for (Interval interval : intervals) { + if (Intervals.isEternity(interval)) { + hasEternityInterval = true; + break; + } } SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode( queryBuilder, - intervals, + hasEternityInterval ? Collections.emptyList() : intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 5d76296d67b..8e2e7eb747f 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2870,28 +2870,28 @@ public class IndexerSQLMetadataStorageCoordinatorTest insertUsedSegments(ImmutableSet.of(defaultSegment)); List> resultForIntervalOnTheLeft = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2001"))); Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty()); List> resultForIntervalOnTheRight = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("3000/3001"))); Assert.assertTrue(resultForIntervalOnTheRight.isEmpty()); List> resultForExactInterval = - 
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(1, resultForExactInterval.size()); Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs); List> resultForIntervalWithLeftOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2015-01-02"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap); List> resultForIntervalWithRightOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2015-01-01/3000"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap); List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.ETERNITY)); Assert.assertEquals(resultForExactInterval, resultForEternity); } @@ -2902,11 +2902,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment)); List> resultForRandomInterval = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(3, resultForRandomInterval.size()); 
List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(eternitySegment.getInterval())); Assert.assertEquals(3, resultForEternity.size()); }