From ba2874ee1f022d483961fe77a44b052b801f8902 Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn
Date: Mon, 1 Nov 2021 15:18:44 -0700
Subject: [PATCH] Support changing query granularity in Auto Compaction
 (#11856)

* add queryGranularity

* fix checkstyle

* fix test
---
 docs/configuration/index.md                   |  3 +-
 docs/ingestion/compaction.md                  |  2 +-
 .../duty/ITAutoCompactionTest.java            | 54 +++++++++++++++-
 ...dia_index_task_with_granularity_spec.json} |  7 +--
 .../DataSourceCompactionConfig.java           |  5 --
 .../duty/NewestSegmentFirstIterator.java      | 11 ++++
 .../DataSourceCompactionConfigTest.java       | 22 +++++--
 .../duty/NewestSegmentFirstPolicyTest.java    | 62 ++++++++++++++++++-
 8 files changed, 145 insertions(+), 21 deletions(-)
 rename integration-tests/src/test/resources/indexer/{wikipedia_index_task_no_rollup.json => wikipedia_index_task_with_granularity_spec.json} (89%)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b346f4beedc..c3eb5761c16 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -995,10 +995,9 @@ You can optionally use the `granularitySpec` object to configure the segment gra
 |Field|Description|Required|
 |-----|-----------|--------|
 |`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
+|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
 |`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered. |No|
 
-> Unlike manual compaction, automatic compaction does not support query granularity.
-
 ###### Automatic compaction IOConfig
 
 Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index 88942caed31..fa63a63f42e 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -203,7 +203,7 @@ You can optionally use the `granularitySpec` object to configure the segment gra
 |Field|Description|Required|
 |-----|-----------|--------|
 |`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
-|`queryGranularity`|Time chunking period for the query granularity. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values. Not supported for automatic compaction.|No|
+|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
 |`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered. |No|
 
 For example, to set the segment granularity to "day", the query granularity to "hour", and enable rollup:
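As a companion to the documented example (segment granularity "day", query granularity "hour", rollup enabled), here is a minimal sketch of the equivalent server-side config object, mirroring the constructor call exercised in `DataSourceCompactionConfigTest` later in this patch. The datasource name and skip offset are illustrative, not part of the patch:

```java
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.joda.time.Period;

class GranularitySpecExample
{
  // Sketch: segmentGranularity=DAY, queryGranularity=HOUR, rollup=true,
  // matching the documented example above. Every null argument falls back
  // to its default (e.g. taskPriority defaults to 25).
  static DataSourceCompactionConfig dayHourRollup()
  {
    return new DataSourceCompactionConfig(
        "wikipedia",       // dataSource (illustrative)
        null,              // taskPriority
        null,              // inputSegmentSizeBytes
        null,              // maxRowsPerSegment
        new Period(3600),  // skipOffsetFromLatest (illustrative, as in the serde test)
        null,              // tuningConfig
        new UserCompactionTaskGranularityConfig(Granularities.DAY, Granularities.HOUR, true),
        null,              // ioConfig
        null               // taskContext
    );
  }
}
```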
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 495ecf19923..157b8d5fdc4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -28,11 +28,13 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -73,7 +75,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 {
   private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
   private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
-  private static final String INDEX_TASK_NO_ROLLUP = "/indexer/wikipedia_index_task_no_rollup.json";
+  private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
   private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
@@ -612,7 +614,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   @Test
   public void testAutoCompactionDutyWithRollup() throws Exception
   {
-    loadData(INDEX_TASK_NO_ROLLUP);
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       Map<String, Object> expectedResult = ImmutableMap.of(
           "%%EXPECTED_COUNT_RESULT%%", 2,
@@ -641,7 +645,46 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithQueryGranularity() throws Exception
+  {
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null),
+          false
+      );
+      forceTriggerAutoCompaction(2);
+      expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify the rolled-up segments do not get compacted again
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
+  {
+    loadData(indexTask, ImmutableMap.of());
+  }
+
+  private void loadData(String indexTask, Map<String, Object> specs) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
@@ -650,6 +693,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
         "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
         jsonMapper.writeValueAsString("0")
     );
+    for (Map.Entry<String, Object> entry : specs.entrySet()) {
+      taskSpec = StringUtils.replace(
+          taskSpec,
+          entry.getKey(),
+          jsonMapper.writeValueAsString(entry.getValue())
+      );
+    }
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
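A sanity check on the expected values in `testAutoCompactionDutyWithQueryGranularity` above: the data is loaded with `queryGranularity=NONE`, and the rollup query initially returns two rows with sums 57.0 and 459.0. After auto compaction rewrites the segments with `queryGranularity=DAY`, the same query returns a single row summing to 516.0; since 57.0 + 459.0 = 516.0, the timestamps were coarsened and the rows merged without any records being lost.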
2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null), + false + ); + forceTriggerAutoCompaction(2); + expectedResult = ImmutableMap.of( + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(2); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + private void loadData(String indexTask) throws Exception + { + loadData(indexTask, ImmutableMap.of()); + } + + private void loadData(String indexTask, Map specs) throws Exception { String taskSpec = getResourceAsString(indexTask); taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); @@ -650,6 +693,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%", jsonMapper.writeValueAsString("0") ); + for (Map.Entry entry : specs.entrySet()) { + taskSpec = StringUtils.replace( + taskSpec, + entry.getKey(), + jsonMapper.writeValueAsString(entry.getValue()) + ); + } final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_granularity_spec.json similarity index 89% rename from integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json rename to integration-tests/src/test/resources/indexer/wikipedia_index_task_with_granularity_spec.json index 821838b4c18..544d191d925 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_granularity_spec.json @@ -39,12 +39,7 @@ "fieldName": "user" } ], - "granularitySpec": { - "segmentGranularity": "DAY", - "queryGranularity": "DAY", - "rollup": false, - "intervals" : [ "2013-08-31/2013-09-02" ] - }, + "granularitySpec": %%GRANULARITYSPEC%%, "parser": { "parseSpec": { "format" : "json", diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index e7a4240b06d..dfe1f5775de 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -74,11 +74,6 @@ public class DataSourceCompactionConfig this.skipOffsetFromLatest = skipOffsetFromLatest == null ? 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 029117820c9..4225fecc7f0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -427,6 +427,17 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           return true;
         }
       }
+
+      // Checks for queryGranularity
+      if (config.getGranularitySpec().getQueryGranularity() != null) {
+
+        final Granularity existingQueryGranularity = existingGranularitySpec != null ?
+                                                     existingGranularitySpec.getQueryGranularity() :
+                                                     null;
+        if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
+          return true;
+        }
+      }
     }
 
     return false;
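The added branch above boils down to a small predicate. A distilled restatement, as a hypothetical helper for illustration (not part of the patch):

```java
import javax.annotation.Nullable;
import org.apache.druid.java.util.common.granularity.Granularity;

class QueryGranularityCheck
{
  // Distilled from the branch above: a non-null configured queryGranularity
  // forces recompaction whenever it differs from the granularity recorded in
  // the segments' last CompactionState. A null existing value means nothing
  // was recorded, so any configured value counts as a change.
  static boolean needsCompaction(@Nullable Granularity configured, @Nullable Granularity existing)
  {
    return configured != null && !configured.equals(existing);
  }
}
```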
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 9ff6eac2f6b..4f8fd12635c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -255,20 +255,34 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity()
+  @Test
+  public void testSerdeGranularitySpecWithQueryGranularity() throws Exception
   {
-    new DataSourceCompactionConfig(
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
         "dataSource",
         null,
         500L,
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH, null),
+        new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null),
         null,
         ImmutableMap.of("key", "val")
     );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+    Assert.assertNotNull(config.getGranularitySpec());
+    Assert.assertNotNull(fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getGranularitySpec().getQueryGranularity(), fromJson.getGranularitySpec().getQueryGranularity());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index b79156fe7d8..a202b6dd1a0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -948,7 +948,7 @@ public class NewestSegmentFirstPolicyTest
     // Create segments that were compacted (CompactionState != null) and have
     // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
     // rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
-    // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00
+    // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@@ -997,6 +997,66 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentQueryGranularity()
+  {
+    // Same indexSpec as what is set in the auto compaction config
+    Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
+    // Same partitionsSpec as what is set in the auto compaction config
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+
+    // Create segments that were compacted (CompactionState != null) and have
+    // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+    // queryGranularity=MINUTE for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+    // and queryGranularity=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "day"))
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+        )
+    );
+
+    // Auto compaction config sets queryGranularity=MINUTE
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get interval 2017-10-01T00:00:00/2017-10-02T00:00:00 and interval 2017-10-03T00:00:00/2017-10-04T00:00:00.
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
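Two details of the assertions above are worth spelling out. The policy iterates newest first, so the iterator is expected to return the 2017-10-03/2017-10-04 interval (last compacted with no recorded `queryGranularity`, which differs from the configured MINUTE) before 2017-10-01/2017-10-02 (last compacted with DAY). The 2017-10-02/2017-10-03 interval already matches the configured MINUTE granularity, so it is never offered for recompaction.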
   @Test
   public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
   {