Add query granularity to compaction task (#10900)

* add query granularity to compaction task

* fix checkstyle

* fix checkstyle

* fix test

* fix test

* add tests

* fix test

* fix test

* cleanup

* rename class

* fix test

* fix test

* add test

* fix test
This commit is contained in:
Maytas Monsereenusorn 2021-03-02 11:23:52 -08:00 committed by GitHub
parent 05e8f8fe06
commit b7b0ee8362
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 957 additions and 235 deletions

View File

@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.base.Verify; import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSource;
@ -90,6 +91,7 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -141,7 +143,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable @Nullable
private final Granularity segmentGranularity; private final Granularity segmentGranularity;
@Nullable @Nullable
private final GranularitySpec granularitySpec; private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable @Nullable
private final ParallelIndexTuningConfig tuningConfig; private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore @JsonIgnore
@ -175,7 +177,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity, @JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity,
@JsonProperty("granularitySpec") @Nullable final GranularitySpec granularitySpec, @JsonProperty("granularitySpec") @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig, @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context, @JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory, @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@ -206,12 +208,7 @@ public class CompactionTask extends AbstractBatchIndexTask
this.metricsSpec = metricsSpec; this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity; this.segmentGranularity = segmentGranularity;
if (granularitySpec == null && segmentGranularity != null) { if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec( this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null);
segmentGranularity,
null,
null,
null
);
} else { } else {
this.granularitySpec = granularitySpec; this.granularitySpec = granularitySpec;
} }
@ -306,7 +303,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty @JsonProperty
@Nullable @Nullable
public GranularitySpec getGranularitySpec() public ClientCompactionTaskGranularitySpec getGranularitySpec()
{ {
return granularitySpec; return granularitySpec;
} }
@ -368,7 +365,7 @@ public class CompactionTask extends AbstractBatchIndexTask
partitionConfigurationManager, partitionConfigurationManager,
dimensionsSpec, dimensionsSpec,
metricsSpec, metricsSpec,
getSegmentGranularity(), granularitySpec,
toolbox.getCoordinatorClient(), toolbox.getCoordinatorClient(),
segmentLoaderFactory, segmentLoaderFactory,
retryPolicyFactory retryPolicyFactory
@ -476,7 +473,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final PartitionConfigurationManager partitionConfigurationManager, final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec, @Nullable final DimensionsSpec dimensionsSpec,
@Nullable final AggregatorFactory[] metricsSpec, @Nullable final AggregatorFactory[] metricsSpec,
@Nullable final Granularity segmentGranularity, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient, final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory, final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory final RetryPolicyFactory retryPolicyFactory
@ -504,7 +501,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); final ParallelIndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (segmentGranularity == null) { if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
// original granularity // original granularity
final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>( final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd() Comparators.intervalsByStartThenEnd()
@ -539,12 +536,15 @@ public class CompactionTask extends AbstractBatchIndexTask
for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) { for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs; final Interval interval = entry.lhs;
final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs; final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
// If granularitySpec is not null, then set segmentGranularity. Otherwise,
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema( final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource, segmentProvider.dataSource,
segmentsToCompact, segmentsToCompact,
dimensionsSpec, dimensionsSpec,
metricsSpec, metricsSpec,
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity() granularitySpec == null ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
); );
specs.add( specs.add(
@ -571,7 +571,7 @@ public class CompactionTask extends AbstractBatchIndexTask
queryableIndexAndSegments, queryableIndexAndSegments,
dimensionsSpec, dimensionsSpec,
metricsSpec, metricsSpec,
segmentGranularity granularitySpec
); );
return Collections.singletonList( return Collections.singletonList(
@ -639,7 +639,7 @@ public class CompactionTask extends AbstractBatchIndexTask
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments, List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec, @Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec, @Nullable AggregatorFactory[] metricsSpec,
Granularity segmentGranularity @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
) )
{ {
// check index metadata & // check index metadata &
@ -648,15 +648,22 @@ public class CompactionTask extends AbstractBatchIndexTask
final SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>(); final SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity, queryableIndexAndSegments); decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity, queryableIndexAndSegments);
// find granularity spec
final Interval totalInterval = JodaUtils.umbrellaInterval( final Interval totalInterval = JodaUtils.umbrellaInterval(
queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList()) queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList())
); );
final GranularitySpec granularitySpec = new UniformGranularitySpec( final Granularity queryGranularityToUse;
Preconditions.checkNotNull(segmentGranularity), if (granularitySpec.getQueryGranularity() == null) {
queryGranularity.get(), queryGranularityToUse = queryGranularity.get();
log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info("Generate compaction task spec with new query granularity overrided from input [%s]", queryGranularityToUse);
}
final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
queryGranularityToUse,
rollup.get(), rollup.get(),
Collections.singletonList(totalInterval) Collections.singletonList(totalInterval)
); );
@ -675,7 +682,7 @@ public class CompactionTask extends AbstractBatchIndexTask
new TimestampSpec(null, null, null), new TimestampSpec(null, null, null),
finalDimensionsSpec, finalDimensionsSpec,
finalMetricsSpec, finalMetricsSpec,
granularitySpec, uniformGranularitySpec,
null null
); );
} }
@ -963,7 +970,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable @Nullable
private Granularity segmentGranularity; private Granularity segmentGranularity;
@Nullable @Nullable
private GranularitySpec granularitySpec; private ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable @Nullable
private TuningConfig tuningConfig; private TuningConfig tuningConfig;
@Nullable @Nullable
@ -1014,7 +1021,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return this; return this;
} }
public Builder granularitySpec(GranularitySpec granularitySpec) public Builder granularitySpec(ClientCompactionTaskGranularitySpec granularitySpec)
{ {
this.granularitySpec = granularitySpec; this.granularitySpec = granularitySpec;
return this; return this;

View File

@ -27,8 +27,8 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient;
@ -52,7 +52,6 @@ import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
@ -116,7 +115,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000, 1000,
100 100
), ),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true), new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
ImmutableMap.of("key", "value") ImmutableMap.of("key", "value")
); );
@ -190,6 +189,10 @@ public class ClientCompactionTaskQuerySerdeTest
query.getTuningConfig().getTotalNumMergeTasks().intValue(), query.getTuningConfig().getTotalNumMergeTasks().intValue(),
task.getTuningConfig().getTotalNumMergeTasks() task.getTuningConfig().getTotalNumMergeTasks()
); );
Assert.assertEquals(
query.getGranularitySpec(),
task.getGranularitySpec()
);
Assert.assertEquals( Assert.assertEquals(
query.getGranularitySpec().getQueryGranularity(), query.getGranularitySpec().getQueryGranularity(),
task.getGranularitySpec().getQueryGranularity() task.getGranularitySpec().getQueryGranularity()
@ -198,10 +201,6 @@ public class ClientCompactionTaskQuerySerdeTest
query.getGranularitySpec().getSegmentGranularity(), query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity() task.getGranularitySpec().getSegmentGranularity()
); );
Assert.assertEquals(
query.getGranularitySpec().isRollup(),
task.getGranularitySpec().isRollup()
);
Assert.assertEquals(query.getContext(), task.getContext()); Assert.assertEquals(query.getContext(), task.getContext());
} }
@ -259,7 +258,7 @@ public class ClientCompactionTaskQuerySerdeTest
null null
) )
) )
.granularitySpec(new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, null)) .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR))
.build(); .build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@ -301,7 +300,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000, 1000,
100 100
), ),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true), new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR),
new HashMap<>() new HashMap<>()
); );

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.io.Files; import com.google.common.io.Files;
import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CSVParseSpec;
@ -586,6 +587,183 @@ public class CompactionTaskRunTest extends IngestionTestBase
} }
} }
@Test
public void testWithGranularitySpecNonNullSegmentGranularityAndNullQueryGranularity() throws Exception
{
runIndexTask();
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
// day segmentGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
Assert.assertEquals(
getDefaultCompactionState(Granularities.DAY, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
segments.get(0).getLastCompactionState()
);
// hour segmentGranularity
final CompactionTask compactionTask2 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, null))
.build();
resultPair = runTask(compactionTask2);
Assert.assertTrue(resultPair.lhs.isSuccess());
segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
segments.get(i).getLastCompactionState()
);
}
}
@Test
public void testWithGranularitySpecNonNullQueryGranularityAndNullSegmentGranularity() throws Exception
{
runIndexTask();
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
// day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.SECOND))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.SECOND, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
}
@Test
public void testWithGranularitySpecNonNullQueryGranularityAndNonNullSegmentGranularity() throws Exception
{
runIndexTask();
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
// day segmentGranularity and day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
Assert.assertEquals(
getDefaultCompactionState(Granularities.DAY, Granularities.DAY, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
segments.get(0).getLastCompactionState()
);
}
@Test
public void testWithGranularitySpecNullQueryGranularityAndNullSegmentGranularity() throws Exception
{
runIndexTask();
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.granularitySpec(new ClientCompactionTaskGranularitySpec(null, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
}
@Test @Test
public void testCompactThenAppend() throws Exception public void testCompactThenAppend() throws Exception
{ {

View File

@ -36,6 +36,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.common.guava.SettableSupplier;
@ -376,7 +377,7 @@ public class CompactionTaskTest
); );
builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder2.tuningConfig(createTuningConfig()); builder2.tuningConfig(createTuningConfig());
builder2.granularitySpec(new UniformGranularitySpec(Granularities.HOUR, Granularities.DAY, null)); builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
final CompactionTask taskCreatedWithGranularitySpec = builder2.build(); final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity()); Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity());
} }
@ -392,7 +393,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig()); builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR); builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE, Granularities.DAY, null)); builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY));
final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity()); Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity());
} }
@ -636,7 +637,8 @@ public class CompactionTaskTest
expectedDimensionsSpec, expectedDimensionsSpec,
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -706,7 +708,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -776,7 +779,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -846,7 +850,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -907,7 +912,8 @@ public class CompactionTaskTest
dimensionsSpecs, dimensionsSpecs,
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -948,7 +954,8 @@ public class CompactionTaskTest
expectedDimensionsSpec, expectedDimensionsSpec,
Arrays.asList(customMetricsSpec), Arrays.asList(customMetricsSpec),
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -981,7 +988,8 @@ public class CompactionTaskTest
expectedDimensionsSpec, expectedDimensionsSpec,
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
); );
} }
@ -1050,7 +1058,7 @@ public class CompactionTaskTest
} }
@Test @Test
public void testSegmentGranularity() throws IOException, SegmentLoadingException public void testSegmentGranularityAndNullQueryGranularity() throws IOException, SegmentLoadingException
{ {
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema( final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox, toolbox,
@ -1059,7 +1067,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG), new PartitionConfigurationManager(TUNING_CONFIG),
null, null,
null, null,
new PeriodGranularity(Period.months(3), null, null), new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY
@ -1080,12 +1088,86 @@ public class CompactionTaskTest
expectedDimensionsSpec, expectedDimensionsSpec,
AGGREGATORS, AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL), Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
Granularities.NONE
);
}
@Test
public void testQueryGranularityAndNullSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
new PeriodGranularity(Period.months(3), null, null) new PeriodGranularity(Period.months(3), null, null)
); );
} }
@Test @Test
public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingException public void testQueryGranularityAndSegmentGranularityNonNull() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(
new PeriodGranularity(Period.months(3), null, null),
new PeriodGranularity(Period.months(3), null, null)
),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
new PeriodGranularity(Period.months(3), null, null)
);
}
@Test
public void testNullGranularitySpec() throws IOException, SegmentLoadingException
{ {
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema( final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox, toolbox,
@ -1113,7 +1195,42 @@ public class CompactionTaskTest
expectedDimensionsSpec, expectedDimensionsSpec,
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH Granularities.MONTH,
Granularities.NONE
);
}
@Test
public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
new ClientCompactionTaskGranularitySpec(null, null),
COORDINATOR_CLIENT,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
Granularities.NONE
); );
} }
@ -1220,7 +1337,8 @@ public class CompactionTaskTest
List<DimensionsSpec> expectedDimensionsSpecs, List<DimensionsSpec> expectedDimensionsSpecs,
List<AggregatorFactory> expectedMetricsSpec, List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals, List<Interval> expectedSegmentIntervals,
Granularity expectedSegmentGranularity Granularity expectedSegmentGranularity,
Granularity expectedQueryGranularity
) )
{ {
assertIngestionSchema( assertIngestionSchema(
@ -1264,7 +1382,8 @@ public class CompactionTaskTest
null, null,
null null
), ),
expectedSegmentGranularity expectedSegmentGranularity,
expectedQueryGranularity
); );
} }
@ -1274,7 +1393,8 @@ public class CompactionTaskTest
List<AggregatorFactory> expectedMetricsSpec, List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals, List<Interval> expectedSegmentIntervals,
ParallelIndexTuningConfig expectedTuningConfig, ParallelIndexTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity Granularity expectedSegmentGranularity,
Granularity expectedQueryGranularity
) )
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -1307,7 +1427,7 @@ public class CompactionTaskTest
Assert.assertEquals( Assert.assertEquals(
new UniformGranularitySpec( new UniformGranularitySpec(
expectedSegmentGranularity, expectedSegmentGranularity,
Granularities.NONE, expectedQueryGranularity,
false, false,
Collections.singletonList(expectedSegmentIntervals.get(i)) Collections.singletonList(expectedSegmentIntervals.get(i))
), ),

View File

@ -31,11 +31,10 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient; import org.apache.druid.testing.clients.CompactionResourceTestClient;
@ -298,7 +297,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR; Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null)); submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
LOG.info("Auto compaction test with YEAR segment granularity"); LOG.info("Auto compaction test with YEAR segment granularity");
@ -314,7 +313,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction); checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.DAY; newGranularity = Granularities.DAY;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UniformGranularitySpec(newGranularity, null, null)); submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
LOG.info("Auto compaction test with DAY segment granularity"); LOG.info("Auto compaction test with DAY segment granularity");
@ -374,7 +373,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null); submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null);
} }
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, GranularitySpec granularitySpec) throws Exception private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) throws Exception
{ {
submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec); submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec);
} }
@ -383,7 +382,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
PartitionsSpec partitionsSpec, PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest, Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks, int maxNumConcurrentSubTasks,
GranularitySpec granularitySpec UserCompactionTaskGranularityConfig granularitySpec
) throws Exception ) throws Exception
{ {
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(

View File

@ -41,8 +41,10 @@ import java.io.InputStream;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE}) @Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
@ -52,10 +54,13 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final Logger LOG = new Logger(ITCompactionTaskTest.class); private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_QUERIES_YEAR_RESOURCE = "/indexer/wikipedia_index_queries_year_query_granularity.json";
private static final String INDEX_QUERIES_HOUR_RESOURCE = "/indexer/wikipedia_index_queries_hour_query_granularity.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test"; private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String SEGMENT_METADATA_QUERY_RESOURCE_QR4 = "/indexer/segment_metadata_qr4.json"; private static final String SEGMENT_METADATA_QUERY_RESOURCE = "/indexer/segment_metadata_query.json";
private static final String SEGMENT_METADATA_QUERY_RESOURCE_QR2 = "/indexer/segment_metadata_qr2.json";
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json"; private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = "/indexer/wikipedia_compaction_task_with_segment_granularity.json";
@ -87,11 +92,68 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
} }
@Test @Test
public void testCompactionWithGranularitySpec() throws Exception public void testCompactionWithSegmentGranularityInGranularitySpec() throws Exception
{ {
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH); loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
} }
@Test
public void testCompactionWithQueryGranularityInGranularitySpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
// QueryGranularity was SECOND, now we will change it to HOUR
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, null, GranularityType.HOUR);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_HOUR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.HOUR.name(), 2);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
@Test
public void testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() throws Exception
{
try (final Closeable ignored = unloader(fullDatasourceName)) {
loadData(INDEX_TASK);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
String queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
compactData(COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.YEAR, GranularityType.YEAR);
// The original 4 segments should be compacted into 1 new segment
checkNumberOfSegments(1);
queryResponseTemplate = getQueryResponseTemplate(INDEX_QUERIES_YEAR_RESOURCE);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.YEAR.name(), 1);
List<String> newIntervals = new ArrayList<>();
for (String interval : expectedIntervalAfterCompaction) {
for (Interval newinterval : GranularityType.YEAR.getDefaultGranularity().getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
newIntervals.add(newinterval.toString());
}
}
expectedIntervalAfterCompaction = newIntervals;
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
@Test @Test
public void testCompactionWithTimestampDimension() throws Exception public void testCompactionWithTimestampDimension() throws Exception
{ {
@ -105,37 +167,22 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
GranularityType newSegmentGranularity GranularityType newSegmentGranularity
) throws Exception ) throws Exception
{ {
loadData(indexTask);
// 4 segments across 2 days
checkNumberOfSegments(4);
List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction.sort(null);
try (final Closeable ignored = unloader(fullDatasourceName)) { try (final Closeable ignored = unloader(fullDatasourceName)) {
String queryResponseTemplate; loadData(indexTask);
try { // 4 segments across 2 days
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queriesResource); checkNumberOfSegments(4);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); List<String> expectedIntervalAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
} expectedIntervalAfterCompaction.sort(null);
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queriesResource);
}
queryResponseTemplate = StringUtils.replace( checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 4);
queryResponseTemplate, String queryResponseTemplate = getQueryResponseTemplate(queriesResource);
"%%DATASOURCE%%",
fullDatasourceName
);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE_QR4);
queryHelper.testQueriesFromString(queryResponseTemplate); queryHelper.testQueriesFromString(queryResponseTemplate);
compactData(compactionResource, newSegmentGranularity); compactData(compactionResource, newSegmentGranularity, null);
// The original 4 segments should be compacted into 2 new segments // The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2); checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate); queryHelper.testQueriesFromString(queryResponseTemplate);
checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE_QR2); checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, GranularityType.SECOND.name(), 2);
if (newSegmentGranularity != null) { if (newSegmentGranularity != null) {
List<String> newIntervals = new ArrayList<>(); List<String> newIntervals = new ArrayList<>();
@ -164,18 +211,31 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
); );
} }
private void compactData(String compactionResource, GranularityType newSegmentGranularity) throws Exception private void compactData(String compactionResource, GranularityType newSegmentGranularity, GranularityType newQueryGranularity) throws Exception
{ {
String template = getResourceAsString(compactionResource); String template = getResourceAsString(compactionResource);
template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); template = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
// For the new granularitySpec map
Map<String, String> granularityMap = new HashMap<>();
if (newSegmentGranularity != null) {
granularityMap.put("segmentGranularity", newSegmentGranularity.name());
}
if (newQueryGranularity != null) {
granularityMap.put("queryGranularity", newQueryGranularity.name());
}
template = StringUtils.replace(
template,
"%%GRANULARITY_SPEC%%",
jsonMapper.writeValueAsString(granularityMap)
);
// For the deprecated segment granularity field
if (newSegmentGranularity != null) { if (newSegmentGranularity != null) {
template = StringUtils.replace( template = StringUtils.replace(
template, template,
"%%SEGMENTGRANULARITY%%", "%%SEGMENT_GRANULARITY%%",
newSegmentGranularity.name() newSegmentGranularity.name()
); );
} }
final String taskID = indexer.submitTask(template); final String taskID = indexer.submitTask(template);
LOG.info("TaskID for compaction task %s", taskID); LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID); indexer.waitUntilTaskCompletes(taskID);
@ -186,7 +246,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
); );
} }
private void checkQueryGranularity(String queryResource) throws Exception private void checkQueryGranularity(String queryResource, String expectedQueryGranularity, int segmentCount) throws Exception
{ {
String queryResponseTemplate; String queryResponseTemplate;
try { try {
@ -212,6 +272,17 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
"%%INTERVALS%%", "%%INTERVALS%%",
"2013-08-31/2013-09-02" "2013-08-31/2013-09-02"
); );
List<Map<String, String>> expectedResults = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) {
Map<String, String> result = new HashMap<>();
result.put("queryGranularity", expectedQueryGranularity);
expectedResults.add(result);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%EXPECTED_QUERY_GRANULARITY%%",
jsonMapper.writeValueAsString(expectedResults)
);
queryHelper.testQueriesFromString(queryResponseTemplate); queryHelper.testQueriesFromString(queryResponseTemplate);
} }
@ -240,4 +311,22 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
"Compaction interval check" "Compaction interval check"
); );
} }
private String getQueryResponseTemplate(String queryResourcePath)
{
String queryResponseTemplate;
try {
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResourcePath);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResourcePath);
}
return queryResponseTemplate;
}
} }

