Support changing query granularity in Auto Compaction (#11856)

* add queryGranularity

* fix checkstyle

* fix test
Maytas Monsereenusorn 2021-11-01 15:18:44 -07:00 committed by GitHub
parent 9bd2ccbb9b
commit ba2874ee1f
8 changed files with 145 additions and 21 deletions

View File

@@ -995,10 +995,9 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|Field|Description|Required|
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered.|No|
> Unlike manual compaction, automatic compaction does not support query granularity.
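With this change, automatic compaction accepts all three fields above, including `queryGranularity`. A minimal sketch of a datasource compaction config using them (the `dataSource` value and the granularities are illustrative; unrelated fields are omitted):

```json
{
  "dataSource": "wikipedia",
  "granularitySpec": {
    "segmentGranularity": "DAY",
    "queryGranularity": "HOUR",
    "rollup": true
  }
}
```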
###### Automatic compaction IOConfig
Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).

View File

@@ -203,7 +203,7 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|Field|Description|Required|
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`queryGranularity`|Time chunking period for the query granularity. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values. Not supported for automatic compaction.|No|
|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered.|No|
For example, to set the segment granularity to "day", the query granularity to "hour", and enable rollup, the `granularitySpec` would look like the following sketch (the enclosing compaction task spec is omitted here):
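```json
"granularitySpec": {
  "segmentGranularity": "day",
  "queryGranularity": "hour",
  "rollup": true
}
```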

View File

@@ -28,11 +28,13 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -73,7 +75,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_TASK_NO_ROLLUP = "/indexer/wikipedia_index_task_no_rollup.json";
private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
@@ -612,7 +614,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@Test
public void testAutoCompactionDutyWithRollup() throws Exception
{
loadData(INDEX_TASK_NO_ROLLUP);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%", 2,
@@ -641,7 +645,46 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithQueryGranularity() throws Exception
{
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, Granularities.DAY, null),
false
);
forceTriggerAutoCompaction(2);
expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify that the compacted segments do not get compacted again
forceTriggerAutoCompaction(2);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}
private void loadData(String indexTask) throws Exception
{
loadData(indexTask, ImmutableMap.of());
}
private void loadData(String indexTask, Map<String, Object> specs) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
@@ -650,6 +693,13 @@
"%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
jsonMapper.writeValueAsString("0")
);
for (Map.Entry<String, Object> entry : specs.entrySet()) {
taskSpec = StringUtils.replace(
taskSpec,
entry.getKey(),
jsonMapper.writeValueAsString(entry.getValue())
);
}
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);

View File

@@ -39,12 +39,7 @@
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "DAY",
"rollup": false,
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"granularitySpec": %%GRANULARITYSPEC%%,
"parser": {
"parseSpec": {
"format" : "json",

View File

@@ -74,11 +74,6 @@ public class DataSourceCompactionConfig
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.ioConfig = ioConfig;
if (granularitySpec != null) {
Preconditions.checkArgument(
granularitySpec.getQueryGranularity() == null,
"Auto compaction granularitySpec does not support query granularity value");
}
this.granularitySpec = granularitySpec;
this.taskContext = taskContext;
}

View File

@@ -427,6 +427,17 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return true;
}
}
// Check queryGranularity: recompact if the config specifies a query granularity
// that differs from the one recorded at the segments' last compaction (or none was recorded)
if (config.getGranularitySpec().getQueryGranularity() != null) {
final Granularity existingQueryGranularity = existingGranularitySpec != null ?
existingGranularitySpec.getQueryGranularity() :
null;
if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
return true;
}
}
}
return false;
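For context, the `existingGranularitySpec` consulted above is read from each segment's recorded last compaction state; the fragment being compared looks roughly like this (a sketch, following the field layout the test fixtures below construct):

```json
{
  "lastCompactionState": {
    "granularitySpec": {
      "queryGranularity": "day"
    }
  }
}
```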

View File

@@ -255,20 +255,34 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
}
@Test(expected = IllegalArgumentException.class)
public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity()
@Test
public void testSerdeGranularitySpecWithQueryGranularity() throws Exception
{
new DataSourceCompactionConfig(
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH, null),
new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null),
null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
Assert.assertNotNull(config.getGranularitySpec());
Assert.assertNotNull(fromJson.getGranularitySpec());
Assert.assertEquals(config.getGranularitySpec().getQueryGranularity(), fromJson.getGranularitySpec().getQueryGranularity());
}
@Test

View File

@@ -948,7 +948,7 @@ public class NewestSegmentFirstPolicyTest
// Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
// rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
// and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00
// and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@@ -997,6 +997,66 @@
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentQueryGranularity()
{
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
// queryGranularity=MINUTE for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
// and queryGranularity=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (queryGranularity was not set during last compaction)
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "day"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
new Period("P1D"),
null,
new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
)
);
// Auto compaction config sets queryGranularity=MINUTE
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
// We should get interval 2017-10-01T00:00:00/2017-10-02T00:00:00 and interval 2017-10-03T00:00:00/2017-10-04T00:00:00.
Assert.assertTrue(iterator.hasNext());
List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
);
// No more
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
{