From c25a5568275c506115914cc5622a829c89f8e384 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 15 Apr 2022 15:47:47 -0700 Subject: [PATCH] Fix bug in auto compaction preserveExistingMetrics feature (#12438) * fix bug * fix test * fix IT --- .../duty/ITAutoCompactionTest.java | 96 ++++++++++++++ .../incremental/OnheapIncrementalIndex.java | 10 +- .../segment/data/IncrementalIndexTest.java | 69 ++++++++++ ...ClientCompactionTaskQueryTuningConfig.java | 7 +- .../coordinator/duty/CompactSegments.java | 2 +- .../duty/NewestSegmentFirstIterator.java | 2 +- .../coordinator/duty/CompactSegmentsTest.java | 125 ++++++++++++++++++ .../duty/NewestSegmentFirstIteratorTest.java | 18 +-- .../duty/NewestSegmentFirstPolicyTest.java | 26 ++-- 9 files changed, 323 insertions(+), 32 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 7dd7591615c..32d563f25b2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; @@ -121,6 +122,101 @@ public class ITAutoCompactionTest extends AbstractIndexerTest fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix(); } + @Test + public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType() throws Exception + { + // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2 + loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS); + // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null + loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 2 segments across 1 days... + verifySegmentsCount(2); + ArrayList nullList = new ArrayList(); + nullList.add(null); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%QUANTILESRESULT%%", 2, + "%%THETARESULT%%", 2.0, + "%%HLLRESULT%%", 2 + ); + verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, null, true), + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), + null, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + // FloatSumAggregator combine method takes in two Float but return Double + new FloatSumAggregatorFactory("sum_added", "added"), + new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false), + new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L) + }, + false + ); + // should now only have 1 row after compaction + // added = null, count = 3, sum_added = 93.0 + forceTriggerAutoCompaction(1); + + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93.0f)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%QUANTILESRESULT%%", 3, + "%%THETARESULT%%", 3.0, + "%%HLLRESULT%%", 3 + ); + verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields); + + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + checkCompactionIntervals(intervalsBeforeCompaction); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(1); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + @Test public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index a28a0a620f4..f521c3913b0 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -452,13 +452,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat); + return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue(); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong); + return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue(); } @Override @@ -470,7 +470,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override protected double getMetricDoubleValue(int rowOffset, int aggOffset) { - return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble); + return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue(); } @Override @@ -544,7 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex * If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed */ - private T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function getMetricTypeFunction) + private Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function getMetricTypeFunction) { if (preserveExistingMetrics) { // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values @@ -564,7 +564,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex AggregatorFactory aggregatorFactory = metrics[aggOffset]; T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]); T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]); - return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined); + return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined); } } else { // If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java index 3257ec6d49a..77309ff2777 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java @@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; +import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.BoundDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; @@ -799,6 +800,74 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest } } + @Test + public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType() throws IndexSizeExceededException + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + // FloatSumAggregator combine method takes in two Float but return Double + new FloatSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 2) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 3) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5) + ) + ); + + Assert.assertEquals(index.isRollup() ? 1 : 4, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 4 rows + if (rowCount == 1 || rowCount == 2) { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue()); + } else { + if (isPreserveExistingMetrics) { + Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue()); + Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue()); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + // The rows does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true) + Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, row.getMetric("sum_of_x")); + } + } + } + } + } + @Test public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException { diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java index f35257e244a..374f61c5127 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java @@ -79,13 +79,14 @@ public class ClientCompactionTaskQueryTuningConfig public static ClientCompactionTaskQueryTuningConfig from( @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig, - @Nullable Integer maxRowsPerSegment + @Nullable Integer maxRowsPerSegment, + @Nullable Boolean preserveExistingMetrics ) { if (userCompactionTaskQueryTuningConfig == null) { return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(preserveExistingMetrics), null, null, null, @@ -107,7 +108,7 @@ public class ClientCompactionTaskQueryTuningConfig } else { AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() - : new OnheapIncrementalIndex.Spec(true); + : new OnheapIncrementalIndex.Spec(preserveExistingMetrics); return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, appendableIndexSpecToUse, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index ddae02298bd..c809e8168f5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -452,7 +452,7 @@ public class CompactSegments implements CoordinatorCustomDuty "coordinator-issued", segmentsToCompact, config.getTaskPriority(), - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null), granularitySpec, dimensionsSpec, config.getMetricsSpec(), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index 8e2f5f3d2c7..4f2f1afca58 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -338,7 +338,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator { Preconditions.checkState(!candidates.isEmpty(), "Empty candidates"); final ClientCompactionTaskQueryTuningConfig tuningConfig = - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()); + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null); final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig); final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState(); if (lastCompactionState == null) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 2b91a89abbd..08b32feff34 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -69,6 +69,7 @@ import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -1741,6 +1742,130 @@ public class CompactSegmentsTest Assert.assertEquals(expected, actual); } + @Test + public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + null, + null, + new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, + null, + null, + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass( + ClientCompactionTaskQueryTuningConfig.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt(), + clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue()); + Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()); + Assert.assertTrue(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics()); + } + + @Test + public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass( + ClientCompactionTaskQueryTuningConfig.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt(), + clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue()); + Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()); + Assert.assertFalse(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics()); + } + private void verifySnapshot( CompactSegments compactSegments, AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index fd7549c6298..cbcd1b906bb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -101,7 +101,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -189,7 +189,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(null, 1000L), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -233,7 +233,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(100, 1000L), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -277,7 +277,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(100, 1000L), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -321,7 +321,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -365,7 +365,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -409,7 +409,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } @@ -453,7 +453,7 @@ public class NewestSegmentFirstIteratorTest Assert.assertEquals( new SingleDimensionPartitionsSpec(10000, null, "dim", false), NewestSegmentFirstIterator.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) + ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null) ) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 20268731032..1b7570a9721 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -700,7 +700,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -733,7 +733,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -766,7 +766,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -809,7 +809,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -852,7 +852,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -904,7 +904,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -955,7 +955,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1015,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1075,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1174,7 +1174,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have // filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1298,7 +1298,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have // metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, @@ -1447,7 +1447,7 @@ public class NewestSegmentFirstPolicyTest // Different indexSpec as what is set in the auto compaction config IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); Map newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference>() {}); - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -1496,7 +1496,7 @@ public class NewestSegmentFirstPolicyTest public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() { NullHandling.initializeForTests(); - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); final VersionedIntervalTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),