View File

@ -1,29 +0,0 @@
[
{
"query": {
"queryType": "segmentMetadata",
"dataSource": "%%DATASOURCE%%",
"analysisTypes": [
"%%ANALYSIS_TYPE%%"
],
"intervals": [
"%%INTERVALS%%"
]
},
"expectedResults": [
{
"queryGranularity": "SECOND"
},
{
"queryGranularity": "SECOND"
},
{
"queryGranularity": "SECOND"
},
{
"queryGranularity": "SECOND"
}
],
"fieldsToTest": ["queryGranularity"]
}
]

View File

@ -10,14 +10,7 @@
"%%INTERVALS%%" "%%INTERVALS%%"
] ]
}, },
"expectedResults": [ "expectedResults": %%EXPECTED_QUERY_GRANULARITY%%,
{
"queryGranularity": "SECOND"
},
{
"queryGranularity": "SECOND"
}
],
"fieldsToTest": ["queryGranularity"] "fieldsToTest": ["queryGranularity"]
} }
] ]

View File

@ -8,9 +8,7 @@
"interval": "2013-08-31/2013-09-02" "interval": "2013-08-31/2013-09-02"
} }
}, },
"granularitySpec": { "granularitySpec": %%GRANULARITY_SPEC%%,
"segmentGranularity": "%%SEGMENTGRANULARITY%%"
},
"context" : { "context" : {
"storeCompactionState" : true "storeCompactionState" : true
} }

