Add rollup config to auto and manual compaction (#11850)

* add rollup to auto and manual compaction

* add unit tests

* add unit tests

* add IT

* fix checkstyle
This commit is contained in:
Maytas Monsereenusorn 2021-10-29 10:22:25 -07:00 committed by GitHub
parent a96aed021e
commit 33d9d9bd74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 542 additions and 116 deletions

View File

@ -995,6 +995,7 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|Field|Description|Required|
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered.|No|
> Unlike manual compaction, automatic compaction does not support query granularity.

View File

@ -192,7 +192,8 @@ You can optionally use the `granularitySpec` object to configure the segment gra
,
"granularitySpec": {
"segmentGranularity": <time_period>,
"queryGranularity": <time_period>
"queryGranularity": <time_period>,
"rollup": true
}
...
```
@ -203,8 +204,9 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity. Defaults to 'null', which preserves the original segment granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`queryGranularity`|Time chunking period for the query granularity. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values. Not supported for automatic compaction.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rolled up, individual records can no longer be recovered.|No|
For example, to set the segment granularity to "day" and the query granularity to "hour":
For example, to set the segment granularity to "day", the query granularity to "hour", and to enable rollup:
```json
{
"type" : "compact",
@ -213,11 +215,12 @@ For example, to set the segment granularity to "day" and the query granularity t
"type": "compact",
"inputSpec": {
"type": "interval",
"interval": "2017-01-01/2018-01-01",
"interval": "2017-01-01/2018-01-01"
},
"granularitySpec": {
"segmentGranularity":"day",
"queryGranularity":"hour"
"queryGranularity":"hour",
"rollup": true
}
}
}

View File

@ -227,7 +227,7 @@ public class CompactionTask extends AbstractBatchIndexTask
));
}
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null);
this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null, null);
} else {
this.granularitySpec = granularitySpec;
}
@ -600,7 +600,7 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec,
metricsSpec,
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null)
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
@ -729,7 +729,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
queryGranularityToUse,
rollup.get(),
granularitySpec.isRollup() == null ? rollup.get() : granularitySpec.isRollup(),
Collections.singletonList(totalInterval)
);

View File

@ -116,7 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
ImmutableMap.of("key", "value")
);
@ -202,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest
query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
query.getGranularitySpec().isRollup(),
task.getGranularitySpec().isRollup()
);
Assert.assertEquals(
query.getIoConfig().isDropExisting(),
task.getIoConfig().isDropExisting()
@ -264,7 +268,7 @@ public class ClientCompactionTaskQuerySerdeTest
null
)
)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true))
.build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@ -307,7 +311,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
new HashMap<>()
);

View File

@ -463,7 +463,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
// Set the dropExisting flag to true in the IOConfig of the compaction task
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
@ -496,7 +496,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);

View File

@ -605,7 +605,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day segmentGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@ -626,7 +626,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// hour segmentGranularity
final CompactionTask compactionTask2 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, null, null))
.build();
resultPair = runTask(compactionTask2);
@ -660,7 +660,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.SECOND))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.SECOND, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@ -705,7 +705,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day segmentGranularity and day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@ -737,7 +737,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, null))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, null, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);

View File

