From 33d9d9bd74ade384ef5feb31748b989122deb160 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 29 Oct 2021 10:22:25 -0700 Subject: [PATCH] Add rollup config to auto and manual compaction (#11850) * add rollup to auto and manual compaction * add unit tests * add unit tests * add IT * fix checkstyle --- docs/configuration/index.md | 1 + docs/ingestion/compaction.md | 11 +- .../indexing/common/task/CompactionTask.java | 6 +- .../ClientCompactionTaskQuerySerdeTest.java | 10 +- .../task/CompactionTaskParallelRunTest.java | 4 +- .../common/task/CompactionTaskRunTest.java | 10 +- .../common/task/CompactionTaskTest.java | 65 +++++++++-- .../duty/ITAutoCompactionTest.java | 72 ++++++++++--- .../duty/ITAutoCompactionUpgradeTest.java | 2 +- .../wikipedia_index_rollup_queries.json | 56 ++++++++++ .../wikipedia_index_task_no_rollup.json | 76 +++++++++++++ .../ClientCompactionTaskGranularitySpec.java | 19 +++- .../UserCompactionTaskGranularityConfig.java | 17 ++- .../coordinator/duty/CompactSegments.java | 4 +- .../duty/NewestSegmentFirstIterator.java | 64 ++++++----- .../DataSourceCompactionConfigTest.java | 40 ++++++- .../coordinator/duty/CompactSegmentsTest.java | 68 +++++++++++- .../duty/KillCompactionConfigTest.java | 6 +- .../duty/NewestSegmentFirstIteratorTest.java | 18 ++-- .../duty/NewestSegmentFirstPolicyTest.java | 101 ++++++++++++++---- ...rdinatorCompactionConfigsResourceTest.java | 8 +- 21 files changed, 542 insertions(+), 116 deletions(-) create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6f5b0536816..b346f4beedc 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -995,6 +995,7 @@ You can optionally use the `granularitySpec` object to configure the segment gra |Field|Description|Required| |-----|-----------|--------| |`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No| +|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rollup, individual records can no longer be recovered. |No| > Unlike manual compaction, automatic compaction does not support query granularity. diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index 8ccf9a99a21..88942caed31 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -192,7 +192,8 @@ You can optionally use the `granularitySpec` object to configure the segment gra , "granularitySpec": { "segmentGranularity": , - "queryGranularity": + "queryGranularity": , + "rollup": true } ... ``` @@ -203,8 +204,9 @@ You can optionally use the `granularitySpec` object to configure the segment gra |-----|-----------|--------| |`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No| |`queryGranularity`|Time chunking period for the query granularity. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values. Not supported for automatic compaction.|No| +|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rollup, individual records can no longer be recovered. |No| -For example, to set the segment granularity to "day" and the query granularity to "hour": +For example, to set the segment granularity to "day", the query granularity to "hour", and enabling rollup: ```json { "type" : "compact", @@ -213,11 +215,12 @@ For example, to set the segment granularity to "day" and the query granularity t "type": "compact", "inputSpec": { "type": "interval", - "interval": "2017-01-01/2018-01-01", + "interval": "2017-01-01/2018-01-01" }, "granularitySpec": { "segmentGranularity":"day", - "queryGranularity":"hour" + "queryGranularity":"hour", + "rollup": true } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index cdb637f9564..e2098e022f3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -227,7 +227,7 @@ public class CompactionTask extends AbstractBatchIndexTask )); } if (granularitySpec == null && segmentGranularity != null) { - this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null); + this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null, null); } else { this.granularitySpec = granularitySpec; } @@ -600,7 +600,7 @@ public class CompactionTask extends AbstractBatchIndexTask dimensionsSpec, metricsSpec, granularitySpec == null - ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) + ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null) : granularitySpec.withSegmentGranularity(segmentGranularityToUse) ); @@ -729,7 +729,7 @@ public class CompactionTask extends AbstractBatchIndexTask final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec( Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()), queryGranularityToUse, - rollup.get(), + granularitySpec.isRollup() == null ? rollup.get() : granularitySpec.isRollup(), Collections.singletonList(totalInterval) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index fd7b69957a9..8d78d320949 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -116,7 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest 1000, 100 ), - new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR), + new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), ImmutableMap.of("key", "value") ); @@ -202,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest query.getGranularitySpec().getSegmentGranularity(), task.getGranularitySpec().getSegmentGranularity() ); + Assert.assertEquals( + query.getGranularitySpec().isRollup(), + task.getGranularitySpec().isRollup() + ); Assert.assertEquals( query.getIoConfig().isDropExisting(), task.getIoConfig().isDropExisting() @@ -264,7 +268,7 @@ public class ClientCompactionTaskQuerySerdeTest null ) ) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true)) .build(); final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( @@ -307,7 +311,7 @@ public class ClientCompactionTaskQuerySerdeTest 1000, 100 ), - new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR), + new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), new HashMap<>() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 1732b3b31d0..1d4b03a84f1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -463,7 +463,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis // Set the dropExisting flag to true in the IOConfig of the compaction task .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true) .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null)) .build(); final Set compactedSegments = runTask(compactionTask); @@ -496,7 +496,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null)) .build(); final Set compactedSegments = runTask(compactionTask); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 19c6575600a..eebd87592e2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -605,7 +605,7 @@ public class CompactionTaskRunTest extends IngestionTestBase // day segmentGranularity final CompactionTask compactionTask1 = builder .interval(Intervals.of("2014-01-01/2014-01-02")) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, null)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null)) .build(); Pair> resultPair = runTask(compactionTask1); @@ -626,7 +626,7 @@ public class CompactionTaskRunTest extends IngestionTestBase // hour segmentGranularity final CompactionTask compactionTask2 = builder .interval(Intervals.of("2014-01-01/2014-01-02")) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, null)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, null, null)) .build(); resultPair = runTask(compactionTask2); @@ -660,7 +660,7 @@ public class CompactionTaskRunTest extends IngestionTestBase // day queryGranularity final CompactionTask compactionTask1 = builder .interval(Intervals.of("2014-01-01/2014-01-02")) - .granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.SECOND)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.SECOND, null)) .build(); Pair> resultPair = runTask(compactionTask1); @@ -705,7 +705,7 @@ public class CompactionTaskRunTest extends IngestionTestBase // day segmentGranularity and day queryGranularity final CompactionTask compactionTask1 = builder .interval(Intervals.of("2014-01-01/2014-01-02")) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY, null)) .build(); Pair> resultPair = runTask(compactionTask1); @@ -737,7 +737,7 @@ public class CompactionTaskRunTest extends IngestionTestBase final CompactionTask compactionTask1 = builder .interval(Intervals.of("2014-01-01/2014-01-02")) - .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null)) + .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null, null)) .build(); Pair> resultPair = runTask(compactionTask1); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 43d503d3170..e01f047b87c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -385,7 +385,7 @@ public class CompactionTaskTest ); builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder2.tuningConfig(createTuningConfig()); - builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY)); + builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null)); final CompactionTask taskCreatedWithGranularitySpec = builder2.build(); Assert.assertEquals( taskCreatedWithGranularitySpec.getSegmentGranularity(), @@ -404,7 +404,7 @@ public class CompactionTaskTest builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); builder.segmentGranularity(Granularities.HOUR); - builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY)); + builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY, null)); try { builder.build(); } @@ -433,7 +433,7 @@ public class CompactionTaskTest builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); builder.segmentGranularity(Granularities.HOUR); - builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY)); + builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY, null)); try { builder.build(); } @@ -462,7 +462,7 @@ public class CompactionTaskTest builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); builder.segmentGranularity(Granularities.HOUR); - builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY)); + builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null)); final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); Assert.assertEquals(Granularities.HOUR, taskCreatedWithSegmentGranularity.getSegmentGranularity()); } @@ -1315,7 +1315,7 @@ public class CompactionTaskTest new PartitionConfigurationManager(TUNING_CONFIG), null, null, - new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null), + new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), COORDINATOR_CLIENT, segmentCacheManagerFactory, RETRY_POLICY_FACTORY, @@ -1353,7 +1353,7 @@ public class CompactionTaskTest new PartitionConfigurationManager(TUNING_CONFIG), null, null, - new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)), + new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), COORDINATOR_CLIENT, segmentCacheManagerFactory, RETRY_POLICY_FACTORY, @@ -1391,7 +1391,8 @@ public class CompactionTaskTest null, new ClientCompactionTaskGranularitySpec( new PeriodGranularity(Period.months(3), null, null), - new PeriodGranularity(Period.months(3), null, null) + new PeriodGranularity(Period.months(3), null, null), + null ), COORDINATOR_CLIENT, segmentCacheManagerFactory, @@ -1467,7 +1468,7 @@ public class CompactionTaskTest new PartitionConfigurationManager(TUNING_CONFIG), null, null, - new ClientCompactionTaskGranularitySpec(null, null), + new ClientCompactionTaskGranularitySpec(null, null, null), COORDINATOR_CLIENT, segmentCacheManagerFactory, RETRY_POLICY_FACTORY, @@ -1493,6 +1494,54 @@ public class CompactionTaskTest ); } + @Test + public void testGranularitySpecWithNotNullRollup() + throws IOException, SegmentLoadingException + { + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + LockGranularity.TIME_CHUNK, + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), + new PartitionConfigurationManager(TUNING_CONFIG), + null, + null, + new ClientCompactionTaskGranularitySpec(null, null, true), + COORDINATOR_CLIENT, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY, + IOConfig.DEFAULT_DROP_EXISTING + ); + + Assert.assertEquals(6, ingestionSpecs.size()); + for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) { + Assert.assertTrue(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup()); + } + } + + @Test + public void testGranularitySpecWithNullRollup() + throws IOException, SegmentLoadingException + { + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + LockGranularity.TIME_CHUNK, + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), + new PartitionConfigurationManager(TUNING_CONFIG), + null, + null, + new ClientCompactionTaskGranularitySpec(null, null, null), + COORDINATOR_CLIENT, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY, + IOConfig.DEFAULT_DROP_EXISTING + ); + Assert.assertEquals(6, ingestionSpecs.size()); + for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) { + //Expect false since rollup value in metadata of existing segments are null + Assert.assertFalse(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup()); + } + } + @Test public void testChooseFinestGranularityWithNulls() { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 83cc761ae1e..495ecf19923 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -19,6 +19,8 @@ package org.apache.druid.tests.coordinator.duty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.MaxSizeSplitHintSpec; @@ -71,6 +73,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest { private static final Logger LOG = new Logger(ITAutoCompactionTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; + private static final String INDEX_TASK_NO_ROLLUP = "/indexer/wikipedia_index_task_no_rollup.json"; + private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000; private static final Period NO_SKIP_OFFSET = Period.seconds(0); @@ -290,7 +294,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -307,7 +311,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest newGranularity = Granularities.DAY; // Set dropExisting to true - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); LOG.info("Auto compaction test with DAY segment granularity"); @@ -340,7 +344,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to false - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -357,7 +361,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest newGranularity = Granularities.DAY; // Set dropExisting to false - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false); LOG.info("Auto compaction test with DAY segment granularity"); @@ -397,7 +401,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); LOG.info("Auto compaction test with YEAR segment granularity"); @@ -439,7 +443,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. // Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.DAY; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -471,7 +475,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity. // Now set auto compaction with DAY granularity in the granularitySpec Granularity newGranularity = Granularities.YEAR; - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null)); forceTriggerAutoCompaction(1); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -495,7 +499,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); List expectedIntervalAfterCompaction = new ArrayList<>(); // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) @@ -521,7 +525,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest newGranularity = Granularities.MONTH; // Set dropExisting to true - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); // Since dropExisting is set to true... // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity expectedIntervalAfterCompaction = new ArrayList<>(); @@ -553,7 +557,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Granularity newGranularity = Granularities.YEAR; // Set dropExisting to false - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false); List expectedIntervalAfterCompaction = new ArrayList<>(); // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) @@ -579,7 +583,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest newGranularity = Granularities.MONTH; // Set dropExisting to false - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false); // Since dropExisting is set to true... // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity expectedIntervalAfterCompaction = new ArrayList<>(); @@ -605,6 +609,38 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } + @Test + public void testAutoCompactionDutyWithRollup() throws Exception + { + loadData(INDEX_TASK_NO_ROLLUP); + try (final Closeable ignored = unloader(fullDatasourceName)) { + Map expectedResult = ImmutableMap.of( + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, null, true), + false + ); + forceTriggerAutoCompaction(2); + expectedResult = ImmutableMap.of( + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(2); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + private void loadData(String indexTask) throws Exception { String taskSpec = getResourceAsString(indexTask); @@ -625,6 +661,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } private void verifyQuery(String queryResource) throws Exception + { + verifyQuery(queryResource, ImmutableMap.of()); + } + + private void verifyQuery(String queryResource, Map expectedResults) throws Exception { String queryResponseTemplate; try { @@ -634,13 +675,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest catch (IOException e) { throw new ISE(e, "could not read query file: %s", queryResource); } - queryResponseTemplate = StringUtils.replace( queryResponseTemplate, "%%DATASOURCE%%", fullDatasourceName ); - + for (Map.Entry entry : expectedResults.entrySet()) { + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + entry.getKey(), + jsonMapper.writeValueAsString(entry.getValue()) + ); + } queryHelper.testQueriesFromString(queryResponseTemplate); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java index b9607079618..a4b1ed609cb 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java @@ -96,7 +96,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest null, 1 ), - new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), new UserCompactionTaskIOConfig(true), null ); diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json new file mode 100644 index 00000000000..93c58bd9cf7 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json @@ -0,0 +1,56 @@ +[ + { + "description": "rows count", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter": { + "type": "selector", + "dimension": "language", + "value": "en", + "extractionFn": null + }, + "aggregations":[ + { + "type": "count", + "name": "count" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "count":%%EXPECTED_COUNT_RESULT%% + } + } + ] + }, + { + "description": "scan with filter", + "query":{ + "queryType" : "scan", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter": { + "type": "selector", + "dimension": "language", + "value": "en", + "extractionFn": null + }, + "columns": [ + "added" + ], + "resultFormat":"compactedList" + }, + "expectedResults": %%EXPECTED_SCAN_RESULT%%, + "fieldsToTest": ["events"] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json new file mode 100644 index 00000000000..821838b4c18 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json @@ -0,0 +1,76 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "DAY", + "rollup": false, + "intervals" : [ "2013-08-31/2013-09-02" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + {"type": "string", "name": "language", "createBitmapIndex": false} + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "local", + "baseDir": "/resources/data/batch_index/json", + "filter": "wikipedia_index_data*" + } + }, + "tuningConfig": { + "type": "index", + "maxRowsPerSegment": 10, + "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%% + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java index 74fd559cc34..3ba732cfbf7 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java @@ -40,15 +40,18 @@ public class ClientCompactionTaskGranularitySpec { private final Granularity segmentGranularity; private final Granularity queryGranularity; + private final Boolean rollup; @JsonCreator public ClientCompactionTaskGranularitySpec( @JsonProperty("segmentGranularity") Granularity segmentGranularity, - @JsonProperty("queryGranularity") Granularity queryGranularity + @JsonProperty("queryGranularity") Granularity queryGranularity, + @JsonProperty("rollup") Boolean rollup ) { this.queryGranularity = queryGranularity; this.segmentGranularity = segmentGranularity; + this.rollup = rollup; } @JsonProperty @@ -63,9 +66,15 @@ public class ClientCompactionTaskGranularitySpec return queryGranularity; } + @JsonProperty + public Boolean isRollup() + { + return rollup; + } + public ClientCompactionTaskGranularitySpec withSegmentGranularity(Granularity segmentGranularity) { - return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity); + return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity, rollup); } @Override @@ -79,13 +88,14 @@ public class ClientCompactionTaskGranularitySpec } ClientCompactionTaskGranularitySpec that = (ClientCompactionTaskGranularitySpec) o; return Objects.equals(segmentGranularity, that.segmentGranularity) && - Objects.equals(queryGranularity, that.queryGranularity); + Objects.equals(queryGranularity, that.queryGranularity) && + Objects.equals(rollup, that.rollup); } @Override public int hashCode() { - return Objects.hash(segmentGranularity, queryGranularity); + return Objects.hash(segmentGranularity, queryGranularity, rollup); } @Override @@ -94,6 +104,7 @@ public class ClientCompactionTaskGranularitySpec return "ClientCompactionTaskGranularitySpec{" + "segmentGranularity=" + segmentGranularity + ", queryGranularity=" + queryGranularity + + ", rollup=" + rollup + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java index 9623e2a6dab..d6b320a2483 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java @@ -39,15 +39,18 @@ public class UserCompactionTaskGranularityConfig { private final Granularity segmentGranularity; private final Granularity queryGranularity; + private final Boolean rollup; @JsonCreator public UserCompactionTaskGranularityConfig( @JsonProperty("segmentGranularity") Granularity segmentGranularity, - @JsonProperty("queryGranularity") Granularity queryGranularity + @JsonProperty("queryGranularity") Granularity queryGranularity, + @JsonProperty("rollup") Boolean rollup ) { this.queryGranularity = queryGranularity; this.segmentGranularity = segmentGranularity; + this.rollup = rollup; } @JsonProperty("segmentGranularity") @@ -62,6 +65,12 @@ public class UserCompactionTaskGranularityConfig return queryGranularity; } + @JsonProperty("rollup") + public Boolean isRollup() + { + return rollup; + } + @Override public boolean equals(Object o) { @@ -73,13 +82,14 @@ public class UserCompactionTaskGranularityConfig } UserCompactionTaskGranularityConfig that = (UserCompactionTaskGranularityConfig) o; return Objects.equals(segmentGranularity, that.segmentGranularity) && - Objects.equals(queryGranularity, that.queryGranularity); + Objects.equals(queryGranularity, that.queryGranularity) && + Objects.equals(rollup, that.rollup); } @Override public int hashCode() { - return Objects.hash(segmentGranularity, queryGranularity); + return Objects.hash(segmentGranularity, queryGranularity, rollup); } @Override @@ -88,6 +98,7 @@ public class UserCompactionTaskGranularityConfig return "UserCompactionTaskGranularityConfig{" + "segmentGranularity=" + segmentGranularity + ", queryGranularity=" + queryGranularity + + ", rollup=" + rollup + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 9bb6f9c572a..0b2a5864a68 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -345,7 +345,9 @@ public class CompactSegments implements CoordinatorDuty if (config.getGranularitySpec() != null) { queryGranularitySpec = new ClientCompactionTaskGranularitySpec( config.getGranularitySpec().getSegmentGranularity(), - config.getGranularitySpec().getQueryGranularity() + config.getGranularitySpec().getQueryGranularity(), + config.getGranularitySpec().isRollup() + ); } else { queryGranularitySpec = null; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index 3f9ee930105..029117820c9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -36,7 +37,6 @@ import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.server.coordinator.CompactionStatistics; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.CompactionState; @@ -309,7 +309,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } @VisibleForTesting - static PartitionsSpec findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig) + static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig) { final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec(); if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) { @@ -332,7 +332,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator Preconditions.checkState(!candidates.isEmpty(), "Empty candidates"); final ClientCompactionTaskQueryTuningConfig tuningConfig = ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()); - final PartitionsSpec partitionsSpecFromConfig = findPartitinosSpecFromConfig(tuningConfig); + final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig); final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState(); if (lastCompactionState == null) { log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0).getId()); @@ -384,32 +384,48 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator return true; } - if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { - // Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity - final Granularity existingSegmentGranularity = lastCompactionState.getGranularitySpec() != null ? - objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() : - null; - if (existingSegmentGranularity == null) { - // Candidate segments were all compacted without segment granularity set. - // We need to check if all segments have the same segment granularity as the configured segment granularity. - boolean needsCompaction = candidates.segments.stream() - .anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval())); - if (needsCompaction) { + if (config.getGranularitySpec() != null) { + + final ClientCompactionTaskGranularitySpec existingGranularitySpec = lastCompactionState.getGranularitySpec() != null ? + objectMapper.convertValue(lastCompactionState.getGranularitySpec(), ClientCompactionTaskGranularitySpec.class) : + null; + // Checks for segmentGranularity + if (config.getGranularitySpec().getSegmentGranularity() != null) { + final Granularity existingSegmentGranularity = existingGranularitySpec != null ? + existingGranularitySpec.getSegmentGranularity() : + null; + if (existingSegmentGranularity == null) { + // Candidate segments were all compacted without segment granularity set. + // We need to check if all segments have the same segment granularity as the configured segment granularity. + boolean needsCompaction = candidates.segments.stream() + .anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval())); + if (needsCompaction) { + log.info( + "Segments were previously compacted but without segmentGranularity in auto compaction." + + " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction", + config.getGranularitySpec().getSegmentGranularity() + ); + return true; + } + + } else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) { log.info( - "Segments were previously compacted but without segmentGranularity in auto compaction." - + " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction", - config.getGranularitySpec().getSegmentGranularity() + "Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction", + config.getGranularitySpec().getSegmentGranularity(), + existingSegmentGranularity ); return true; } + } - } else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) { - log.info( - "Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction", - config.getGranularitySpec().getSegmentGranularity(), - existingSegmentGranularity - ); - return true; + // Checks for rollup + if (config.getGranularitySpec().isRollup() != null) { + final Boolean existingRollup = existingGranularitySpec != null ? + existingGranularitySpec.isRollup() : + null; + if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) { + return true; + } } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index c798d29e1ea..9ff6eac2f6b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -238,7 +238,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); @@ -265,7 +265,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH, null), null, ImmutableMap.of("key", "val") ); @@ -308,7 +308,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(null, null), + new UserCompactionTaskGranularityConfig(null, null, null), null, ImmutableMap.of("key", "val") ); @@ -325,6 +325,36 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); } + @Test + public void testSerdeGranularitySpecWithRollup() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + 500L, + null, + new Period(3600), + null, + new UserCompactionTaskGranularityConfig(null, null, true), + null, + ImmutableMap.of("key", "val") + ); + final String json = OBJECT_MAPPER.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); + Assert.assertNotNull(config.getGranularitySpec()); + Assert.assertNotNull(fromJson.getGranularitySpec()); + Assert.assertEquals(config.getGranularitySpec().isRollup(), fromJson.getGranularitySpec().isRollup()); + } + @Test public void testSerdeIOConfigWithNonNullDropExisting() throws IOException { @@ -335,7 +365,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), new UserCompactionTaskIOConfig(true), ImmutableMap.of("key", "val") ); @@ -363,7 +393,7 @@ public class DataSourceCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), new UserCompactionTaskIOConfig(null), ImmutableMap.of("key", "val") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index f4cdd9c9bc7..494f567a47f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -829,7 +829,7 @@ public class CompactSegmentsTest null, null ), - new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), null, null ) @@ -852,7 +852,65 @@ public class CompactSegmentsTest Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); Assert.assertNotNull(actual); - ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null); + ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null); + Assert.assertEquals(expected, actual); + } + + @Test + public void testCompactWithRollupInGranularitySpec() + { + final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); + final String dataSource = DATA_SOURCE_PREFIX + 0; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 0, + 500L, + null, + new Period("PT0H"), // smaller than segment interval + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + null, + partitionsSpec, + null, + null, + null, + null, + null, + 3, + null, + null, + null, + null, + null, + null + ), + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true), + null, + null + ) + ); + doCompactSegments(compactSegments, compactionConfigs); + ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( + ClientCompactionTaskGranularitySpec.class); + Mockito.verify(mockIndexingServiceClient).compactSegments( + ArgumentMatchers.anyString(), + segmentsCaptor.capture(), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(), + granularitySpecArgumentCaptor.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); + ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); + Assert.assertNotNull(actual); + ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true); Assert.assertEquals(expected, actual); } @@ -888,7 +946,7 @@ public class CompactSegmentsTest null ), null, - new ClientCompactionTaskGranularitySpec(Granularities.DAY, null), + new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null), null ) ); @@ -923,7 +981,7 @@ public class CompactSegmentsTest null, null ), - new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), + new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null), null, null ) @@ -951,7 +1009,7 @@ public class CompactSegmentsTest Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); Assert.assertNotNull(actual); - ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null); + ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null); Assert.assertEquals(expected, actual); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java index 4aad5df6033..a7d1d37be16 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java @@ -230,7 +230,7 @@ public class KillCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); @@ -242,7 +242,7 @@ public class KillCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); @@ -346,7 +346,7 @@ public class KillCompactionConfigTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index 0cf0dededce..092bc9f4386 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -97,7 +97,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -137,7 +137,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -177,7 +177,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(null, 1000L), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -217,7 +217,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(100, 1000L), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -257,7 +257,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(100, 1000L), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -297,7 +297,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -337,7 +337,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new DynamicPartitionsSpec(null, Long.MAX_VALUE), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -377,7 +377,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); @@ -417,7 +417,7 @@ public class NewestSegmentFirstIteratorTest ); Assert.assertEquals( new SingleDimensionPartitionsSpec(10000, null, "dim", false), - NewestSegmentFirstIterator.findPartitinosSpecFromConfig( + NewestSegmentFirstIterator.findPartitionsSpecFromConfig( ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()) ) ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 42cc20c2180..b79156fe7d8 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -422,7 +422,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -448,7 +448,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -557,7 +557,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -606,7 +606,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -641,7 +641,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -664,7 +664,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -688,7 +688,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -708,7 +708,7 @@ public class NewestSegmentFirstPolicyTest // Auto compaction config sets segmentGranularity=DAY final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -741,7 +741,7 @@ public class NewestSegmentFirstPolicyTest // Auto compaction config sets segmentGranularity=DAY final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -774,7 +774,7 @@ public class NewestSegmentFirstPolicyTest // Auto compaction config sets segmentGranularity=YEAR final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -797,7 +797,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -817,7 +817,7 @@ public class NewestSegmentFirstPolicyTest // Auto compaction config sets segmentGranularity=YEAR final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -840,7 +840,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -865,6 +865,7 @@ public class NewestSegmentFirstPolicyTest null, DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Bangkok")) ), + null, null ) ) @@ -891,7 +892,7 @@ public class NewestSegmentFirstPolicyTest // Same indexSpec as what is set in the auto compaction config Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); // Same partitionsSpec as what is set in the auto compaction config - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -915,6 +916,7 @@ public class NewestSegmentFirstPolicyTest DateTimes.of("2012-01-02T00:05:00.000Z"), DateTimeZone.UTC ), + null, null ) ) @@ -935,6 +937,66 @@ public class NewestSegmentFirstPolicyTest Assert.assertFalse(iterator.hasNext()); } + @Test + public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentRollup() + { + // Same indexSpec as what is set in the auto compaction config + Map indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference>() {}); + // Same partitionsSpec as what is set in the auto compaction config + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + + // Create segments that were compacted (CompactionState != null) and have + // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, + // rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00, + // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 + final VersionedIntervalTimeline timeline = createTimeline( + new SegmentGenerateSpec( + Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), + new Period("P1D"), + null, + new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "false")) + ), + new SegmentGenerateSpec( + Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"), + new Period("P1D"), + null, + new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "true")) + ), + new SegmentGenerateSpec( + Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), + new Period("P1D"), + null, + new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of()) + ) + ); + + // Auto compaction config sets rollup=true + final CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + // We should get interval 2017-10-01T00:00:00/2017-10-02T00:00:00 and interval 2017-10-03T00:00:00/2017-10-04T00:00:00. + Assert.assertTrue(iterator.hasNext()); + List expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE) + ); + Assert.assertEquals( + ImmutableSet.copyOf(expectedSegmentsToCompact), + ImmutableSet.copyOf(iterator.next()) + ); + Assert.assertTrue(iterator.hasNext()); + expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE) + ); + Assert.assertEquals( + ImmutableSet.copyOf(expectedSegmentsToCompact), + ImmutableSet.copyOf(iterator.next()) + ); + // No more + Assert.assertFalse(iterator.hasNext()); + } + @Test public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() { @@ -944,7 +1006,7 @@ public class NewestSegmentFirstPolicyTest ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); @@ -971,7 +1033,7 @@ public class NewestSegmentFirstPolicyTest // Different indexSpec as what is set in the auto compaction config IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); Map newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference>() {}); - PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY final VersionedIntervalTimeline timeline = createTimeline( @@ -995,6 +1057,7 @@ public class NewestSegmentFirstPolicyTest null, DateTimeZone.UTC ), + null, null ) ) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index c130dc3ea4c..353550de46c 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -55,7 +55,7 @@ public class CoordinatorCompactionConfigsResourceTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); @@ -150,7 +150,7 @@ public class CoordinatorCompactionConfigsResourceTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true), null, ImmutableMap.of("key", "val") ); @@ -190,7 +190,7 @@ public class CoordinatorCompactionConfigsResourceTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") ); @@ -311,7 +311,7 @@ public class CoordinatorCompactionConfigsResourceTest null, new Period(3600), null, - new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), + new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null), null, ImmutableMap.of("key", "val") );