View File

@ -8,7 +8,7 @@
"interval": "2013-08-31/2013-09-02" "interval": "2013-08-31/2013-09-02"
} }
}, },
"segmentGranularity": "%%SEGMENTGRANULARITY%%", "segmentGranularity": "%%SEGMENT_GRANULARITY%%",
"context" : { "context" : {
"storeCompactionState" : true "storeCompactionState" : true
} }

View File

@ -0,0 +1,150 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-08-31T01:00:00.000Z",
"result" : {
"minTime" : "2013-08-31T01:00:00.000Z",
"maxTime" : "2013-09-01T12:00:00.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type":"thetaSketch",
"name":"approxCountTheta",
"fieldName":"thetaSketch",
"size":16384,
"shouldFinalize":true,
"isInputThetaSketch":false,
"errorBoundsStdDev":null
},
{
"type":"quantilesDoublesSketch",
"name":"quantilesSketch",
"fieldName":"quantilesDoublesSketch",
"k":128
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"quantilesSketch":5,
"approxCountTheta":5.0,
"approxCountHLL":5
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 9050.0,
"page" : "Crimson Typhoon",
"added_count" : 905,
"rows" : 1
}
} ]
},
{
"description": "timeseries, stringFirst/stringLast aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "user"
},
{
"type":"stringLast",
"name":"last_user",
"fieldName":"user"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"first_user":"nuclear",
"last_user":"stringer"
}
}
]
}
]