@ -385,7 +385,7 @@ public class CompactionTaskTest
);
builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder2.tuningConfig(createTuningConfig());
builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null));
final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
Assert.assertEquals(
taskCreatedWithGranularitySpec.getSegmentGranularity(),
@ -404,7 +404,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY));
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY, null));
try {
builder.build();
}
@ -433,7 +433,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY));
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY, null));
try {
builder.build();
}
@ -462,7 +462,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null));
final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
Assert.assertEquals(Granularities.HOUR, taskCreatedWithSegmentGranularity.getSegmentGranularity());
}
@ -1315,7 +1315,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@ -1353,7 +1353,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@ -1391,7 +1391,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(
new PeriodGranularity(Period.months(3), null, null),
new PeriodGranularity(Period.months(3), null, null)
new PeriodGranularity(Period.months(3), null, null),
null
),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
@ -1467,7 +1468,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, null),
new ClientCompactionTaskGranularitySpec(null, null, null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@ -1493,6 +1494,54 @@ public class CompactionTaskTest
);
}
@Test
public void testGranularitySpecWithNotNullRollup()
throws IOException, SegmentLoadingException
{
// When rollup=true is explicitly set in the compaction granularitySpec, the
// explicit value must win: every generated ingestion spec has rollup enabled,
// regardless of the rollup setting stored in the existing segments' metadata.
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
);
// The test fixture's segment set yields 6 ingestion specs (presumably one per
// time chunk of COMPACTION_INTERVAL — confirm against the fixture setup).
Assert.assertEquals(6, ingestionSpecs.size());
for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
Assert.assertTrue(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
}
}
@Test
public void testGranularitySpecWithNullRollup()
throws IOException, SegmentLoadingException
{
// When rollup is left null in the compaction granularitySpec, the task must
// fall back to the rollup setting derived from the existing segments.
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
);
// The test fixture's segment set yields 6 ingestion specs (presumably one per
// time chunk of COMPACTION_INTERVAL — confirm against the fixture setup).
Assert.assertEquals(6, ingestionSpecs.size());
for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
// Expect false since the rollup value in the metadata of the existing segments is null.
Assert.assertFalse(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
}
}
@Test
public void testChooseFinestGranularityWithNulls()
{

View File

@ -19,6 +19,8 @@
package org.apache.druid.tests.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
@ -71,6 +73,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_TASK_NO_ROLLUP = "/indexer/wikipedia_index_task_no_rollup.json";
private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
@ -290,7 +294,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
LOG.info("Auto compaction test with YEAR segment granularity");
@ -307,7 +311,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
LOG.info("Auto compaction test with DAY segment granularity");
@ -340,7 +344,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
LOG.info("Auto compaction test with YEAR segment granularity");
@ -357,7 +361,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
LOG.info("Auto compaction test with DAY segment granularity");
@ -397,7 +401,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
LOG.info("Auto compaction test with YEAR segment granularity");
@ -439,7 +443,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.DAY;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -471,7 +475,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null));
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
@ -495,7 +499,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@ -521,7 +525,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to true
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@ -553,7 +557,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@ -579,7 +583,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
// Since dropExisting is set to false...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@ -605,6 +609,38 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithRollup() throws Exception
{
// End-to-end check that auto compaction can turn rollup ON for data that was
// originally ingested with rollup disabled, and that already-rolled-up
// segments are not re-compacted on subsequent runs.
loadData(INDEX_TASK_NO_ROLLUP);
try (final Closeable ignored = unloader(fullDatasourceName)) {
// Before compaction: rollup was off at ingestion, so the two raw rows are
// still distinct (counts 2; scan returns the two individual "added" values).
Map<String, Object> expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
// Enable rollup via the auto-compaction granularitySpec (segment/query
// granularity left null to preserve the original granularities).
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
false
);
forceTriggerAutoCompaction(2);
// After compaction: the two rows roll up into one (57.0 + 459.0 = 516.0).
expectedResult = ImmutableMap.of(
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify rollup segments does not get compacted again
forceTriggerAutoCompaction(2);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
@ -625,6 +661,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
private void verifyQuery(String queryResource) throws Exception
{
// Convenience overload: run the query file with no extra placeholder substitutions
// beyond the standard %%DATASOURCE%% replacement.
verifyQuery(queryResource, ImmutableMap.of());
}
private void verifyQuery(String queryResource, Map<String, Object> expectedResults) throws Exception
{
String queryResponseTemplate;
try {
@ -634,13 +675,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResource);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
for (Map.Entry<String, Object> entry : expectedResults.entrySet()) {
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
entry.getKey(),
jsonMapper.writeValueAsString(entry.getValue())
);
}
queryHelper.testQueriesFromString(queryResponseTemplate);
}

View File

@ -96,7 +96,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
null,
1
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
new UserCompactionTaskIOConfig(true),
null
);

View File

@ -0,0 +1,56 @@
[
{
"description": "rows count",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter": {
"type": "selector",
"dimension": "language",
"value": "en",
"extractionFn": null
},
"aggregations":[
{
"type": "count",
"name": "count"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"count":%%EXPECTED_COUNT_RESULT%%
}
}
]
},
{
"description": "scan with filter",
"query":{
"queryType" : "scan",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter": {
"type": "selector",
"dimension": "language",
"value": "en",
"extractionFn": null
},
"columns": [
"added"
],
"resultFormat":"compactedList"
},
"expectedResults": %%EXPECTED_SCAN_RESULT%%,
"fieldsToTest": ["events"]
}
]

