diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java index 227bd3a6d19..3c4263ba999 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java +++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java @@ -19,9 +19,13 @@ package org.apache.druid.data.input; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; +import javax.validation.constraints.NotNull; +import java.util.Set; + /** * Schema of {@link InputRow}. */ @@ -30,16 +34,39 @@ public class InputRowSchema private final TimestampSpec timestampSpec; private final DimensionsSpec dimensionsSpec; private final ColumnsFilter columnsFilter; + /** + * Set of metric names for further downstream processing by {@link InputSource}. + * Empty set if no metric given. + */ + @NotNull + private final Set metricNames; public InputRowSchema( final TimestampSpec timestampSpec, final DimensionsSpec dimensionsSpec, final ColumnsFilter columnsFilter ) + { + this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of()); + } + + public InputRowSchema( + final TimestampSpec timestampSpec, + final DimensionsSpec dimensionsSpec, + final ColumnsFilter columnsFilter, + final Set metricNames + ) { this.timestampSpec = timestampSpec; this.dimensionsSpec = dimensionsSpec; this.columnsFilter = columnsFilter; + this.metricNames = metricNames == null ? ImmutableSet.of() : metricNames; + } + + @NotNull + public Set getMetricNames() + { + return metricNames; } public TimestampSpec getTimestampSpec() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index ff1dd617f1b..6f1a6d6446f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -241,8 +243,26 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter); + return new InputEntityIteratingReader( + getInputRowSchemaToUse(inputRowSchema), + inputFormat, + entityIterator, + temporaryDirectory + ); + } + + @VisibleForTesting + InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema) + { final InputRowSchema inputRowSchemaToUse; + ColumnsFilter columnsFilterToUse = inputRowSchema.getColumnsFilter(); + if (inputRowSchema.getMetricNames() != null) { + for (String metricName : 
inputRowSchema.getMetricNames()) { + columnsFilterToUse = columnsFilterToUse.plus(metricName); + } + } + if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) { // Legacy compatibility mode; see https://github.com/apache/druid/pull/10267. LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with " @@ -251,10 +271,14 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI inputRowSchemaToUse = new InputRowSchema( new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null), inputRowSchema.getDimensionsSpec(), - inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME) + columnsFilterToUse.plus(ColumnHolder.TIME_COLUMN_NAME) ); } else { - inputRowSchemaToUse = inputRowSchema; + inputRowSchemaToUse = new InputRowSchema( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + columnsFilterToUse + ); } if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn()) @@ -268,12 +292,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI ); } - return new InputEntityIteratingReader( - inputRowSchemaToUse, - inputFormat, - entityIterator, - temporaryDirectory - ); + return inputRowSchemaToUse; } private List> createTimeline() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java index f273be7922b..c895eb14b71 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.transform.Transform; import org.apache.druid.segment.transform.TransformSpec; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; @@ -56,7 +57,10 @@ public class InputRowSchemas dataSchema.getDimensionsSpec(), dataSchema.getTransformSpec(), dataSchema.getAggregators() - ) + ), + Arrays.stream(dataSchema.getAggregators()) + .map(AggregatorFactory::getName) + .collect(Collectors.toSet()) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index d2ed8b31e5e..29952ebc1c0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -57,6 +57,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -93,6 +94,7 @@ public class ClientCompactionTaskQuerySerdeTest true ), new ClientCompactionTaskQueryTuningConfig( + null, null, 40000, 2000L, @@ -249,7 +251,7 @@ public 
class ClientCompactionTaskQuerySerdeTest new ParallelIndexTuningConfig( null, null, - null, + new OnheapIncrementalIndex.Spec(true), 40000, 2000L, null, @@ -313,6 +315,7 @@ public class ClientCompactionTaskQuerySerdeTest ), new ClientCompactionTaskQueryTuningConfig( 100, + new OnheapIncrementalIndex.Spec(true), 40000, 2000L, 30000L, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java index ebc2b94f328..2989415a5e6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java @@ -23,8 +23,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; @@ -35,12 +40,15 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.Arrays; + public class DruidInputSourceTest { private final IndexIO indexIO = EasyMock.createMock(IndexIO.class); @@ -221,4 +229,74 @@ public class DruidInputSourceTest mapper.readValue(json, InputSource.class); } + + @Test + public void testReaderColumnsFilterWithMetricGiven() + { + String datasource = "foo"; + Interval interval = Intervals.of("2000/2001"); + String column = "c1"; + String metricName = "m1"; + ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column)); + InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")) + ), + originalColumnsFilter, + ImmutableSet.of(metricName) + ); + DruidInputSource druidInputSource = new DruidInputSource( + datasource, + interval, + null, + null, + ImmutableList.of("a"), + ImmutableList.of("b"), + indexIO, + coordinatorClient, + segmentCacheManagerFactory, + retryPolicyFactory, + taskConfig + ); + InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema); + ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter(); + Assert.assertTrue(columnsFilter.apply(column)); + Assert.assertTrue(columnsFilter.apply(metricName)); + } + + @Test + public void testReaderColumnsFilterWithNoMetricGiven() + { + String datasource = "foo"; + Interval interval = Intervals.of("2000/2001"); + String column = "c1"; + String metricName = "m1"; + ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column)); + InputRowSchema inputRowSchema = new 
InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")) + ), + originalColumnsFilter, + ImmutableSet.of() + ); + DruidInputSource druidInputSource = new DruidInputSource( + datasource, + interval, + null, + null, + ImmutableList.of("a"), + ImmutableList.of("b"), + indexIO, + coordinatorClient, + segmentCacheManagerFactory, + retryPolicyFactory, + taskConfig + ); + InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema); + ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter(); + Assert.assertTrue(columnsFilter.apply(column)); + Assert.assertFalse(columnsFilter.apply(metricName)); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java index ae6ed8816d6..991a5950f9a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java @@ -23,18 +23,28 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandlingTest; import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; + public class InputRowSchemasTest extends NullHandlingTest { @Test @@ -98,4 +108,65 @@ public class InputRowSchemasTest extends NullHandlingTest columnsFilter ); } + + @Test + public void testFromDataSchema() + { + TimestampSpec timestampSpec = new TimestampSpec(null, null, null); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ) + ); + DataSchema schema = new DataSchema( + "dataSourceName", + new TimestampSpec(null, null, null), + dimensionsSpec, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("met", "met") + }, + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null + ); + + InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema); + Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec()); + Assert.assertEquals(dimensionsSpec.getDimensions(), 
inputRowSchema.getDimensionsSpec().getDimensions()); + Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames()); + Assert.assertEquals(ImmutableSet.of("count", "met"), inputRowSchema.getMetricNames()); + } + + @Test + public void testFromDataSchemaWithNoAggregator() + { + TimestampSpec timestampSpec = new TimestampSpec(null, null, null); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ) + ); + DataSchema schema = new DataSchema( + "dataSourceName", + new TimestampSpec(null, null, null), + dimensionsSpec, + new AggregatorFactory[]{}, + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null + ); + + InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema); + Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec()); + Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions()); + Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames()); + Assert.assertEquals(ImmutableSet.of(), inputRowSchema.getMetricNames()); + } } diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index fb378b766ed..71e6add527f 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -432,6 +432,11 @@ ${aws.sdk.version} runtime + + org.apache.datasketches + datasketches-java + runtime + diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java index 463c4b2c0b9..accaec0146b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java @@ -53,6 +53,7 @@ public class CompactionUtil null, null, null, + null, new MaxSizeSplitHintSpec(null, 1), new DynamicPartitionsSpec(maxRowsPerSegment, null), null, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 21e34ca8daf..83e0ac91097 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; +import org.apache.datasketches.hll.TgtHllType; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.TaskState; @@ -41,8 +42,12 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -90,7 +95,10 @@ public class ITAutoCompactionTest extends AbstractIndexerTest private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json"; private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json"; private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json"; + private static final String INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE = "/indexer/wikipedia_index_sketch_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_rollup_preserve_metric.json"; + private static final String INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_no_rollup_preserve_metric.json"; private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000; private static final Period NO_SKIP_OFFSET = Period.seconds(0); @@ -110,6 +118,226 @@ public class ITAutoCompactionTest extends AbstractIndexerTest fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix(); } + @Test + public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception + { + // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2 + loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS); + // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null + loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 2 segments across 1 days... 
+ verifySegmentsCount(2); + ArrayList nullList = new ArrayList(); + nullList.add(null); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%QUANTILESRESULT%%", 2, + "%%THETARESULT%%", 2.0, + "%%HLLRESULT%%", 2 + ); + verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, null, true), + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), + null, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_added", "added"), + new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false), + new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L) + }, + false + ); + // should now only have 1 row after compaction + // added = null, count = 3, sum_added = 93 + forceTriggerAutoCompaction(1); + + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%QUANTILESRESULT%%", 3, + "%%THETARESULT%%", 3.0, + "%%HLLRESULT%%", 3 + ); + verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields); + + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + checkCompactionIntervals(intervalsBeforeCompaction); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(1); + List compactTasksAfter = 
indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + + @Test + public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception + { + // added = 31, count = null, sum_added = null + loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); + // added = 31, count = null, sum_added = null + loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 2 segments across 1 days... + verifySegmentsCount(2); + ArrayList nullList = new ArrayList(); + nullList.add(null); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, null, true), + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), + null, + new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")}, + false + ); + // should now only have 1 row after compaction + // added = null, count = 2, sum_added = 62 + forceTriggerAutoCompaction(1); + + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + checkCompactionIntervals(intervalsBeforeCompaction); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(1); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + + @Test + public void testAutoCompactionOnlyRowsWithMetricShouldPreserveExistingMetrics() throws Exception + { + // added = null, count = 2, sum_added = 62 + loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS); + // added = null, count = 2, sum_added = 62 + loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 2 segments across 1 
days... + verifySegmentsCount(2); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 2, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(null, null, true), + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))), + null, + new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")}, + false + ); + // should now only have 1 row after compaction + // added = null, count = 4, sum_added = 124 + forceTriggerAutoCompaction(1); + + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "count", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(4)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", "sum_added", + "%%EXPECTED_COUNT_RESULT%%", 1, + "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(124)))) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + checkCompactionIntervals(intervalsBeforeCompaction); + + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + // Verify rollup segments does not get compacted again + forceTriggerAutoCompaction(1); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + } + } + @Test public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception { @@ -646,12 +874,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs); try (final Closeable ignored = unloader(fullDatasourceName)) { - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); submitCompactionConfig( MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, @@ -667,12 +895,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // does not have data on every week on the month forceTriggerAutoCompaction(3); // Make sure that no data is lost after compaction - expectedResult = 
ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); List tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName); TaskResponseObject compactTask = null; @@ -696,12 +924,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs); try (final Closeable ignored = unloader(fullDatasourceName)) { - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); submitCompactionConfig( MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, @@ -714,12 +942,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01) forceTriggerAutoCompaction(2); // Make sure that no data is lost after compaction - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); List tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName); TaskResponseObject compactTask = null; @@ -742,12 +970,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs); try (final Closeable ignored = unloader(fullDatasourceName)) { - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); submitCompactionConfig( MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, @@ -755,12 +983,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest false ); forceTriggerAutoCompaction(2); - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 1, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(516.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); @@ -778,12 +1006,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest Map specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono)))); loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs); try (final Closeable ignored = unloader(fullDatasourceName)) { - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); submitCompactionConfig( MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, @@ -791,12 +1019,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest false ); forceTriggerAutoCompaction(2); - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 1, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); @@ -820,12 +1048,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCount(4); // Result is not rollup - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); // Compact and change dimension to only "language" submitCompactionConfig( @@ -840,12 +1068,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest forceTriggerAutoCompaction(2); // Result should rollup on language dimension - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 1, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); @@ -868,12 +1096,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Result is not rollup // For dim "page", result has values "Gypsy Danger" and "Striker Eureka" - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); // Compact and filter with selector on dim "page" and value "Striker Eureka" submitCompactionConfig( @@ -888,12 +1116,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest forceTriggerAutoCompaction(2); // For dim "page", result should only contain value "Striker Eureka" - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 1, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); @@ -915,12 +1143,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCount(4); // For dim "page", result has values "Gypsy Danger" and "Striker Eureka" - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); // Compact and add longSum and doubleSum metrics submitCompactionConfig( @@ -936,19 +1164,19 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Result should be the same with the addition of new metrics, "double_sum_added" and "long_sum_added". 
// These new metrics should have the same value as the input field "added" - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "double_sum_added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); - expectedResult = ImmutableMap.of( + queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "long_sum_added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57), ImmutableList.of(459)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); @@ -976,12 +1204,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest // Result is not rollup // For dim "page", result has values "Gypsy Danger" and "Striker Eureka" - Map expectedResult = ImmutableMap.of( + Map queryAndResultFields = ImmutableMap.of( "%%FIELD_TO_QUERY%%", "added", "%%EXPECTED_COUNT_RESULT%%", 2, "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0)))) ); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); submitCompactionConfig( MAX_ROWS_PER_SEGMENT_COMPACTED, @@ -994,11 +1222,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest ); // Compact the MONTH segment forceTriggerAutoCompaction(2); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); // Compact the WEEK segment forceTriggerAutoCompaction(2); - verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); // Verify all task succeed List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); @@ -1133,6 +1361,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest null, null, null, + null, new MaxSizeSplitHintSpec(null, 1), partitionsSpec, null, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java index 7b7eefac6c3..7c5afb1ffd3 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java @@ -81,6 +81,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest null, null, null, + null, new MaxSizeSplitHintSpec(null, 1), newPartitionsSpec, null, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json new file mode 100644 index 00000000000..75e6b663a9e --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json @@ -0,0 +1,76 @@ +{ + "type": "index_parallel", + "spec": { + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "inline", + 
"data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas3\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n" + }, + "inputFormat": { + "type": "json" + }, + "appendToExisting": true + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "dynamic" + } + }, + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "isRobot", + "language", + "flags", + "isUnpatrolled", + "page", + "diffUrl", + { + "type": "long", + "name": "added" + }, + "comment", + { + "type": "long", + "name": "commentLength" + }, + "isNew", + "isMinor", + { + "type": "long", + "name": "delta" + }, + "isAnonymous", + "user", + { + "type": "long", + "name": "deltaBucket" + }, + { + "type": "long", + "name": "deleted" + }, + "namespace", + "cityName", + "countryName", + "regionIsoCode", + "metroCode", + "countryIsoCode", + "regionName" + ] + }, + "granularitySpec": { + "queryGranularity": "hour", + "rollup": false, + "segmentGranularity": "day" + } + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json new file mode 100644 index 00000000000..dacedf0ef1d --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json @@ -0,0 +1,95 @@ +{ + "type": "index_parallel", + "spec": { + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "inline", + "data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas1\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":11,\"isAnonymous\":false,\"user\":\"maytas2\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n" + }, + "inputFormat": { + "type": "json" + }, + "appendToExisting": true + }, + "dataSchema": { + "granularitySpec": { + "segmentGranularity": "day", + "queryGranularity": "hour", + "rollup": true + }, + "dataSource": "%%DATASOURCE%%", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "isRobot", + "language", + "flags", + "isUnpatrolled", + "page", + "diffUrl", + "comment", + "isNew", + "isMinor", + "isAnonymous", + "namespace" + ] + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "sum_added", + "type": "longSum", + "fieldName": "added" + }, + { + 
"name": "sum_commentLength", + "type": "longSum", + "fieldName": "commentLength" + }, + { + "name": "sum_delta", + "type": "longSum", + "fieldName": "delta" + }, + { + "name": "sum_deltaBucket", + "type": "longSum", + "fieldName": "deltaBucket" + }, + { + "name": "sum_deleted", + "type": "longSum", + "fieldName": "deleted" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ] + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "dynamic" + } + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json new file mode 100644 index 00000000000..2b7a0625bcb --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json @@ -0,0 +1,49 @@ +[ + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":%%QUANTILESRESULT%%, + "approxCountTheta":%%THETARESULT%%, + "approxCountHLL":%%HLLRESULT%% + } + } + ] + } +] \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java index 5260669f8bb..1269fe1e6b3 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java @@ -34,6 +34,12 @@ public abstract class AppendableIndexBuilder protected boolean sortFacts = true; protected int maxRowCount = 0; protected long maxBytesInMemory = 0; + // When set to true, for any row that already has metric (with the same name defined in metricSpec), + // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have + // the metric, then the metric aggregator is applied on the source column as usual. This should only be set for + // DruidInputSource since that is the only case where we can have existing metrics. + // This is currently only use by auto compaction and should not be use for anything else. + protected boolean preserveExistingMetrics = false; protected boolean useMaxMemoryEstimates = true; protected final Logger log = new Logger(this.getClass()); @@ -56,7 +62,7 @@ public abstract class AppendableIndexBuilder @VisibleForTesting public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... 
metrics) { - return setSimpleTestingIndexSchema(null, metrics); + return setSimpleTestingIndexSchema(null, null, metrics); } @@ -70,10 +76,15 @@ public abstract class AppendableIndexBuilder * @return this */ @VisibleForTesting - public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics) + public AppendableIndexBuilder setSimpleTestingIndexSchema( + @Nullable Boolean rollup, + @Nullable Boolean preserveExistingMetrics, + final AggregatorFactory... metrics + ) { IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics); this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build(); + this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : false; return this; } @@ -107,6 +118,12 @@ public abstract class AppendableIndexBuilder return this; } + public AppendableIndexBuilder setPreserveExistingMetrics(final boolean preserveExistingMetrics) + { + this.preserveExistingMetrics = preserveExistingMetrics; + return this; + } + public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean useMaxMemoryEstimates) { this.useMaxMemoryEstimates = useMaxMemoryEstimates; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index b36ae6872ed..c2e36cade3a 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -227,6 +227,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable private final AggregatorFactory[] metrics; private final boolean deserializeComplexMetrics; private final Metadata metadata; + protected final boolean preserveExistingMetrics; private final Map metricDescs; @@ -257,12 +258,20 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input * value for aggregators that return metrics other than float. * @param concurrentEventAdd flag whether ot not adding of input rows should be thread-safe + * @param preserveExistingMetrics When set to true, for any row that already has metric + * (with the same name defined in metricSpec), the metric aggregator in metricSpec + * is skipped and the existing metric is unchanged. If the row does not already have + * the metric, then the metric aggregator is applied on the source column as usual. + * This should only be set for DruidInputSource since that is the only case where we + * can have existing metrics. This is currently only use by auto compaction and + * should not be use for anything else. 
* @param useMaxMemoryEstimates true if max values should be used to estimate memory */ protected IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, final boolean concurrentEventAdd, + final boolean preserveExistingMetrics, final boolean useMaxMemoryEstimates ) { @@ -273,6 +282,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable this.metrics = incrementalIndexSchema.getMetrics(); this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; + this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; this.timeAndMetricsColumnCapabilities = new HashMap<>(); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 493180db458..a28a0a620f4 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -19,6 +19,8 @@ package org.apache.druid.segment.incremental; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import com.google.common.collect.Iterators; import com.google.common.collect.Maps; @@ -52,6 +54,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; /** * @@ -109,6 +112,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Nullable private volatile Map selectors; @Nullable + private volatile Map combiningAggSelectors; + @Nullable private String outOfRowsReason = null; OnheapIncrementalIndex( @@ -118,10 +123,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex boolean sortFacts, int maxRowCount, long maxBytesInMemory, + // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics + // This is currently only use by auto compaction and should not be use for anything else. + boolean preserveExistingMetrics, boolean useMaxMemoryEstimates ) { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, useMaxMemoryEstimates); + super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates); this.maxRowCount = maxRowCount; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) @@ -182,6 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex ) { selectors = new HashMap<>(); + combiningAggSelectors = new HashMap<>(); for (AggregatorFactory agg : metrics) { selectors.put( agg.getName(), @@ -190,6 +199,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex concurrentEventAdd ) ); + if (preserveExistingMetrics) { + AggregatorFactory combiningAgg = agg.getCombiningFactory(); + combiningAggSelectors.put( + combiningAgg.getName(), + new CachingColumnSelectorFactory( + makeColumnSelectorFactory(combiningAgg, rowSupplier, deserializeComplexMetrics), + concurrentEventAdd + ) + ); + } } } @@ -214,7 +233,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex long aggSizeDelta = doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages); totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta); } else { - aggs = new Aggregator[metrics.length]; + if (preserveExistingMetrics) { + aggs = new Aggregator[metrics.length * 2]; + } else { + aggs = new Aggregator[metrics.length]; + } long aggSizeForRow = factorizeAggs(metrics, aggs, rowContainer, row); aggSizeForRow += doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages); @@ -279,23 +302,33 @@ public class OnheapIncrementalIndex extends IncrementalIndex { long totalInitialSizeBytes = 0L; rowContainer.set(row); - final long aggReferenceSize = Long.BYTES; for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; - + // Creates aggregators to aggregate from input into output fields if (useMaxMemoryEstimates) { aggs[i] = agg.factorize(selectors.get(agg.getName())); } else { - AggregatorAndSize aggregatorAndSize = - agg.factorizeWithSize(selectors.get(agg.getName())); + AggregatorAndSize aggregatorAndSize = agg.factorizeWithSize(selectors.get(agg.getName())); aggs[i] = aggregatorAndSize.getAggregator(); totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes(); totalInitialSizeBytes += aggReferenceSize; } + // Creates aggregators to combine already aggregated field + if (preserveExistingMetrics) { + if (useMaxMemoryEstimates) { + AggregatorFactory combiningAgg = agg.getCombiningFactory(); + aggs[i + metrics.length] = combiningAgg.factorize(combiningAggSelectors.get(combiningAgg.getName())); + } else { + AggregatorFactory combiningAgg = agg.getCombiningFactory(); + AggregatorAndSize aggregatorAndSize = combiningAgg.factorizeWithSize(combiningAggSelectors.get(combiningAgg.getName())); + aggs[i + metrics.length] = aggregatorAndSize.getAggregator(); + totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes(); + totalInitialSizeBytes += aggReferenceSize; + } + } } rowContainer.set(null); - return totalInitialSizeBytes; } @@ -315,10 +348,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex ) { rowContainer.set(row); - long totalIncrementalBytes = 0L; - for (int i = 0; i < aggs.length; i++) { - final Aggregator agg = aggs[i]; + for (int i = 0; i < metrics.length; i++) { + final Aggregator agg; + if (preserveExistingMetrics && row instanceof MapBasedRow && ((MapBasedRow) row).getEvent().containsKey(metrics[i].getName())) { + agg = aggs[i + metrics.length]; + } else { + agg = aggs[i]; + } synchronized (agg) { try { if (useMaxMemoryEstimates) { @@ -329,8 +366,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex } catch (ParseException e) { // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. 
- log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName()); - parseExceptionsHolder.add(e.getMessage()); + if (preserveExistingMetrics) { + log.warn(e, "Failing ingestion as preserveExistingMetrics is enabled but selector of aggregator[%s] recieved incompatible type.", metrics[i].getName()); + throw e; + } else { + log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName()); + parseExceptionsHolder.add(e.getMessage()); + } } } } @@ -410,31 +452,35 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getFloat(); + return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getLong(); + return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong); } @Override public Object getMetricObjectValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].get(); + return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::get); } @Override protected double getMetricDoubleValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getDouble(); + return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble); } @Override public boolean isNull(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].isNull(); + if (preserveExistingMetrics) { + return concurrentGet(rowOffset)[aggOffset].isNull() && concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull(); + } else { + return concurrentGet(rowOffset)[aggOffset].isNull(); + } } @Override @@ -475,8 +521,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex } Aggregator[] aggs = getAggsForRow(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get()); + int aggLength = preserveExistingMetrics ? aggs.length / 2 : aggs.length; + for (int i = 0; i < aggLength; ++i) { + theVals.put(metrics[i].getName(), getMetricHelper(metrics, aggs, i, Aggregator::get)); } if (postAggs != null) { @@ -492,6 +539,40 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } + /** + * Apply the getMetricTypeFunction function to the retrieve aggregated value given the list of aggregators and offset. 
+ * If the preserveExistingMetrics flag is set, then this method will combine values from the two aggregators as needed: the aggregator + * for aggregating from the input into the output field and the aggregator for combining the already aggregated field. + */ + private T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function getMetricTypeFunction) + { + if (preserveExistingMetrics) { + // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values + // from two aggregators, the aggregator for aggregating from the input into the output field and the aggregator + // for combining the already aggregated field + if (aggs[aggOffset].isNull()) { + // If the aggregator for aggregating from the input into the output field is null, then we get the value from the + // aggregator that we use for combining the already aggregated field + return getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]); + } else if (aggs[aggOffset + metrics.length].isNull()) { + // If the aggregator for combining the already aggregated field is null, then we get the value from the + // aggregator for aggregating from the input into the output field + return getMetricTypeFunction.apply(aggs[aggOffset]); + } else { + // Since both aggregators are not null and contain values, we will have to retrieve the values from both + // aggregators and combine them + AggregatorFactory aggregatorFactory = metrics[aggOffset]; + T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]); + T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]); + return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined); + } + } else { + // If the preserveExistingMetrics flag is not set, then we simply get the metric from the list of aggregators, aggs, using the + // given aggOffset + return getMetricTypeFunction.apply(aggs[aggOffset]); + } + } + /** * Clear out maps to allow GC * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing */ @@ -506,6 +587,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex if (selectors != null) { selectors.clear(); } + if (combiningAggSelectors != null) { + combiningAggSelectors.clear(); + } } /** @@ -571,6 +655,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex sortFacts, maxRowCount, maxBytesInMemory, + preserveExistingMetrics, useMaxMemoryEstimates ); } @@ -578,12 +663,39 @@ public class OnheapIncrementalIndex extends IncrementalIndex public static class Spec implements AppendableIndexSpec { + private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false; public static final String TYPE = "onheap"; + // When set to true, for any row that already has a metric (with the same name defined in metricSpec), + // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have + // the metric, then the metric aggregator is applied to the source column as usual. This should only be set for + // DruidInputSource since that is the only case where we can have existing metrics. + // This is currently only used by auto compaction and should not be used for anything else. + final boolean preserveExistingMetrics; + + public Spec() + { + this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS; + } + + @JsonCreator + public Spec( + final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics + ) + { + this.preserveExistingMetrics = preserveExistingMetrics != null ?
preserveExistingMetrics : DEFAULT_PRESERVE_EXISTING_METRICS; + } + + @JsonProperty + public boolean isPreserveExistingMetrics() + { + return preserveExistingMetrics; + } + @Override public AppendableIndexBuilder builder() { - return new Builder(); + return new Builder().setPreserveExistingMetrics(preserveExistingMetrics); } @Override @@ -596,15 +708,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex } @Override - public boolean equals(Object that) + public boolean equals(Object o) { - return that.getClass().equals(this.getClass()); + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Spec spec = (Spec) o; + return preserveExistingMetrics == spec.preserveExistingMetrics; } @Override public int hashCode() { - return Objects.hash(this.getClass()); + return Objects.hash(preserveExistingMetrics); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java index 646f1d66bf7..3257ec6d49a 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -37,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.QueryPlus; @@ -70,6 +72,7 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -92,23 +95,30 @@ import java.util.concurrent.atomic.AtomicInteger; public class IncrementalIndexTest extends InitializedNullHandlingTest { public final IncrementalIndexCreator indexCreator; + private final boolean isPreserveExistingMetrics; + @Rule + public ExpectedException expectedException = ExpectedException.none(); @Rule public final CloserRule closer = new CloserRule(false); - public IncrementalIndexTest(String indexType, String mode) throws JsonProcessingException + public IncrementalIndexTest(String indexType, String mode, boolean isPreserveExistingMetrics) throws JsonProcessingException { + this.isPreserveExistingMetrics = isPreserveExistingMetrics; indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder - .setSimpleTestingIndexSchema("rollup".equals(mode), (AggregatorFactory[]) args[0]) + .setSimpleTestingIndexSchema("rollup".equals(mode), isPreserveExistingMetrics, (AggregatorFactory[]) args[0]) .setMaxRowCount(1_000_000) .build() )); } - @Parameterized.Parameters(name = "{index}: {0}, {1}") + @Parameterized.Parameters(name = "{index}: {0}, {1}, 
{2}") public static Collection constructorFeeder() { - return IncrementalIndexCreator.indexTypeCartesianProduct(ImmutableList.of("rollup", "plain")); + return IncrementalIndexCreator.indexTypeCartesianProduct( + ImmutableList.of("rollup", "plain"), + ImmutableList.of(true, false) + ); } public static AggregatorFactory[] getDefaultCombiningAggregatorFactories() @@ -155,7 +165,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest } return new OnheapIncrementalIndex.Builder() - .setSimpleTestingIndexSchema(false, aggregatorFactories) + .setSimpleTestingIndexSchema(false, false, aggregatorFactories) .setMaxRowCount(1000000) .build(); } @@ -721,4 +731,269 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest Assert.assertEquals(2, index.size()); } + + @Test + public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetric() throws IndexSizeExceededException + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 2) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 3) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5) + ) + ); + + Assert.assertEquals(index.isRollup() ? 1 : 4, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 4 rows + if (rowCount == 1 || rowCount == 2) { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue()); + } else { + if (isPreserveExistingMetrics) { + Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue()); + Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue()); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + // The rows does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true) + Assert.assertEquals(NullHandling.sqlCompatible() ? 
null : 0L, row.getMetric("sum_of_x")); + } + } + } + } + } + + @Test + public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", 5) + ) + ); + + Assert.assertEquals(index.isRollup() ? 1 : 2, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(isPreserveExistingMetrics ? 5 : 2, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 9 : 3, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 2 rows + if (rowCount == 1) { + if (isPreserveExistingMetrics) { + Assert.assertEquals(2, row.getMetric("count").intValue()); + Assert.assertEquals(4, row.getMetric("sum_of_x").intValue()); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + // The rows does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true) + Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x")); + } + } else { + Assert.assertEquals(isPreserveExistingMetrics ? 3 : 1, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 5 : 3, row.getMetric("sum_of_x").intValue()); + } + } + } + } + + @Test + public void testSchemaRollupWithRowsWithNoMetrics() throws IndexSizeExceededException + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "x", 3) + ) + ); + + Assert.assertEquals(index.isRollup() ? 
1 : 2, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(2, row.getMetric("count").intValue()); + Assert.assertEquals(7, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 2 rows + if (rowCount == 1) { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(4, row.getMetric("sum_of_x").intValue()); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(3, row.getMetric("sum_of_x").intValue()); + } + } + } + } + + @Test + public void testSchemaRollupWithRowWithMixedTypeMetrics() throws IndexSizeExceededException + { + if (isPreserveExistingMetrics) { + expectedException.expect(ParseException.class); + } + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", "not a number 1", "sum_of_x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", "not a number 2") + ) + ); + + Assert.assertEquals(index.isRollup() ? 1 : 2, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(2, row.getMetric("count").intValue()); + Assert.assertEquals(3, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 2 rows + if (rowCount == 1) { + Assert.assertEquals(1, row.getMetric("count").intValue()); + // The rows does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true) + Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x")); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(3, row.getMetric("sum_of_x").intValue()); + } + } + } + } + + @Test + public void testSchemaRollupWithRowsWithNonRolledUpSameColumnName() throws IndexSizeExceededException + { + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("sum_of_x", "x") + }; + final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 4) + ) + ); + index.add( + new MapBasedInputRow( + 1481871600000L, + Arrays.asList("name", "host"), + ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 3) + ) + ); + + Assert.assertEquals(index.isRollup() ? 
1 : 2, index.size()); + Iterator iterator = index.iterator(); + int rowCount = 0; + while (iterator.hasNext()) { + rowCount++; + Row row = iterator.next(); + Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch()); + if (index.isRollup()) { + // All rows are rollup into one row + Assert.assertEquals(2, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 200 : 7, row.getMetric("sum_of_x").intValue()); + } else { + // We still have 2 rows + if (rowCount == 1) { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 100 : 4, row.getMetric("sum_of_x").intValue()); + } else { + Assert.assertEquals(1, row.getMetric("count").intValue()); + Assert.assertEquals(isPreserveExistingMetrics ? 100 : 3, row.getMetric("sum_of_x").intValue()); + } + } + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java index 5200ada8ef3..096d5a0aef1 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java @@ -62,7 +62,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest public IncrementalIndexAdapterTest(String indexType) throws JsonProcessingException { indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder - .setSimpleTestingIndexSchema("rollup".equals(args[0]), new CountAggregatorFactory("count")) + .setSimpleTestingIndexSchema("rollup".equals(args[0]), null, new CountAggregatorFactory("count")) .setMaxRowCount(1_000_000) .build() )); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index b33d16b07dd..02d43e27506 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -124,6 +124,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark sortFacts, maxRowCount, maxBytesInMemory, + false, true ); } @@ -147,6 +148,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark true, maxRowCount, maxBytesInMemory, + false, true ); } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java index 3e3d215e58d..f35257e244a 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java @@ -25,6 +25,8 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import 
org.joda.time.Duration; @@ -72,6 +74,8 @@ public class ClientCompactionTaskQueryTuningConfig private final Integer maxNumSegmentsToMerge; @Nullable private final Integer totalNumMergeTasks; + @Nullable + private final AppendableIndexSpec appendableIndexSpec; public static ClientCompactionTaskQueryTuningConfig from( @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig, @@ -81,6 +85,7 @@ public class ClientCompactionTaskQueryTuningConfig if (userCompactionTaskQueryTuningConfig == null) { return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, + new OnheapIncrementalIndex.Spec(true), null, null, null, @@ -100,8 +105,12 @@ public class ClientCompactionTaskQueryTuningConfig null ); } else { + AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null + ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() + : new OnheapIncrementalIndex.Spec(true); return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, + appendableIndexSpecToUse, userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(), userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(), userCompactionTaskQueryTuningConfig.getMaxTotalRows(), @@ -126,6 +135,7 @@ public class ClientCompactionTaskQueryTuningConfig @JsonCreator public ClientCompactionTaskQueryTuningConfig( @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @@ -146,6 +156,7 @@ public class ClientCompactionTaskQueryTuningConfig ) { this.maxRowsPerSegment = maxRowsPerSegment; + this.appendableIndexSpec = appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory; this.maxBytesInMemory = maxBytesInMemory; this.maxTotalRows = maxTotalRows; @@ -306,6 +317,13 @@ public class ClientCompactionTaskQueryTuningConfig return totalNumMergeTasks; } + @JsonProperty + @Nullable + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @Override public boolean equals(Object o) { @@ -333,7 +351,8 @@ public class ClientCompactionTaskQueryTuningConfig Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout) && Objects.equals(chatHandlerNumRetries, that.chatHandlerNumRetries) && Objects.equals(maxNumSegmentsToMerge, that.maxNumSegmentsToMerge) && - Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks); + Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks) && + Objects.equals(appendableIndexSpec, that.appendableIndexSpec); } @Override @@ -357,7 +376,8 @@ public class ClientCompactionTaskQueryTuningConfig chatHandlerTimeout, chatHandlerNumRetries, maxNumSegmentsToMerge, - totalNumMergeTasks + totalNumMergeTasks, + appendableIndexSpec ); } @@ -383,6 +403,7 @@ public class ClientCompactionTaskQueryTuningConfig ", chatHandlerNumRetries=" + chatHandlerNumRetries + ", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge + ", totalNumMergeTasks=" + totalNumMergeTasks + + ", appendableIndexSpec=" + appendableIndexSpec + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java index 38f559036b9..07ab96c3436 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Duration; @@ -36,6 +37,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue @JsonCreator public UserCompactionTaskQueryTuningConfig( @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec, @@ -56,6 +58,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue { super( null, + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxTotalRows, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index c643da154c4..6725dd3428c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.testing.InitializedNullHandlingTest; import org.joda.time.Duration; @@ -123,6 +124,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest null, new Period(3600), new UserCompactionTaskQueryTuningConfig( + null, null, null, 10000L, @@ -170,6 +172,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest 10000, new Period(3600), new UserCompactionTaskQueryTuningConfig( + null, null, null, 10000L, @@ -213,6 +216,47 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest { final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( 40000, + null, + 2000L, + null, + new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null), + new DynamicPartitionsSpec(1000, 20000L), + new IndexSpec( + new DefaultBitmapSerdeFactory(), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + new IndexSpec( + new DefaultBitmapSerdeFactory(), + CompressionStrategy.LZ4, + CompressionStrategy.UNCOMPRESSED, + LongEncodingStrategy.AUTO + ), + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100 + ); + + final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig); + final UserCompactionTaskQueryTuningConfig fromJson 
= + OBJECT_MAPPER.readValue(json, UserCompactionTaskQueryTuningConfig.class); + Assert.assertEquals(tuningConfig, fromJson); + } + + @Test + public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws IOException + { + final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( + 40000, + new OnheapIncrementalIndex.Spec(true), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java index 7aa6540abf5..dc52f784223 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java @@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.joda.time.Duration; import org.junit.Assert; @@ -60,6 +61,7 @@ public class UserCompactionTaskQueryTuningConfigTest null, null, null, + null, null ); final String json = OBJECT_MAPPER.writeValueAsString(config); @@ -75,6 +77,7 @@ public class UserCompactionTaskQueryTuningConfigTest { final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( 40000, + new OnheapIncrementalIndex.Spec(true), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index b25ebbaaf1a..8cc8fad3f15 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -722,6 +722,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -786,6 +787,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -844,6 +846,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -902,6 +905,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -968,6 +972,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1029,6 +1034,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1089,6 +1095,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1190,6 +1197,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1315,6 +1323,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1376,6 +1385,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1441,6 +1451,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -1587,6 +1598,7 @@ public class CompactSegmentsTest null, null, null, + 
null, partitionsSpec, null, null, @@ -1683,6 +1695,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, @@ -2003,6 +2016,7 @@ public class CompactSegmentsTest null, null, null, + null, partitionsSpec, null, null, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java index 3ad596347af..fd7549c6298 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java @@ -120,6 +120,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new DynamicPartitionsSpec(null, null), null, null, @@ -163,6 +164,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new DynamicPartitionsSpec(null, 1000L), null, null, @@ -206,6 +208,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new DynamicPartitionsSpec(100, 1000L), null, null, @@ -245,6 +248,7 @@ public class NewestSegmentFirstIteratorTest 100, null, new UserCompactionTaskQueryTuningConfig( + null, null, null, 1000L, @@ -292,6 +296,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new DynamicPartitionsSpec(null, null), null, null, @@ -331,6 +336,7 @@ public class NewestSegmentFirstIteratorTest null, null, new UserCompactionTaskQueryTuningConfig( + null, null, null, 1000L, @@ -378,6 +384,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")), null, null, @@ -421,6 +428,7 @@ public class NewestSegmentFirstIteratorTest null, null, null, + null, new SingleDimensionPartitionsSpec(10000, null, "dim", false), null, null, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index a29bd9eceb5..20268731032 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -45,10 +45,12 @@ import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.ConciseBitmapSerdeFactory; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -1490,6 +1492,96 @@ public class NewestSegmentFirstPolicyTest Assert.assertFalse(iterator.hasNext()); } + @Test + public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() + { + NullHandling.initializeForTests(); + PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null)); + final VersionedIntervalTimeline timeline = createTimeline( + new SegmentGenerateSpec( + Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), + new Period("P1D"), + null, + new CompactionState( + partitionsSpec, + null, + null, + null, + mapper.convertValue(new IndexSpec(), new TypeReference>() {}), + null + ) + ) + ); + + CompactionSegmentIterator iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig( + 130000, + new Period("P0D"), + null, + null, + null, + new UserCompactionTaskQueryTuningConfig( + null, + new OnheapIncrementalIndex.Spec(true), + null, + 1000L, + null, + partitionsSpec, + new IndexSpec(), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ), + null + )), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + Assert.assertFalse(iterator.hasNext()); + + iterator = policy.reset( + ImmutableMap.of(DATA_SOURCE, createCompactionConfig( + 130000, + new Period("P0D"), + null, + null, + null, + new UserCompactionTaskQueryTuningConfig( + null, + new OnheapIncrementalIndex.Spec(false), + null, + 1000L, + null, + partitionsSpec, + new IndexSpec(), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ), + null + )), + ImmutableMap.of(DATA_SOURCE, timeline), + Collections.emptyMap() + ); + Assert.assertFalse(iterator.hasNext()); + } + private static void assertCompactSegmentIntervals( CompactionSegmentIterator iterator, Period segmentPeriod, @@ -1586,7 +1678,7 @@ public class NewestSegmentFirstPolicyTest UserCompactionTaskGranularityConfig granularitySpec ) { - return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null); + return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null, null); } private DataSourceCompactionConfig createCompactionConfig( @@ -1597,6 +1689,19 @@ public class NewestSegmentFirstPolicyTest UserCompactionTaskTransformConfig transformSpec, AggregatorFactory[] metricsSpec ) + { + return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, dimensionsSpec, transformSpec, null, metricsSpec); + } + + private DataSourceCompactionConfig createCompactionConfig( + long inputSegmentSizeBytes, + Period skipOffsetFromLatest, + UserCompactionTaskGranularityConfig granularitySpec, + UserCompactionTaskDimensionsConfig dimensionsSpec, + UserCompactionTaskTransformConfig transformSpec, + UserCompactionTaskQueryTuningConfig tuningConfig, + AggregatorFactory[] metricsSpec + ) { return new DataSourceCompactionConfig( DATA_SOURCE, @@ -1604,7 +1709,7 @@ public class NewestSegmentFirstPolicyTest inputSegmentSizeBytes, null, skipOffsetFromLatest, - null, + tuningConfig, granularitySpec, dimensionsSpec, metricsSpec,