View File

@ -0,0 +1,150 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-01-01T00:00:00.000Z",
"result" : {
"minTime" : "2013-01-01T00:00:00.000Z",
"maxTime" : "2013-01-01T00:00:00.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-01-01T00:00/2014-01-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type":"thetaSketch",
"name":"approxCountTheta",
"fieldName":"thetaSketch",
"size":16384,
"shouldFinalize":true,
"isInputThetaSketch":false,
"errorBoundsStdDev":null
},
{
"type":"quantilesDoublesSketch",
"name":"quantilesSketch",
"fieldName":"quantilesDoublesSketch",
"k":128
}
]
},
"expectedResults":[
{
"timestamp" : "2013-01-01T00:00:00.000Z",
"result" : {
"quantilesSketch":10,
"approxCountTheta":5.0,
"approxCountHLL":5
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-01-01T00:00/2014-01-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-01-01T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 18100.0,
"page" : "Crimson Typhoon",
"added_count" : 1810,
"rows" : 1
}
} ]
},
{
"description": "timeseries, stringFirst/stringLast aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-01-01T00:00/2014-01-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "user"
},
{
"type":"stringLast",
"name":"last_user",
"fieldName":"user"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-01-01T00:00:00.000Z",
"result" : {
"first_user":"masterYi",
"last_user":"speed"
}
}
]
}
]