View File

@ -0,0 +1,76 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "DAY",
"rollup": false,
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
{"type": "string", "name": "language", "createBitmapIndex": false}
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/resources/data/batch_index/json",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index",
"maxRowsPerSegment": 10,
"awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%%
}
}
}

View File

@ -40,15 +40,18 @@ public class ClientCompactionTaskGranularitySpec
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
private final Boolean rollup;
@JsonCreator
public ClientCompactionTaskGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") Granularity queryGranularity
@JsonProperty("queryGranularity") Granularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.queryGranularity = queryGranularity;
this.segmentGranularity = segmentGranularity;
this.rollup = rollup;
}
@JsonProperty
@ -63,9 +66,15 @@ public class ClientCompactionTaskGranularitySpec
return queryGranularity;
}
@JsonProperty
public Boolean isRollup()
{
// Nullable on purpose: null means "preserve the rollup setting of the
// original segments" (see the compaction docs for the `rollup` field).
return rollup;
}
public ClientCompactionTaskGranularitySpec withSegmentGranularity(Granularity segmentGranularity)
{
return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity);
return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity, rollup);
}
@Override
@ -79,13 +88,14 @@ public class ClientCompactionTaskGranularitySpec
}
ClientCompactionTaskGranularitySpec that = (ClientCompactionTaskGranularitySpec) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
Objects.equals(queryGranularity, that.queryGranularity);
Objects.equals(queryGranularity, that.queryGranularity) &&
Objects.equals(rollup, that.rollup);
}
@Override
public int hashCode()
{
return Objects.hash(segmentGranularity, queryGranularity);
return Objects.hash(segmentGranularity, queryGranularity, rollup);
}
@Override
@ -94,6 +104,7 @@ public class ClientCompactionTaskGranularitySpec
return "ClientCompactionTaskGranularitySpec{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}
}

View File

@ -39,15 +39,18 @@ public class UserCompactionTaskGranularityConfig
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
private final Boolean rollup;
@JsonCreator
public UserCompactionTaskGranularityConfig(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") Granularity queryGranularity
@JsonProperty("queryGranularity") Granularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.queryGranularity = queryGranularity;
this.segmentGranularity = segmentGranularity;
this.rollup = rollup;
}
@JsonProperty("segmentGranularity")
@ -62,6 +65,12 @@ public class UserCompactionTaskGranularityConfig
return queryGranularity;
}
@JsonProperty("rollup")
public Boolean isRollup()
{
// Nullable on purpose: null means "preserve the rollup setting of the
// original segments" (see the compaction docs for the `rollup` field).
return rollup;
}
@Override
public boolean equals(Object o)
{
@ -73,13 +82,14 @@ public class UserCompactionTaskGranularityConfig
}
UserCompactionTaskGranularityConfig that = (UserCompactionTaskGranularityConfig) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
Objects.equals(queryGranularity, that.queryGranularity);
Objects.equals(queryGranularity, that.queryGranularity) &&
Objects.equals(rollup, that.rollup);
}
@Override
public int hashCode()
{
return Objects.hash(segmentGranularity, queryGranularity);
return Objects.hash(segmentGranularity, queryGranularity, rollup);
}
@Override
@ -88,6 +98,7 @@ public class UserCompactionTaskGranularityConfig
return "UserCompactionTaskGranularityConfig{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}
}

View File