View File

@ -22,26 +22,22 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
import java.util.Objects; import java.util.Objects;
public class ClientCompactionTaskQueryGranularitySpec public class ClientCompactionTaskGranularitySpec
{ {
private final Granularity segmentGranularity; private final Granularity segmentGranularity;
private final Granularity queryGranularity; private final Granularity queryGranularity;
private final boolean rollup;
@JsonCreator @JsonCreator
public ClientCompactionTaskQueryGranularitySpec( public ClientCompactionTaskGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") Granularity queryGranularity, @JsonProperty("queryGranularity") Granularity queryGranularity
@JsonProperty("rollup") Boolean rollup
) )
{ {
this.queryGranularity = queryGranularity == null ? BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY : queryGranularity; this.queryGranularity = queryGranularity;
this.rollup = rollup == null ? BaseGranularitySpec.DEFAULT_ROLLUP : rollup; this.segmentGranularity = segmentGranularity;
this.segmentGranularity = segmentGranularity == null ? BaseGranularitySpec.DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
} }
@JsonProperty @JsonProperty
@ -56,20 +52,9 @@ public class ClientCompactionTaskQueryGranularitySpec
return queryGranularity; return queryGranularity;
} }
@JsonProperty public ClientCompactionTaskGranularitySpec withSegmentGranularity(Granularity segmentGranularity)
public boolean isRollup()
{ {
return rollup; return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity);
}
@Override
public String toString()
{
return "ClientCompactionTaskQueryGranularitySpec{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
} }
@Override @Override
@ -81,15 +66,23 @@ public class ClientCompactionTaskQueryGranularitySpec
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
ClientCompactionTaskQueryGranularitySpec that = (ClientCompactionTaskQueryGranularitySpec) o; ClientCompactionTaskGranularitySpec that = (ClientCompactionTaskGranularitySpec) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) && return Objects.equals(segmentGranularity, that.segmentGranularity) &&
Objects.equals(queryGranularity, that.queryGranularity) && Objects.equals(queryGranularity, that.queryGranularity);
Objects.equals(rollup, that.rollup);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(segmentGranularity, queryGranularity, rollup); return Objects.hash(segmentGranularity, queryGranularity);
}
@Override
public String toString()
{
return "ClientCompactionTaskGranularitySpec{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
'}';
} }
} }

View File

@ -38,7 +38,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
private final String dataSource; private final String dataSource;
private final ClientCompactionIOConfig ioConfig; private final ClientCompactionIOConfig ioConfig;
private final ClientCompactionTaskQueryTuningConfig tuningConfig; private final ClientCompactionTaskQueryTuningConfig tuningConfig;
private final ClientCompactionTaskQueryGranularitySpec granularitySpec; private final ClientCompactionTaskGranularitySpec granularitySpec;
private final Map<String, Object> context; private final Map<String, Object> context;
@JsonCreator @JsonCreator
@ -47,7 +47,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
@JsonProperty("dataSource") String dataSource, @JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") ClientCompactionTaskQueryGranularitySpec granularitySpec, @JsonProperty("granularitySpec") ClientCompactionTaskGranularitySpec granularitySpec,
@JsonProperty("context") Map<String, Object> context @JsonProperty("context") Map<String, Object> context
) )
{ {
@ -93,7 +93,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
} }
@JsonProperty @JsonProperty
public ClientCompactionTaskQueryGranularitySpec getGranularitySpec() public ClientCompactionTaskGranularitySpec getGranularitySpec()
{ {
return granularitySpec; return granularitySpec;
} }

View File

@ -79,7 +79,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
List<DataSegment> segments, List<DataSegment> segments,
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
) )
{ {

View File

@ -40,7 +40,7 @@ public interface IndexingServiceClient
List<DataSegment> segments, List<DataSegment> segments,
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
); );