@ -345,7 +345,9 @@ public class CompactSegments implements CoordinatorDuty
if (config.getGranularitySpec() != null) {
queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity()
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup()
);
} else {
queryGranularitySpec = null;

View File

@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@ -36,7 +37,6 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
@ -309,7 +309,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
@VisibleForTesting
static PartitionsSpec findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
{
final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
@ -332,7 +332,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
final PartitionsSpec partitionsSpecFromConfig = findPartitinosSpecFromConfig(tuningConfig);
final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0).getId());
@ -384,32 +384,48 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return true;
}
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
// Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity
final Granularity existingSegmentGranularity = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() :
null;
if (existingSegmentGranularity == null) {
// Candidate segments were all compacted without segment granularity set.
// We need to check if all segments have the same segment granularity as the configured segment granularity.
boolean needsCompaction = candidates.segments.stream()
.anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
if (needsCompaction) {
if (config.getGranularitySpec() != null) {
final ClientCompactionTaskGranularitySpec existingGranularitySpec = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), ClientCompactionTaskGranularitySpec.class) :
null;
// Checks for segmentGranularity
if (config.getGranularitySpec().getSegmentGranularity() != null) {
final Granularity existingSegmentGranularity = existingGranularitySpec != null ?
existingGranularitySpec.getSegmentGranularity() :
null;
if (existingSegmentGranularity == null) {
// Candidate segments were all compacted without segment granularity set.
// We need to check if all segments have the same segment granularity as the configured segment granularity.
boolean needsCompaction = candidates.segments.stream()
.anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
if (needsCompaction) {
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
);
return true;
}
} else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
"Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getSegmentGranularity(),
existingSegmentGranularity
);
return true;
}
}
} else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
log.info(
"Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getSegmentGranularity(),
existingSegmentGranularity
);
return true;
// Checks for rollup
if (config.getGranularitySpec().isRollup() != null) {
final Boolean existingRollup = existingGranularitySpec != null ?
existingGranularitySpec.isRollup() :
null;
if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
return true;
}
}
}

View File

@ -238,7 +238,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -265,7 +265,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH, null),
null,
ImmutableMap.of("key", "val")
);
@ -308,7 +308,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(null, null),
new UserCompactionTaskGranularityConfig(null, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -325,6 +325,36 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
}
@Test
public void testSerdeGranularitySpecWithRollup() throws IOException
{
  // Round-trip a config whose granularitySpec only sets rollup=true and verify
  // every field, including the rollup flag, survives serialization.
  final DataSourceCompactionConfig original = new DataSourceCompactionConfig(
      "dataSource",
      null,
      500L,
      null,
      new Period(3600),
      null,
      new UserCompactionTaskGranularityConfig(null, null, true),
      null,
      ImmutableMap.of("key", "val")
  );
  final String serialized = OBJECT_MAPPER.writeValueAsString(original);
  final DataSourceCompactionConfig deserialized = OBJECT_MAPPER.readValue(serialized, DataSourceCompactionConfig.class);
  Assert.assertEquals(original.getDataSource(), deserialized.getDataSource());
  // Task priority was left null above; 25 is the value the deserialized config reports.
  Assert.assertEquals(25, deserialized.getTaskPriority());
  Assert.assertEquals(original.getInputSegmentSizeBytes(), deserialized.getInputSegmentSizeBytes());
  Assert.assertEquals(original.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
  Assert.assertEquals(original.getSkipOffsetFromLatest(), deserialized.getSkipOffsetFromLatest());
  Assert.assertEquals(original.getTuningConfig(), deserialized.getTuningConfig());
  Assert.assertEquals(original.getTaskContext(), deserialized.getTaskContext());
  Assert.assertEquals(original.getGranularitySpec(), deserialized.getGranularitySpec());
  Assert.assertNotNull(original.getGranularitySpec());
  Assert.assertNotNull(deserialized.getGranularitySpec());
  Assert.assertEquals(original.getGranularitySpec().isRollup(), deserialized.getGranularitySpec().isRollup());
}
@Test
public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
{
@ -335,7 +365,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
new UserCompactionTaskIOConfig(true),
ImmutableMap.of("key", "val")
);
@ -363,7 +393,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
new UserCompactionTaskIOConfig(null),
ImmutableMap.of("key", "val")
);

View File