View File

@ -22,9 +22,6 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -49,7 +46,7 @@ public class DataSourceCompactionConfig
private final Integer maxRowsPerSegment; private final Integer maxRowsPerSegment;
private final Period skipOffsetFromLatest; private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig; private final UserCompactionTaskQueryTuningConfig tuningConfig;
private final GranularitySpec granularitySpec; private final UserCompactionTaskGranularityConfig granularitySpec;
private final Map<String, Object> taskContext; private final Map<String, Object> taskContext;
@JsonCreator @JsonCreator
@ -60,7 +57,7 @@ public class DataSourceCompactionConfig
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") @Nullable GranularitySpec granularitySpec, @JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
) )
{ {
@ -76,20 +73,8 @@ public class DataSourceCompactionConfig
this.tuningConfig = tuningConfig; this.tuningConfig = tuningConfig;
if (granularitySpec != null) { if (granularitySpec != null) {
Preconditions.checkArgument( Preconditions.checkArgument(
granularitySpec instanceof UniformGranularitySpec, granularitySpec.getQueryGranularity() == null,
"Auto compaction granularitySpec only supports uniform type" "Auto compaction granularitySpec does not support query granularity value");
);
Preconditions.checkArgument(
granularitySpec.isRollup() == BaseGranularitySpec.DEFAULT_ROLLUP,
"Auto compaction granularitySpec only supports default rollup value"
);
Preconditions.checkArgument(
granularitySpec.getQueryGranularity().equals(BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY),
"Auto compaction granularitySpec only supports default query granularity value");
Preconditions.checkArgument(
granularitySpec.inputIntervals().isEmpty(),
"Auto compaction granularitySpec does not supports interval value"
);
} }
this.granularitySpec = granularitySpec; this.granularitySpec = granularitySpec;
this.taskContext = taskContext; this.taskContext = taskContext;
@ -136,7 +121,7 @@ public class DataSourceCompactionConfig
@JsonProperty @JsonProperty
@Nullable @Nullable
public GranularitySpec getGranularitySpec() public UserCompactionTaskGranularityConfig getGranularitySpec()
{ {
return granularitySpec; return granularitySpec;
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import java.util.Objects;
public class UserCompactionTaskGranularityConfig
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
@JsonCreator
public UserCompactionTaskGranularityConfig(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") Granularity queryGranularity
)
{
this.queryGranularity = queryGranularity;
this.segmentGranularity = segmentGranularity;
}
@JsonProperty("segmentGranularity")
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@JsonProperty("queryGranularity")
public Granularity getQueryGranularity()
{
return queryGranularity;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserCompactionTaskGranularityConfig that = (UserCompactionTaskGranularityConfig) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
Objects.equals(queryGranularity, that.queryGranularity);
}
@Override
public int hashCode()
{
return Objects.hash(segmentGranularity, queryGranularity);
}
@Override
public String toString()
{
return "UserCompactionTaskGranularityConfig{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
'}';
}
}

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskPayloadResponse;
@ -310,12 +310,11 @@ public class CompactSegments implements CoordinatorDuty
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size()); snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
ClientCompactionTaskQueryGranularitySpec queryGranularitySpec; ClientCompactionTaskGranularitySpec queryGranularitySpec;
if (config.getGranularitySpec() != null) { if (config.getGranularitySpec() != null) {
queryGranularitySpec = new ClientCompactionTaskQueryGranularitySpec( queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(), config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(), config.getGranularitySpec().getQueryGranularity()
config.getGranularitySpec().isRollup()
); );
} else { } else {
queryGranularitySpec = null; queryGranularitySpec = null;

View File

@ -50,7 +50,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
List<DataSegment> segments, List<DataSegment> segments,
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
) )
{ {

View File

@ -20,20 +20,16 @@
package org.apache.druid.server.coordinator; package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
@ -238,7 +234,7 @@ public class DataSourceCompactionConfigTest
null, null,
new Period(3600), new Period(3600),
null, null,
new UniformGranularitySpec(Granularities.HOUR, null, null), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -264,55 +260,60 @@ public class DataSourceCompactionConfigTest
null, null,
new Period(3600), new Period(3600),
null, null,
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, null), new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH),
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
} }
@Test(expected = IllegalArgumentException.class) @Test
public void testFailIfGranularitySpecContainsNonDefaultRollup() public void testSerdeWithNullGranularitySpec() throws IOException
{ {
new DataSourceCompactionConfig( final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource", "dataSource",
null, null,
500L, 500L,
null, null,
new Period(3600), new Period(3600),
null, null,
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, false, null), null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
} }
@Test(expected = IllegalArgumentException.class) @Test
public void testFailIfGranularitySpecContainsNonEmptyInterval() public void testSerdeGranularitySpecWithNullValues() throws IOException
{ {
new DataSourceCompactionConfig( final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource", "dataSource",
null, null,
500L, 500L,
null, null,
new Period(3600), new Period(3600),
null, null,
new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH, ImmutableList.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z"))), new UserCompactionTaskGranularityConfig(null, null),
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
} }
@Test(expected = IllegalArgumentException.class)
public void testFailIfGranularitySpecIsNotUniform()
{
new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new ArbitraryGranularitySpec(null, null, null),
ImmutableMap.of("key", "val")
);
}
} }