@ -829,7 +829,7 @@ public class CompactSegmentsTest
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null
)
@ -852,7 +852,65 @@ public class CompactSegmentsTest
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expected, actual);
}
@Test
public void testCompactWithRollupInGranularitySpec()
{
  // Verify that rollup=true in the datasource compaction config is forwarded
  // verbatim in the granularitySpec of the submitted compaction task.
  final HttpIndexingServiceClient indexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
  final CompactSegments duty = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
  final String dataSource = DATA_SOURCE_PREFIX + 0;
  final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
  compactionConfigs.add(
      new DataSourceCompactionConfig(
          dataSource,
          0,
          500L,
          null,
          new Period("PT0H"), // smaller than segment interval
          new UserCompactionTaskQueryTuningConfig(
              null,
              null,
              null,
              null,
              partitionsSpec,
              null,
              null,
              null,
              null,
              null,
              3,
              null,
              null,
              null,
              null,
              null,
              null
          ),
          new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true),
          null,
          null
      )
  );
  doCompactSegments(duty, compactionConfigs);
  final ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
  final ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecCaptor =
      ArgumentCaptor.forClass(ClientCompactionTaskGranularitySpec.class);
  Mockito.verify(indexingServiceClient).compactSegments(
      ArgumentMatchers.anyString(),
      segmentsCaptor.capture(),
      ArgumentMatchers.anyInt(),
      ArgumentMatchers.any(),
      granularitySpecCaptor.capture(),
      ArgumentMatchers.any(),
      ArgumentMatchers.any()
  );
  Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
  final ClientCompactionTaskGranularitySpec submittedSpec = granularitySpecCaptor.getValue();
  Assert.assertNotNull(submittedSpec);
  Assert.assertEquals(new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true), submittedSpec);
}
@ -888,7 +946,7 @@ public class CompactSegmentsTest
null
),
null,
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null),
null
)
);
@ -923,7 +981,7 @@ public class CompactSegmentsTest
null,
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
null
)
@ -951,7 +1009,7 @@ public class CompactSegmentsTest
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expected, actual);
}

View File

@ -230,7 +230,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -242,7 +242,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -346,7 +346,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);

View File

@ -97,7 +97,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -137,7 +137,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -177,7 +177,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -217,7 +217,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -257,7 +257,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -297,7 +297,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -337,7 +337,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -377,7 +377,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);
@ -417,7 +417,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
)
);

View File

@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -422,7 +422,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -448,7 +448,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -557,7 +557,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -606,7 +606,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -641,7 +641,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -664,7 +664,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -688,7 +688,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -708,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -741,7 +741,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -774,7 +774,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -797,7 +797,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -817,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -840,7 +840,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -865,6 +865,7 @@ public class NewestSegmentFirstPolicyTest
null,
DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Bangkok"))
),
null,
null
)
)
@ -891,7 +892,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -915,6 +916,7 @@ public class NewestSegmentFirstPolicyTest
DateTimes.of("2012-01-02T00:05:00.000Z"),
DateTimeZone.UTC
),
null,
null
)
)
@ -935,6 +937,66 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentRollup()
{
  // indexSpec and partitionsSpec below match what the auto compaction config
  // produces, so only the stored rollup setting decides recompaction here.
  final Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
  final PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
  // Three already-compacted day intervals with rollup recorded as
  // false (2017-10-01), true (2017-10-02) and absent (2017-10-03).
  final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
      new SegmentGenerateSpec(
          Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
          new Period("P1D"),
          null,
          new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "false"))
      ),
      new SegmentGenerateSpec(
          Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
          new Period("P1D"),
          null,
          new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "true"))
      ),
      new SegmentGenerateSpec(
          Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
          new Period("P1D"),
          null,
          new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
      )
  );
  // Auto compaction config asks for rollup=true; segments whose recorded rollup
  // differs (false) or is missing (null) must be offered for compaction.
  final CompactionSegmentIterator iterator = policy.reset(
      ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
      ImmutableMap.of(DATA_SOURCE, timeline),
      Collections.emptyMap()
  );
  // Newest first: 2017-10-03 (rollup absent) comes before 2017-10-01 (rollup=false).
  Assert.assertTrue(iterator.hasNext());
  List<DataSegment> expected = new ArrayList<>(
      timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
  );
  Assert.assertEquals(
      ImmutableSet.copyOf(expected),
      ImmutableSet.copyOf(iterator.next())
  );
  Assert.assertTrue(iterator.hasNext());
  expected = new ArrayList<>(
      timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
  );
  Assert.assertEquals(
      ImmutableSet.copyOf(expected),
      ImmutableSet.copyOf(iterator.next())
  );
  // The 2017-10-02 interval already has rollup=true, so nothing else remains.
  Assert.assertFalse(iterator.hasNext());
}
@Test
public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
{
@ -944,7 +1006,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null))),
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@ -971,7 +1033,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@ -995,6 +1057,7 @@ public class NewestSegmentFirstPolicyTest
null,
DateTimeZone.UTC
),
null,
null
)
)

View File

@ -55,7 +55,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -150,7 +150,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true),
null,
ImmutableMap.of("key", "val")
);
@ -190,7 +190,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@ -311,7 +311,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);