View File

@ -30,8 +30,8 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.HttpIndexingServiceClient;
@ -58,7 +58,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
@ -66,6 +65,7 @@ import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -597,7 +597,8 @@ public class CompactSegmentsTest
); );
doCompactSegments(compactSegments, compactionConfigs); doCompactSegments(compactSegments, compactionConfigs);
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
ClientCompactionTaskGranularitySpec.class);
Mockito.verify(mockIndexingServiceClient).compactSegments( Mockito.verify(mockIndexingServiceClient).compactSegments(
ArgumentMatchers.anyString(), ArgumentMatchers.anyString(),
segmentsCaptor.capture(), segmentsCaptor.capture(),
@ -644,13 +645,14 @@ public class CompactSegmentsTest
null, null,
null null
), ),
new UniformGranularitySpec(Granularities.YEAR, null, null), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
null null
) )
); );
doCompactSegments(compactSegments, compactionConfigs); doCompactSegments(compactSegments, compactionConfigs);
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
ClientCompactionTaskGranularitySpec.class);
Mockito.verify(mockIndexingServiceClient).compactSegments( Mockito.verify(mockIndexingServiceClient).compactSegments(
ArgumentMatchers.anyString(), ArgumentMatchers.anyString(),
segmentsCaptor.capture(), segmentsCaptor.capture(),
@ -662,7 +664,10 @@ public class CompactSegmentsTest
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
// are within the same year // are within the same year
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity()); ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
Assert.assertEquals(expected, actual);
} }
@Test @Test
@ -696,7 +701,7 @@ public class CompactSegmentsTest
) )
), ),
null, null,
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, null, null), new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
null null
) )
); );
@ -731,7 +736,7 @@ public class CompactSegmentsTest
null, null,
null null
), ),
new UniformGranularitySpec(Granularities.YEAR, null, null), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
null null
) )
); );
@ -742,7 +747,8 @@ public class CompactSegmentsTest
// Make sure that we do not skip interval of conflict task. // Make sure that we do not skip interval of conflict task.
// Since we cancel the task and will have to compact those intervals with the new segmentGranulartity // Since we cancel the task and will have to compact those intervals with the new segmentGranulartity
ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class); ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
ClientCompactionTaskGranularitySpec.class);
Mockito.verify(mockIndexingServiceClient).compactSegments( Mockito.verify(mockIndexingServiceClient).compactSegments(
ArgumentMatchers.anyString(), ArgumentMatchers.anyString(),
segmentsCaptor.capture(), segmentsCaptor.capture(),
@ -754,7 +760,10 @@ public class CompactSegmentsTest
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
// are within the same year // are within the same year
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
Assert.assertEquals(Granularities.YEAR, granularitySpecArgumentCaptor.getValue().getSegmentGranularity()); ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
Assert.assertEquals(expected, actual);
} }
@Test @Test
@ -810,7 +819,6 @@ public class CompactSegmentsTest
1, 1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT) stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
); );
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
// Note: Subsequent compaction run after the dataSource was compacted will show different numbers than // Note: Subsequent compaction run after the dataSource was compacted will show different numbers than
// on the run it was compacted. For example, in a compaction run, if a dataSource had 4 segments compacted, // on the run it was compacted. For example, in a compaction run, if a dataSource had 4 segments compacted,
// on the same compaction run the segment compressed count will be 4 but on subsequent run it might be 2 // on the same compaction run the segment compressed count will be 4 but on subsequent run it might be 2

View File

@ -29,9 +29,8 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -383,7 +382,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -410,7 +409,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -436,7 +435,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -545,7 +544,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -594,7 +593,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -629,7 +628,7 @@ public class NewestSegmentFirstPolicyTest
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
); );
@ -735,7 +734,7 @@ public class NewestSegmentFirstPolicyTest
private DataSourceCompactionConfig createCompactionConfig( private DataSourceCompactionConfig createCompactionConfig(
long inputSegmentSizeBytes, long inputSegmentSizeBytes,
Period skipOffsetFromLatest, Period skipOffsetFromLatest,
GranularitySpec granularitySpec UserCompactionTaskGranularityConfig granularitySpec
) )
{ {
return new DataSourceCompactionConfig( return new DataSourceCompactionConfig(