From 50d52a24fc5fd613cc1a3e3de215ef13399698b1 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 3 Aug 2016 02:13:05 +0800 Subject: [PATCH] ability to not rollup at index time, make pre aggregation an option (#3020) * ability to not rollup at index time, make pre aggregation an option * rename getRowIndexForRollup to getPriorIndex * fix doc misspelling * test query using no-rollup indexes * fix benchmark fail due to jmh bug --- .../IncrementalIndexReadBenchmark.java | 4 + .../indexing/IndexIngestionBenchmark.java | 6 +- .../indexing/IndexMergeBenchmark.java | 8 +- .../indexing/IndexPersistBenchmark.java | 4 + docs/content/ingestion/index.md | 2 + docs/content/ingestion/tasks.md | 6 +- docs/content/querying/segmentmetadataquery.md | 6 + .../indexer/DetermineHashedPartitionsJob.java | 1 + .../io/druid/indexer/IndexGeneratorJob.java | 6 +- .../path/GranularUnprocessedPathSpec.java | 1 + .../common/index/YeOldePlumberSchool.java | 2 +- .../druid/indexing/common/task/MergeTask.java | 4 + .../indexing/common/task/TaskSerdeTest.java | 1 + .../SegmentMetadataQueryQueryToolChest.java | 14 +- .../SegmentMetadataQueryRunnerFactory.java | 16 +- .../metadata/metadata/SegmentAnalysis.java | 15 +- .../metadata/SegmentMetadataQuery.java | 8 +- .../java/io/druid/segment/IndexMerger.java | 45 ++- .../main/java/io/druid/segment/Metadata.java | 42 ++ .../segment/incremental/IncrementalIndex.java | 367 ++++++++++++++++-- .../incremental/IncrementalIndexSchema.java | 21 +- .../IncrementalIndexStorageAdapter.java | 22 +- .../incremental/OffheapIncrementalIndex.java | 57 +-- .../incremental/OnheapIncrementalIndex.java | 51 ++- .../io/druid/query/QueryRunnerTestHelper.java | 4 + .../aggregation/AggregationTestHelper.java | 13 +- ...egmentMetadataQueryQueryToolChestTest.java | 75 ++++ .../metadata/SegmentMetadataQueryTest.java | 113 +++++- .../SegmentMetadataUnionQueryTest.java | 16 +- .../java/io/druid/segment/EmptyIndexTest.java | 1 + .../java/io/druid/segment/IndexBuilder.java | 3 +- .../io/druid/segment/IndexMergerTest.java | 274 +++++++++++++ .../IndexMergerV9WithSpatialIndexTest.java | 1 + .../java/io/druid/segment/MetadataTest.java | 6 + .../io/druid/segment/SchemalessIndex.java | 5 +- .../test/java/io/druid/segment/TestIndex.java | 47 ++- .../segment/data/IncrementalIndexTest.java | 61 ++- .../filter/SpatialFilterBonusTest.java | 1 + .../segment/filter/SpatialFilterTest.java | 1 + .../incremental/IncrementalIndexTest.java | 1 + .../OnheapIncrementalIndexBenchmark.java | 4 +- .../OnheapIncrementalIndexTest.java | 5 +- .../granularity/ArbitraryGranularitySpec.java | 22 ++ .../indexing/granularity/GranularitySpec.java | 2 + .../granularity/UniformGranularitySpec.java | 27 +- .../appenderator/AppenderatorImpl.java | 1 + .../realtime/plumber/RealtimePlumber.java | 1 + .../druid/segment/realtime/plumber/Sink.java | 1 + .../granularity/ArbitraryGranularityTest.java | 17 + .../granularity/UniformGranularityTest.java | 18 + 50 files changed, 1261 insertions(+), 168 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java index c074038a388..e8c6fee1eba 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java @@ -82,6 +82,9 @@ public class IncrementalIndexReadBenchmark @Param({"basic"}) private String schema; + @Param({"true", 
"false"}) + private boolean rollup; + private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class); private static final int RNG_SEED = 9999; private IncrementalIndex incIndex; @@ -125,6 +128,7 @@ public class IncrementalIndexReadBenchmark .withQueryGranularity(QueryGranularities.NONE) .withMetrics(schemaInfo.getAggsArray()) .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .withRollup(rollup) .build(), true, false, diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java index 5756d5ebf0e..5d8b8e4671b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java @@ -63,6 +63,9 @@ public class IndexIngestionBenchmark @Param({"basic"}) private String schema; + @Param({"true", "false"}) + private boolean rollup; + private static final Logger log = new Logger(IndexIngestionBenchmark.class); private static final int RNG_SEED = 9999; @@ -107,11 +110,12 @@ public class IndexIngestionBenchmark .withQueryGranularity(QueryGranularities.NONE) .withMetrics(schemaInfo.getAggsArray()) .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .withRollup(rollup) .build(), true, false, true, - rowsPerSegment + rowsPerSegment * 2 ); } diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java index 462b39882a5..006350e72d6 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java @@ -75,6 +75,9 @@ public class IndexMergeBenchmark @Param({"basic"}) private String schema; + @Param({"true", "false"}) + private boolean rollup; + private static final Logger log = new Logger(IndexMergeBenchmark.class); private static final int RNG_SEED = 9999; private static final IndexMerger INDEX_MERGER; @@ -155,6 +158,7 @@ public class IndexMergeBenchmark .withQueryGranularity(QueryGranularities.NONE) .withMetrics(schemaInfo.getAggsArray()) .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .withRollup(rollup) .build(), true, false, @@ -174,7 +178,7 @@ public class IndexMergeBenchmark log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory()); tmpFile.deleteOnExit(); - File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec()); + File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec()); blackhole.consume(mergedFile); @@ -192,7 +196,7 @@ public class IndexMergeBenchmark log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory()); tmpFile.deleteOnExit(); - File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec()); + File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec()); blackhole.consume(mergedFile); diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java index e79e492772f..89f31fe6444 100644 --- 
a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java @@ -72,6 +72,9 @@ public class IndexPersistBenchmark @Param({"basic"}) private String schema; + @Param({"true", "false"}) + private boolean rollup; + private static final Logger log = new Logger(IndexPersistBenchmark.class); private static final int RNG_SEED = 9999; @@ -156,6 +159,7 @@ public class IndexPersistBenchmark .withQueryGranularity(QueryGranularities.NONE) .withMetrics(schemaInfo.getAggsArray()) .withDimensionsSpec(new DimensionsSpec(null, null, null)) + .withRollup(rollup) .build(), true, false, diff --git a/docs/content/ingestion/index.md b/docs/content/ingestion/index.md index 63f2a62bf65..cff399205a6 100644 --- a/docs/content/ingestion/index.md +++ b/docs/content/ingestion/index.md @@ -186,6 +186,7 @@ This spec is used to generated segments with uniform intervals. | type | string | The type of granularity spec. | no (default == 'uniform') | | segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') | | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') | +| rollup | boolean | rollup or not | no (default == true) | | intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time | ### Arbitrary Granularity Spec @@ -196,6 +197,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre |-------|------|-------------|----------| | type | string | The type of granularity spec. | no (default == 'uniform') | | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') | +| rollup | boolean | rollup or not | no (default == true) | | intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time | # IO Config diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index c86f3de4446..ead17089335 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -159,7 +159,10 @@ Append tasks append a list of segments together into a single segment (one after ### Merge Task -Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is: +Merge tasks merge a list of segments together. Any common timestamps are merged. +If rollup is disabled as part of ingestion, common timestamps are not merged and rows are reordered by their timestamp. + +The grammar is: ```json { @@ -167,6 +170,7 @@ Merge tasks merge a list of segments together. Any common timestamps are merged. 
"id": , "dataSource": , "aggregations": , + "rollup": , "segments": } ``` diff --git a/docs/content/querying/segmentmetadataquery.md b/docs/content/querying/segmentmetadataquery.md index 1d7d7ec256d..9189e1fc83c 100644 --- a/docs/content/querying/segmentmetadataquery.md +++ b/docs/content/querying/segmentmetadataquery.md @@ -11,6 +11,7 @@ Segment metadata queries return per-segment information about: * Interval the segment covers * Column type of all the columns in the segment * Estimated total segment byte size in if it was stored in a flat format +* Is the segment rolled up * Segment id ```json @@ -143,6 +144,11 @@ null if the aggregators are unknown or unmergeable (if merging is enabled). * The form of the result is a map of column name to aggregator. +#### rollup + +* `rollup` in the result is true/false/null. +* When merging is enabled, if some are rollup, others are not, result is null. + ### lenientAggregatorMerge Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 125237a909d..954be0effe7 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -142,6 +142,7 @@ public class DetermineHashedPartitionsJob implements Jobby new UniformGranularitySpec( config.getGranularitySpec().getSegmentGranularity(), config.getGranularitySpec().getQueryGranularity(), + config.getGranularitySpec().isRollup(), intervals ) ); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index ac9edc263b4..2f48b8d4d3d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -226,6 +226,7 @@ public class IndexGeneratorJob implements Jobby .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) + .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup()) .build(); OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex( @@ -514,13 +515,14 @@ public class IndexGeneratorJob implements Jobby ProgressIndicator progressIndicator ) throws IOException { + boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup(); if (config.isBuildV9Directly()) { return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex( - indexes, aggs, file, config.getIndexSpec(), progressIndicator + indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator ); } else { return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex( - indexes, aggs, file, config.getIndexSpec(), progressIndicator + indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java index 1f20cff84f9..2d11ebe3521 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java @@ -112,6 
+112,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec new UniformGranularitySpec( segmentGranularity, config.getGranularitySpec().getQueryGranularity(), + config.getGranularitySpec().isRollup(), Lists.newArrayList(bucketsToRun) ) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 02d256c55d6..de01f0099a2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -188,7 +188,7 @@ public class YeOldePlumberSchool implements PlumberSchool } fileToUpload = new File(tmpSegmentDir, "merged"); - theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec()); + theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec()); } // Map merged segment so we can extract dimensions diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java index 1fcc468e0d5..9df49936e09 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java @@ -44,6 +44,7 @@ public class MergeTask extends MergeTaskBase { @JsonIgnore private final List aggregators; + private final Boolean rollup; private final IndexSpec indexSpec; @JsonCreator @@ -52,12 +53,14 @@ public class MergeTask extends MergeTaskBase @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators, + @JsonProperty("rollup") Boolean rollup, @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("context") Map context ) { super(id, dataSource, segments, context); this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations"); + this.rollup = rollup == null ? Boolean.TRUE : rollup; this.indexSpec = indexSpec == null ? 
new IndexSpec() : indexSpec; } @@ -82,6 +85,7 @@ public class MergeTask extends MergeTaskBase } } ), + rollup, aggregators.toArray(new AggregatorFactory[aggregators.size()]), outDir, indexSpec diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 86b584ca8c5..ab06fa4dcf5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -176,6 +176,7 @@ public class TaskSerdeTest "foo", segments, aggregators, + true, indexSpec, null ); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 9f732d12d54..3bdd5468fb3 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -355,6 +355,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest private final Map aggregators; private final TimestampSpec timestampSpec; private final QueryGranularity queryGranularity; + private final Boolean rollup; @JsonCreator public SegmentAnalysis( @@ -50,7 +51,8 @@ public class SegmentAnalysis implements Comparable @JsonProperty("numRows") long numRows, @JsonProperty("aggregators") Map aggregators, @JsonProperty("timestampSpec") TimestampSpec timestampSpec, - @JsonProperty("queryGranularity") QueryGranularity queryGranularity + @JsonProperty("queryGranularity") QueryGranularity queryGranularity, + @JsonProperty("rollup") Boolean rollup ) { this.id = id; @@ -61,6 +63,7 @@ public class SegmentAnalysis implements Comparable this.aggregators = aggregators; this.timestampSpec = timestampSpec; this.queryGranularity = queryGranularity; + this.rollup = rollup; } @JsonProperty @@ -105,6 +108,12 @@ public class SegmentAnalysis implements Comparable return queryGranularity; } + @JsonProperty + public Boolean isRollup() + { + return rollup; + } + @JsonProperty public Map getAggregators() { @@ -123,6 +132,7 @@ public class SegmentAnalysis implements Comparable ", aggregators=" + aggregators + ", timestampSpec=" + timestampSpec + ", queryGranularity=" + queryGranularity + + ", rollup=" + rollup + '}'; } @@ -141,6 +151,7 @@ public class SegmentAnalysis implements Comparable SegmentAnalysis that = (SegmentAnalysis) o; return size == that.size && numRows == that.numRows && + rollup == that.rollup && Objects.equals(id, that.id) && Objects.equals(interval, that.interval) && Objects.equals(columns, that.columns) && @@ -156,7 +167,7 @@ public class SegmentAnalysis implements Comparable @Override public int hashCode() { - return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity); + return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity, rollup); } @Override diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 751f6ed9c9b..dbac143ba3f 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -58,7 +58,8 @@ public class SegmentMetadataQuery 
extends BaseQuery AGGREGATORS, MINMAX, TIMESTAMPSPEC, - QUERYGRANULARITY; + QUERYGRANULARITY, + ROLLUP; @JsonValue @Override @@ -199,6 +200,11 @@ public class SegmentMetadataQuery extends BaseQuery return analysisTypes.contains(AnalysisType.QUERYGRANULARITY); } + public boolean hasRollup() + { + return analysisTypes.contains(AnalysisType.ROLLUP); + } + public boolean hasMinMax() { return analysisTypes.contains(AnalysisType.MINMAX); diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 1831e1a1f67..1bace7920d0 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -38,6 +38,7 @@ import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.io.OutputSupplier; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import com.google.inject.Inject; import com.metamx.collections.bitmap.BitmapFactory; import com.metamx.collections.bitmap.ImmutableBitmap; @@ -200,6 +201,11 @@ public class IndexMerger indexSpec.getBitmapSerdeFactory().getBitmapFactory() ) ), + // if index is not rolled up, then it should be not rollup here + // if index is rolled up, then it is no need to rollup again. + // In this case, true/false won't cause reOrdering in merge stage + // while merging a single iterable + false, index.getMetricAggs(), outDir, indexSpec, @@ -209,16 +215,18 @@ public class IndexMerger public File mergeQueryableIndex( List indexes, + boolean rollup, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec ) throws IOException { - return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); + return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); } public File mergeQueryableIndex( List indexes, + boolean rollup, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, @@ -243,6 +251,7 @@ public class IndexMerger ); return merge( indexAdapteres, + rollup, metricAggs, outDir, indexSpec, @@ -252,12 +261,13 @@ public class IndexMerger public File merge( List indexes, + boolean rollup, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec ) throws IOException { - return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); + return merge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); } private static List getLexicographicMergedDimensions(List indexes) @@ -328,6 +338,7 @@ public class IndexMerger public File merge( List indexes, + final boolean rollup, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, @@ -409,14 +420,28 @@ public class IndexMerger @Nullable ArrayList> boats ) { - return CombiningIterable.create( - new MergeIterable( - Ordering.natural().nullsFirst(), - boats - ), - Ordering.natural().nullsFirst(), - new RowboatMergeFunction(sortedMetricAggs) - ); + if (rollup) { + return CombiningIterable.create( + new MergeIterable( + Ordering.natural().nullsFirst(), + boats + ), + Ordering.natural().nullsFirst(), + new RowboatMergeFunction(sortedMetricAggs) + ); + } else { + return new MergeIterable( + new Ordering() + { + @Override + public int compare(Rowboat left, Rowboat right) + { + return Longs.compare(left.getTimestamp(), right.getTimestamp()); + } + }.nullsFirst(), + boats + ); + } } }; diff --git a/processing/src/main/java/io/druid/segment/Metadata.java 
b/processing/src/main/java/io/druid/segment/Metadata.java index 25455620a70..bf605fd86f1 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -49,6 +49,9 @@ public class Metadata @JsonProperty private QueryGranularity queryGranularity; + @JsonProperty + private Boolean rollup; + public Metadata() { container = new ConcurrentHashMap<>(); @@ -87,6 +90,17 @@ public class Metadata return this; } + public Boolean isRollup() + { + return rollup; + } + + public Metadata setRollup(Boolean rollup) + { + this.rollup = rollup; + return this; + } + public Metadata putAll(Map other) { if (other != null) { @@ -128,6 +142,7 @@ public class Metadata List timestampSpecsToMerge = new ArrayList<>(); List gransToMerge = new ArrayList<>(); + List rollupToMerge = new ArrayList<>(); for (Metadata metadata : toBeMerged) { if (metadata != null) { @@ -143,6 +158,10 @@ public class Metadata if (gransToMerge != null) { gransToMerge.add(metadata.getQueryGranularity()); } + + if (rollupToMerge != null) { + rollupToMerge.add(metadata.isRollup()); + } mergedContainer.putAll(metadata.container); } else { //if metadata and hence aggregators and queryGranularity for some segment being merged are unknown then @@ -150,6 +169,7 @@ public class Metadata aggregatorsToMerge = null; timestampSpecsToMerge = null; gransToMerge = null; + rollupToMerge = null; } } @@ -172,6 +192,23 @@ public class Metadata result.setQueryGranularity(QueryGranularity.mergeQueryGranularities(gransToMerge)); } + Boolean rollup = null; + if (rollupToMerge != null && !rollupToMerge.isEmpty()) { + rollup = rollupToMerge.get(0); + for (Boolean r : rollupToMerge) { + if (r == null) { + rollup = null; + break; + } else if (!r.equals(rollup)) { + rollup = null; + break; + } else { + rollup = r; + } + } + } + + result.setRollup(rollup); result.container.putAll(mergedContainer); return result; @@ -199,6 +236,9 @@ public class Metadata if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) { return false; } + if (rollup != null ? !rollup.equals(metadata.rollup) : metadata.rollup != null) { + return false; + } return queryGranularity != null ? queryGranularity.equals(metadata.queryGranularity) : metadata.queryGranularity == null; @@ -212,6 +252,7 @@ public class Metadata result = 31 * result + Arrays.hashCode(aggregators); result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0); result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); + result = 31 * result + (rollup != null ? 
rollup.hashCode() : 0); return result; } @@ -223,6 +264,7 @@ public class Metadata ", aggregators=" + Arrays.toString(aggregators) + ", timestampSpec=" + timestampSpec + ", queryGranularity=" + queryGranularity + + ", rollup=" + rollup + '}'; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 27110753485..cef407d2238 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -25,6 +25,7 @@ import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -70,11 +71,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -353,12 +358,12 @@ public abstract class IncrementalIndex implements Iterable, private final long minTimestamp; private final QueryGranularity gran; + private final boolean rollup; private final List> rowTransformers; private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; - private final boolean sortFacts; private final Metadata metadata; private final Map metricDescs; @@ -396,22 +401,22 @@ public abstract class IncrementalIndex implements Iterable, public IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean reportParseExceptions, - final boolean sortFacts + final boolean reportParseExceptions ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); this.gran = incrementalIndexSchema.getGran(); + this.rollup = incrementalIndexSchema.isRollup(); this.metrics = incrementalIndexSchema.getMetrics(); this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; - this.sortFacts = sortFacts; this.metadata = new Metadata() .setAggregators(getCombiningAggregators(metrics)) .setTimestampSpec(incrementalIndexSchema.getTimestampSpec()) - .setQueryGranularity(this.gran); + .setQueryGranularity(this.gran) + .setRollup(this.rollup); this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); this.columnCapabilities = Maps.newHashMap(); @@ -452,7 +457,8 @@ public abstract class IncrementalIndex implements Iterable, } } - private DimDim newDimDim(String dimension, ValueType type) { + private DimDim newDimDim(String dimension, ValueType type) + { DimDim newDimDim; switch (type) { case LONG: @@ -473,7 +479,12 @@ public abstract class IncrementalIndex implements Iterable, // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation 
protected abstract DimDim makeDimDim(String dimension, Object lock); - public abstract ConcurrentMap getFacts(); + public boolean isRollup() + { + return rollup; + } + + public abstract FactsHolder getFacts(); public abstract boolean canAppendRow(); @@ -579,7 +590,8 @@ public abstract class IncrementalIndex implements Iterable, * * @return the number of rows in the data set after adding the InputRow */ - public int add(InputRow row) throws IndexSizeExceededException { + public int add(InputRow row) throws IndexSizeExceededException + { TimeAndDims key = toTimeAndDims(row); final int rv = addToFacts( metrics, @@ -694,20 +706,12 @@ public abstract class IncrementalIndex implements Iterable, private long getMinTimeMillis() { - if (sortFacts) { - return ((ConcurrentNavigableMap) getFacts()).firstKey().getTimestamp(); - } else { - throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); - } + return getFacts().getMinTimeMillis(); } private long getMaxTimeMillis() { - if (sortFacts) { - return ((ConcurrentNavigableMap) getFacts()).lastKey().getTimestamp(); - } else { - throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); - } + return getFacts().getMaxTimeMillis(); } private int[] getDimVals(final DimDim dimLookup, final List dimValues) @@ -858,15 +862,6 @@ public abstract class IncrementalIndex implements Iterable, return columnCapabilities.get(column); } - public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) - { - if (sortFacts) { - return ((ConcurrentNavigableMap) getFacts()).subMap(start, end); - } else { - throw new UnsupportedOperationException("can't get subMap from unsorted facts data."); - } - } - public Metadata getMetadata() { return metadata; @@ -896,15 +891,8 @@ public abstract class IncrementalIndex implements Iterable, { final List dimensions = getDimensions(); - Map facts = null; - if (descending && sortFacts) { - facts = ((ConcurrentNavigableMap) getFacts()).descendingMap(); - } else { - facts = getFacts(); - } - return Iterators.transform( - facts.entrySet().iterator(), + getFacts().iterator(descending), new Function, Row>() { @Override @@ -1320,4 +1308,313 @@ public abstract class IncrementalIndex implements Iterable, return retVal; } } + + public static class FactsEntry implements Map.Entry + { + TimeAndDims key = null; + Integer value = null; + + public FactsEntry(TimeAndDims key, Integer value) + { + this.key = key; + this.value = value; + } + + public TimeAndDims getKey() + { + return key; + } + + public Integer getValue() + { + return value; + } + + @Override + public Integer setValue(Integer value) + { + return value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FactsEntry that = (FactsEntry) o; + + if (key != null ? !key.equals(that.key) : that.key != null) { + return false; + } + return value != null ? value.equals(that.value) : that.value == null; + } + + @Override + public int hashCode() + { + int result = key != null ? key.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } + } + + interface FactsHolder + { + /** + * @return the previous value associated with the specified key, or + * {@code null} if there was no mapping for the key. 
+ */ + Integer getPriorIndex(TimeAndDims key); + + long getMinTimeMillis(); + + long getMaxTimeMillis(); + + Iterable> entrySet(); + + Iterator> iterator(boolean descending); + + Iterable> timeRangeIterable(boolean descending, long timeStart, long timeEnd); + + Iterable keySet(); + + /** + * @return the previous value associated with the specified key, or + * {@code null} if there was no mapping for the key. + */ + Integer putIfAbsent(TimeAndDims key, Integer rowIndex); + + void clear(); + } + + static class RollupFactsHolder implements FactsHolder + { + private final boolean sortFacts; + private final ConcurrentMap facts; + + public RollupFactsHolder(boolean sortFacts, Comparator timeAndDimsComparator) + { + this.sortFacts = sortFacts; + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(timeAndDimsComparator); + } else { + this.facts = new ConcurrentHashMap<>(); + } + } + + @Override + public Integer getPriorIndex(TimeAndDims key) + { + return facts.get(key); + } + + @Override + public long getMinTimeMillis() + { + if (sortFacts) { + return ((ConcurrentNavigableMap) facts).firstKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); + } + } + + @Override + public long getMaxTimeMillis() + { + if (sortFacts) { + return ((ConcurrentNavigableMap) facts).lastKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); + } + } + + public Iterable> entrySet() + { + return facts.entrySet(); + } + + @Override + public Iterator> iterator(boolean descending) + { + if (descending && sortFacts) { + return ((ConcurrentNavigableMap) facts).descendingMap().entrySet().iterator(); + } + return entrySet().iterator(); + } + + @Override + public Iterable> timeRangeIterable(boolean descending, long timeStart, long timeEnd) + { + if (!sortFacts) { + throw new UnsupportedOperationException("can't get timeRange from unsorted facts data."); + } + TimeAndDims start = new TimeAndDims(timeStart, new int[][]{}); + TimeAndDims end = new TimeAndDims(timeEnd, new int[][]{}); + ConcurrentNavigableMap subMap = + ((ConcurrentNavigableMap) facts).subMap(start, end); + final Map rangeMap = descending ? 
subMap.descendingMap() : subMap; + return rangeMap.entrySet(); + } + + @Override + public Iterable keySet() + { + return facts.keySet(); + } + + @Override + public Integer putIfAbsent(TimeAndDims key, Integer rowIndex) + { + return facts.putIfAbsent(key, rowIndex); + } + + @Override + public void clear() + { + facts.clear(); + } + } + + static class PlainFactsHolder implements FactsHolder + { + private final boolean sortFacts; + private final ConcurrentMap>> facts; + + public PlainFactsHolder(boolean sortFacts) + { + this.sortFacts = sortFacts; + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(new Comparator() + { + @Override + public int compare(Long lhs, Long rhs) + { + return Longs.compare(lhs, rhs); + } + }); + } else { + this.facts = new ConcurrentHashMap<>(); + } + } + + @Override + public Integer getPriorIndex(TimeAndDims key) + { + // always return null to indicate that no prior key cause we always add new row + return null; + } + + @Override + public long getMinTimeMillis() + { + if (sortFacts) { + return ((ConcurrentNavigableMap>>) facts).firstKey(); + } else { + throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); + } + } + + @Override + public long getMaxTimeMillis() + { + if (sortFacts) { + return ((ConcurrentNavigableMap>>) facts).lastKey(); + } else { + throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); + } + } + + public Iterable> entrySet() + { + return concat(facts.values(), false); + } + + @Override + public Iterator> iterator(boolean descending) + { + if (descending && sortFacts) { + return concat(((ConcurrentNavigableMap>>) facts) + .descendingMap().values(), true).iterator(); + } + return concat(facts.values(), false).iterator(); + } + + @Override + public Iterable> timeRangeIterable(boolean descending, long timeStart, long timeEnd) + { + ConcurrentNavigableMap>> subMap = + ((ConcurrentNavigableMap>>) facts).subMap(timeStart, timeEnd); + final Map>> rangeMap = descending ? subMap.descendingMap() : subMap; + return concat(rangeMap.values(), descending); + } + + private Iterable> concat( + final Iterable>> iterable, + final boolean descending + ) + { + return new Iterable>() + { + @Override + public Iterator> iterator() + { + return Iterators.concat( + Iterators.transform( + iterable.iterator(), + new Function>, Iterator>>() + { + @Override + public Iterator> apply(Deque> input) + { + return descending ? 
input.descendingIterator() : input.iterator(); + } + } + ) + ); + } + }; + } + + @Override + public Iterable keySet() + { + return Iterables.transform( + entrySet(), + new Function, TimeAndDims>() + { + @Override + public TimeAndDims apply(Map.Entry input) + { + return input.getKey(); + } + } + ); + } + + @Override + public Integer putIfAbsent(TimeAndDims key, Integer rowIndex) + { + Long time = key.getTimestamp(); + Deque> rows = facts.get(time); + if (rows == null) { + facts.putIfAbsent(time, new ConcurrentLinkedDeque>()); + // in race condition, rows may be put by other thread, so always get latest status from facts + rows = facts.get(time); + } + rows.add(new FactsEntry(key, rowIndex)); + // always return null to indicate that we always add new row + return null; + } + + @Override + public void clear() + { + facts.clear(); + } + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java index e6cb94315d4..282249a0d7b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java @@ -30,18 +30,21 @@ import io.druid.query.aggregation.AggregatorFactory; */ public class IncrementalIndexSchema { + public static final boolean DEFAULT_ROLLUP = true; private final long minTimestamp; private final TimestampSpec timestampSpec; private final QueryGranularity gran; private final DimensionsSpec dimensionsSpec; private final AggregatorFactory[] metrics; + private final boolean rollup; public IncrementalIndexSchema( long minTimestamp, TimestampSpec timestampSpec, QueryGranularity gran, DimensionsSpec dimensionsSpec, - AggregatorFactory[] metrics + AggregatorFactory[] metrics, + boolean rollup ) { this.minTimestamp = minTimestamp; @@ -49,6 +52,7 @@ public class IncrementalIndexSchema this.gran = gran; this.dimensionsSpec = dimensionsSpec; this.metrics = metrics; + this.rollup = rollup; } public long getMinTimestamp() @@ -76,6 +80,11 @@ public class IncrementalIndexSchema return metrics; } + public boolean isRollup() + { + return rollup; + } + public static class Builder { private long minTimestamp; @@ -83,6 +92,7 @@ public class IncrementalIndexSchema private QueryGranularity gran; private DimensionsSpec dimensionsSpec; private AggregatorFactory[] metrics; + private boolean rollup; public Builder() { @@ -90,6 +100,7 @@ public class IncrementalIndexSchema this.gran = QueryGranularities.NONE; this.dimensionsSpec = new DimensionsSpec(null, null, null); this.metrics = new AggregatorFactory[]{}; + this.rollup = true; } public Builder withMinTimestamp(long minTimestamp) @@ -147,10 +158,16 @@ public class IncrementalIndexSchema return this; } + public Builder withRollup(boolean rollup) + { + this.rollup = rollup; + return this; + } + public IncrementalIndexSchema build() { return new IncrementalIndexSchema( - minTimestamp, timestampSpec, gran, dimensionsSpec, metrics + minTimestamp, timestampSpec, gran, dimensionsSpec, metrics, rollup ); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index f749bf9554a..80b400a32b9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -69,7 
+69,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; /** */ @@ -228,22 +227,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this, currEntry); private Iterator> baseIter; - private ConcurrentNavigableMap cursorMap; + private Iterable> cursorIterable; + private boolean emptyRange; final DateTime time; int numAdvanced = -1; boolean done; { - cursorMap = index.getSubMap( - new IncrementalIndex.TimeAndDims(timeStart, new int[][]{}), - new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), - new int[][]{} - ) + cursorIterable = index.getFacts().timeRangeIterable( + descending, + timeStart, + Math.min(actualInterval.getEndMillis(), gran.next(input)) ); - if (descending) { - cursorMap = cursorMap.descendingMap(); - } + emptyRange = !cursorIterable.iterator().hasNext(); time = gran.toDateTime(input); reset(); @@ -299,7 +295,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public void reset() { - baseIter = cursorMap.entrySet().iterator(); + baseIter = cursorIterable.iterator(); if (numAdvanced == -1) { numAdvanced = 0; @@ -322,7 +318,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter numAdvanced++; } - done = !foundMatched && (cursorMap.size() == 0 || !baseIter.hasNext()); + done = !foundMatched && (emptyRange || !baseIter.hasNext()); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 6dbb924a8f8..bbb42936192 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -20,7 +20,6 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.metamx.common.IAE; import com.metamx.common.ISE; @@ -34,14 +33,10 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ColumnSelectorFactory; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -55,7 +50,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex private final List> aggBuffers = new ArrayList<>(); private final List indexAndOffsets = new ArrayList<>(); - private final ConcurrentMap facts; + private final FactsHolder facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); @@ -80,15 +75,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); this.maxRowCount = maxRowCount; this.bufferPool = bufferPool; - if (sortFacts) { - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); - } else { - this.facts = new ConcurrentHashMap<>(); - } + this.facts = 
incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator()) + : new PlainFactsHolder(sortFacts); //check that stupid pool gives buffers that can hold at least one row's aggregators ResourceHolder bb = bufferPool.take(); @@ -114,6 +106,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) .withQueryGranularity(gran) .withMetrics(metrics) + .withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP) .build(), deserializeComplexMetrics, reportParseExceptions, @@ -123,6 +116,29 @@ public class OffheapIncrementalIndex extends IncrementalIndex ); } + public OffheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + boolean rollup, + final AggregatorFactory[] metrics, + int maxRowCount, + StupidPool bufferPool + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .withRollup(rollup) + .build(), + true, + true, + true, + maxRowCount, + bufferPool + ); + } + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, @@ -132,20 +148,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex ) { this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - true, - true, - true, + minTimestamp, + gran, + IncrementalIndexSchema.DEFAULT_ROLLUP, + metrics, maxRowCount, bufferPool ); } @Override - public ConcurrentMap getFacts() + public FactsHolder getFacts() { return facts; } @@ -207,7 +220,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex int bufferOffset; synchronized (this) { - final Integer priorIndex = facts.get(key); + final Integer priorIndex = facts.getPriorIndex(key); if (null != priorIndex) { final int[] indexAndOffset = indexAndOffsets.get(priorIndex); bufferIndex = indexAndOffset[0]; @@ -254,7 +267,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex } // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { + if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) { throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 98a6361c9cb..d18a1c52715 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -50,7 +49,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex private static final Logger log = new Logger(OnheapIncrementalIndex.class); private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); - private final ConcurrentMap facts; + private final FactsHolder facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); protected final int maxRowCount; private volatile Map selectors; @@ -65,14 +64,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, 
sortFacts); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); this.maxRowCount = maxRowCount; - if (sortFacts) { - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); - } else { - this.facts = new ConcurrentHashMap<>(); - } + this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator()) + : new PlainFactsHolder(sortFacts); } public OnheapIncrementalIndex( @@ -89,6 +85,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) .withQueryGranularity(gran) .withMetrics(metrics) + .withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP) .build(), deserializeComplexMetrics, reportParseExceptions, @@ -97,6 +94,27 @@ public class OnheapIncrementalIndex extends IncrementalIndex ); } + public OnheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + boolean rollup, + final AggregatorFactory[] metrics, + int maxRowCount + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .withRollup(rollup) + .build(), + true, + true, + true, + maxRowCount + ); + } + public OnheapIncrementalIndex( long minTimestamp, QueryGranularity gran, @@ -105,13 +123,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex ) { this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - true, - true, - true, + minTimestamp, + gran, + IncrementalIndexSchema.DEFAULT_ROLLUP, + metrics, maxRowCount ); } @@ -126,7 +141,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } @Override - public ConcurrentMap getFacts() + public FactsHolder getFacts() { return facts; } @@ -165,7 +180,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex Supplier rowSupplier ) throws IndexSizeExceededException { - final Integer priorIndex = facts.get(key); + final Integer priorIndex = facts.getPriorIndex(key); Aggregator[] aggs; @@ -181,7 +196,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { + if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) { throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); } final Integer prev = facts.putIfAbsent(key, rowIndex); diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index c3b4b3d02dc..7810b847792 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -323,11 +323,15 @@ public class QueryRunnerTestHelper throws IOException { final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex noRollupRtIndex = TestIndex.getNoRollupIncrementalTestIndex(); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); + final QueryableIndex noRollupMMappedTestIndex = TestIndex.getNoRollupMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); return ImmutableList.of( makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)), + makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, segmentId)), makeQueryRunner(factory, new QueryableIndexSegment(segmentId, 
mMappedTestIndex)), + makeQueryRunner(factory, new QueryableIndexSegment(segmentId, noRollupMMappedTestIndex)), makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) ); } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 08abea147c0..78f3f81e406 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -28,21 +28,16 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.base.Function; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; -import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.IAE; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; - -import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.StringInputRowParser; @@ -56,16 +51,12 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; -import io.druid.query.QueryWatcher; import io.druid.query.groupby.GroupByQueryConfig; -import io.druid.query.groupby.GroupByQueryEngine; -import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.query.groupby.GroupByQueryRunnerTest; import io.druid.query.select.SelectQueryEngine; import io.druid.query.select.SelectQueryQueryToolChest; import io.druid.query.select.SelectQueryRunnerFactory; -import io.druid.segment.AbstractSegment; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; @@ -75,7 +66,6 @@ import io.druid.segment.Segment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; - import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.junit.rules.TemporaryFolder; @@ -84,7 +74,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -334,7 +323,7 @@ public class AggregationTestHelper for (File file : toMerge) { indexes.add(indexIO.loadIndex(file)); } - indexMerger.mergeQueryableIndex(indexes, metrics, outDir, new IndexSpec()); + indexMerger.mergeQueryableIndex(indexes, true, metrics, outDir, new IndexSpec()); for (QueryableIndex qi : indexes) { qi.close(); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 09f78a3e6c5..24da3e4b9b1 100644 --- 
a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -87,6 +87,7 @@ public class SegmentMetadataQueryQueryToolChestTest 100, null, null, + null, null ); @@ -117,6 +118,7 @@ public class SegmentMetadataQueryQueryToolChestTest "baz", new DoubleSumAggregatorFactory("baz", "baz") ), null, + null, null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( @@ -130,6 +132,7 @@ public class SegmentMetadataQueryQueryToolChestTest "bar", new DoubleSumAggregatorFactory("bar", "bar") ), null, + null, null ); @@ -162,6 +165,7 @@ public class SegmentMetadataQueryQueryToolChestTest 0, null, null, + null, null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( @@ -175,6 +179,7 @@ public class SegmentMetadataQueryQueryToolChestTest "bar", new DoubleSumAggregatorFactory("bar", "bar") ), null, + null, null ); @@ -199,6 +204,7 @@ public class SegmentMetadataQueryQueryToolChestTest 0, null, null, + null, null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( @@ -209,6 +215,7 @@ public class SegmentMetadataQueryQueryToolChestTest 0, null, null, + null, null ); @@ -230,6 +237,7 @@ public class SegmentMetadataQueryQueryToolChestTest "bar", new DoubleSumAggregatorFactory("bar", "bar") ), null, + null, null ); final SegmentAnalysis analysis2 = new SegmentAnalysis( @@ -244,6 +252,7 @@ public class SegmentMetadataQueryQueryToolChestTest "baz", new LongMaxAggregatorFactory("baz", "baz") ), null, + null, null ); @@ -264,6 +273,72 @@ public class SegmentMetadataQueryQueryToolChestTest ); } + @Test + public void testMergeRollup() + { + final SegmentAnalysis analysis1 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null, + null, + null, + null + ); + final SegmentAnalysis analysis2 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null, + null, + null, + false + ); + final SegmentAnalysis analysis3 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null, + null, + null, + false + ); + final SegmentAnalysis analysis4 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null, + null, + null, + true + ); + final SegmentAnalysis analysis5 = new SegmentAnalysis( + "id", + null, + Maps.newHashMap(), + 0, + 0, + null, + null, + null, + true + ); + + Assert.assertNull(mergeStrict(analysis1, analysis2).isRollup()); + Assert.assertNull(mergeStrict(analysis1, analysis4).isRollup()); + Assert.assertNull(mergeStrict(analysis2, analysis4).isRollup()); + Assert.assertFalse(mergeStrict(analysis2, analysis3).isRollup()); + Assert.assertTrue(mergeStrict(analysis4, analysis5).isRollup()); + } + private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2) { return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 637d81a8c42..a3a3c5d585d 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -47,10 +47,12 @@ import io.druid.query.metadata.metadata.ListColumnIncluderator; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; import io.druid.segment.IncrementalIndexSegment; +import 
io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.TestHelper; import io.druid.segment.TestIndex; import io.druid.segment.column.ValueType; +import io.druid.segment.incremental.IncrementalIndex; import io.druid.timeline.LogicalSegment; import org.joda.time.Interval; import org.junit.Assert; @@ -78,26 +80,30 @@ public class SegmentMetadataQueryTest @SuppressWarnings("unchecked") public static QueryRunner makeMMappedQueryRunner( String segmentId, + boolean rollup, QueryRunnerFactory factory ) { + QueryableIndex index = rollup ? TestIndex.getMMappedTestIndex() : TestIndex.getNoRollupMMappedTestIndex(); return QueryRunnerTestHelper.makeQueryRunner( factory, segmentId, - new QueryableIndexSegment(segmentId, TestIndex.getMMappedTestIndex()) + new QueryableIndexSegment(segmentId, index) ); } @SuppressWarnings("unchecked") public static QueryRunner makeIncrementalIndexQueryRunner( String segmentId, + boolean rollup, QueryRunnerFactory factory ) { + IncrementalIndex index = rollup ? TestIndex.getIncrementalTestIndex() : TestIndex.getNoRollupIncrementalTestIndex(); return QueryRunnerTestHelper.makeQueryRunner( factory, segmentId, - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId) + new IncrementalIndexSegment(index, segmentId) ); } @@ -105,35 +111,42 @@ public class SegmentMetadataQueryTest private final QueryRunner runner2; private final boolean mmap1; private final boolean mmap2; + private final boolean rollup1; + private final boolean rollup2; private final boolean differentIds; private final SegmentMetadataQuery testQuery; private final SegmentAnalysis expectedSegmentAnalysis1; private final SegmentAnalysis expectedSegmentAnalysis2; - @Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}, differentIds = {2}") + @Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}, rollup1 = {2}, rollup2 = {3}, differentIds = {4}") public static Collection constructorFeeder() { return ImmutableList.of( - new Object[]{true, true, false}, - new Object[]{true, false, false}, - new Object[]{false, true, false}, - new Object[]{false, false, false}, - new Object[]{false, false, true} + new Object[]{true, true, true, true, false}, + new Object[]{true, false, true, false, false}, + new Object[]{false, true, true, false, false}, + new Object[]{false, false, false, false, false}, + new Object[]{false, false, true, true, false}, + new Object[]{false, false, false, true, true} ); } public SegmentMetadataQueryTest( boolean mmap1, boolean mmap2, + boolean rollup1, + boolean rollup2, boolean differentIds ) { final String id1 = differentIds ? "testSegment1" : "testSegment"; final String id2 = differentIds ? "testSegment2" : "testSegment"; - this.runner1 = mmap1 ? makeMMappedQueryRunner(id1, FACTORY) : makeIncrementalIndexQueryRunner(id1, FACTORY); - this.runner2 = mmap2 ? makeMMappedQueryRunner(id2, FACTORY) : makeIncrementalIndexQueryRunner(id2, FACTORY); + this.runner1 = mmap1 ? makeMMappedQueryRunner(id1, rollup1, FACTORY) : makeIncrementalIndexQueryRunner(id1, rollup1, FACTORY); + this.runner2 = mmap2 ? 
makeMMappedQueryRunner(id2, rollup2, FACTORY) : makeIncrementalIndexQueryRunner(id2, rollup2, FACTORY); this.mmap1 = mmap1; this.mmap2 = mmap2; + this.rollup1 = rollup1; + this.rollup2 = rollup2; this.differentIds = differentIds; testQuery = Druids.newSegmentMetadataQueryBuilder() .dataSource("testing") @@ -183,6 +196,7 @@ public class SegmentMetadataQueryTest 1209, null, null, + null, null ); expectedSegmentAnalysis2 = new SegmentAnalysis( @@ -226,6 +240,7 @@ public class SegmentMetadataQueryTest 1209, null, null, + null, null ); } @@ -242,6 +257,75 @@ public class SegmentMetadataQueryTest Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis1), results); } + @Test + public void testSegmentMetadataQueryWithRollupMerge() + { + SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis( + differentIds ? "merged" : "testSegment", + null, + ImmutableMap.of( + "placement", + new ColumnAnalysis( + ValueType.STRING.toString(), + false, + 0, + 0, + null, + null, + null + ), + "placementish", + new ColumnAnalysis( + ValueType.STRING.toString(), + true, + 0, + 0, + null, + null, + null + ) + ), + 0, + expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), + null, + null, + null, + rollup1 != rollup2 ? null : rollup1 + ); + + QueryToolChest toolChest = FACTORY.getToolchest(); + + ExecutorService exec = Executors.newCachedThreadPool(); + QueryRunner myRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + FACTORY.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Lists.>newArrayList( + toolChest.preMergeQueryDecoration(runner1), + toolChest.preMergeQueryDecoration(runner2) + ) + ) + ), + toolChest + ); + + TestHelper.assertExpectedObjects( + ImmutableList.of(mergedSegmentAnalysis), + myRunner.run( + Druids.newSegmentMetadataQueryBuilder() + .dataSource("testing") + .intervals("2013/2014") + .toInclude(new ListColumnIncluderator(Arrays.asList("placement", "placementish"))) + .analysisTypes(SegmentMetadataQuery.AnalysisType.ROLLUP) + .merge(true) + .build(), + Maps.newHashMap() + ), + "failed SegmentMetadata merging query" + ); + exec.shutdownNow(); + } + @Test public void testSegmentMetadataQueryWithHasMultipleValuesMerge() { @@ -274,6 +358,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, + null, null ); @@ -342,6 +427,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, + null, null ); @@ -459,6 +545,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, + null, null ); @@ -510,6 +597,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, + null, null ); @@ -572,6 +660,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), expectedAggregators, null, + null, null ); @@ -630,6 +719,7 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, new TimestampSpec("ds", "auto", null), + null, null ); @@ -688,7 +778,8 @@ public class SegmentMetadataQueryTest expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, - QueryGranularities.NONE + QueryGranularities.NONE, + null ); QueryToolChest toolChest = FACTORY.getToolchest(); diff --git 
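The new testSegmentMetadataQueryWithRollupMerge above exercises the ROLLUP analysis type end to end and encodes the merge rule: when both segments agree on rollup the merged analysis reports that value, and when they disagree it reports null. A minimal sketch of building such a query, using only calls that appear in the test (the datasource, interval, and column name are illustrative):

    import io.druid.query.Druids;
    import io.druid.query.metadata.metadata.ListColumnIncluderator;
    import io.druid.query.metadata.metadata.SegmentMetadataQuery;

    import java.util.Arrays;

    class RollupAnalysisQuery
    {
      // Sketch: ask segment metadata whether the underlying segments were rolled up at index time.
      static SegmentMetadataQuery build()
      {
        return Druids.newSegmentMetadataQueryBuilder()
                     .dataSource("testing")
                     .intervals("2013/2014")
                     .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
                     .analysisTypes(SegmentMetadataQuery.AnalysisType.ROLLUP)
                     .merge(true)
                     .build();
      }
    }

On the merged result, SegmentAnalysis.isRollup() then returns Boolean.TRUE, Boolean.FALSE, or null for mixed segments, matching testMergeRollup earlier in this patch.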
a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java index b99097428c0..1e1f2e9839d 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java @@ -19,25 +19,15 @@ package io.druid.query.metadata; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import io.druid.collections.StupidPool; import io.druid.query.Druids; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.Result; -import io.druid.query.TestQueryRunners; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.DoubleMaxAggregatorFactory; -import io.druid.query.aggregation.DoubleMinAggregatorFactory; -import io.druid.query.aggregation.PostAggregator; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.ListColumnIncluderator; import io.druid.query.metadata.metadata.SegmentAnalysis; @@ -47,18 +37,13 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.TestHelper; import io.druid.segment.TestIndex; import io.druid.segment.column.ValueType; -import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; @RunWith(Parameterized.class) public class SegmentMetadataUnionQueryTest @@ -127,6 +112,7 @@ public class SegmentMetadataUnionQueryTest 4836, null, null, + null, null ); SegmentMetadataQuery query = new Druids.SegmentMetadataQueryBuilder() diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index faaa5bf48bf..2885be71c21 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -61,6 +61,7 @@ public class EmptyIndexTest ); TestHelper.getTestIndexMerger().merge( Lists.newArrayList(emptyIndexAdapter), + true, new AggregatorFactory[0], tmpDir, new IndexSpec() diff --git a/processing/src/test/java/io/druid/segment/IndexBuilder.java b/processing/src/test/java/io/druid/segment/IndexBuilder.java index 699e5c03a85..702e690b2be 100644 --- a/processing/src/test/java/io/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/io/druid/segment/IndexBuilder.java @@ -25,7 +25,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.druid.data.input.InputRow; -import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; @@ -33,7 +32,6 @@ import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; import 
io.druid.segment.incremental.OnheapIncrementalIndex; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -161,6 +159,7 @@ public class IndexBuilder } } ), + true, Iterables.toArray( Iterables.transform( Arrays.asList(schema.getMetrics()), diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 7e285f4b23c..cd7a3fe24ff 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -273,6 +273,7 @@ public class IndexMergerTest IncrementalIndexTest.getDefaultCombiningAggregatorFactories() ) .setQueryGranularity(QueryGranularities.NONE) + .setRollup(Boolean.TRUE) .putAll(metadataElems), index.getMetadata() ); @@ -347,6 +348,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(index1, index2), + true, mergedAggregators, mergedDir, indexSpec @@ -424,6 +426,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(index1, index2), + true, new AggregatorFactory[]{}, tmpDir3, indexSpec @@ -485,6 +488,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, mergedDir, indexSpec @@ -543,6 +547,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), + true, mergedAggregators, mergedDir, indexSpec @@ -611,6 +616,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), + true, mergedAggregators, mergedDir, newSpec @@ -841,6 +847,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(index1, index2, index3), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -940,6 +947,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(index1, index2, index3), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -1020,6 +1028,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(indexA, indexB), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -1031,6 +1040,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(indexA, indexB2), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -1178,6 +1188,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(indexA, indexB), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -1228,6 +1239,261 @@ public class IndexMergerTest checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921")); } + @Test + public void testNoRollupMergeWithoutDuplicateRow() throws Exception + { + // (d1, d2, d3) from only one index, and their dim values are ('empty', 'has null', 'no null') + // (d4, d5, d6, d7, d8, d9) are from both indexes + // d4: 'empty' join 'empty' + // d5: 'empty' join 'has null' + // d6: 'empty' join 'no null' + // d7: 'has null' join 'has null' + // d8: 'has null' join 'no null' + // d9: 'no null' join 'no null' + + IncrementalIndexSchema 
indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(0L) + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withRollup(false) + .build(); + IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000); + toPersistA.add( + new MapBasedInputRow( + 1, + Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910" + ) + ) + ); + toPersistA.add( + new MapBasedInputRow( + 2, + Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d2", "210", "d3", "311", "d7", "710", "d8", "810", "d9", "911" + ) + ) + ); + + IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000); + toPersistB.add( + new MapBasedInputRow( + 3, + Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d5", "520", "d6", "620", "d7", "720", "d8", "820", "d9", "920" + ) + ) + ); + toPersistB.add( + new MapBasedInputRow( + 4, + Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d5", "", "d6", "621", "d7", "", "d8", "821", "d9", "921" + ) + ) + ); + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistA, + tmpDirA, + indexSpec + ) + ) + ); + + QueryableIndex indexB = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB, + tmpDirB, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); + + Assert.assertEquals( + ImmutableList.of("d2", "d3", "d5", "d6", "d7", "d8", "d9"), + ImmutableList.copyOf(adapter.getDimensionNames()) + ); + Assert.assertEquals(4, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims()); + + checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); + + checkBitmapIndex(Lists.newArrayList(2, 3), adapter.getBitmapIndex("d3", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d3", "310")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d3", "311")); + + checkBitmapIndex(Lists.newArrayList(0, 1, 3), adapter.getBitmapIndex("d5", "")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d5", "520")); + + checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("d6", "")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d6", "620")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d6", "621")); + + checkBitmapIndex(Lists.newArrayList(0, 3), 
adapter.getBitmapIndex("d7", "")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d7", "710")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d7", "720")); + + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d8", "")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d8", "810")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d8", "820")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d8", "821")); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("d9", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d9", "910")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d9", "911")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d9", "920")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921")); + } + + @Test + public void testNoRollupMergeWithDuplicateRow() throws Exception + { + // (d3, d6, d8, d9) as actually data from index1 and index2 + // index1 has two duplicate rows + // index2 has 1 row which is same as index1 row and another different row + // then we can test + // 1. incrementalIndex with duplicate rows + // 2. incrementalIndex without duplicate rows + // 3. merge 2 indexes with duplicate rows + + IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(0L) + .withQueryGranularity(QueryGranularities.NONE) + .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withRollup(false) + .build(); + IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000); + toPersistA.add( + new MapBasedInputRow( + 1, + Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910" + ) + ) + ); + toPersistA.add( + new MapBasedInputRow( + 1, + Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910" + ) + ) + ); + + IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000); + toPersistB.add( + new MapBasedInputRow( + 1, + Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910" + ) + ) + ); + toPersistB.add( + new MapBasedInputRow( + 4, + Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"), + ImmutableMap.of( + "d5", "", "d6", "621", "d7", "", "d8", "821", "d9", "921" + ) + ) + ); + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistA, + tmpDirA, + indexSpec + ) + ) + ); + + QueryableIndex indexB = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB, + tmpDirB, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB), + false, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); + + Assert.assertEquals( + ImmutableList.of("d3", "d6", "d8", "d9"), + 
ImmutableList.copyOf(adapter.getDimensionNames()) + ); + Assert.assertEquals(4, boatList.size()); + Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {1}, {1}}, boatList.get(3).getDims()); + + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d3", "")); + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d3", "310")); + + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d6", "")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d6", "621")); + + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d8", "")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d8", "821")); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("d9", "")); + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d9", "910")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921")); + } + private void checkBitmapIndex(ArrayList expected, IndexedInts real) { Assert.assertEquals(expected.size(), real.size()); @@ -1335,6 +1601,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(indexA, indexB, indexBA, indexBA2), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec @@ -1346,6 +1613,7 @@ public class IndexMergerTest INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(indexA, indexB, indexBA, indexC), + true, new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged2, indexSpec @@ -1463,6 +1731,7 @@ public class IndexMergerTest INDEX_MERGER.merge( toMerge, + true, new AggregatorFactory[]{ new LongSumAggregatorFactory("A", "A"), new LongSumAggregatorFactory("C", "C"), @@ -1514,6 +1783,7 @@ public class IndexMergerTest File merged = INDEX_MERGER.merge( toMerge, + true, new AggregatorFactory[]{ new LongSumAggregatorFactory("A", "A"), new LongSumAggregatorFactory("C", "C") @@ -1584,6 +1854,7 @@ public class IndexMergerTest File merged = INDEX_MERGER.merge( toMerge, + true, new AggregatorFactory[]{ new LongSumAggregatorFactory("A", "A"), new LongSumAggregatorFactory("C", "C") @@ -1644,6 +1915,7 @@ public class IndexMergerTest File merged = INDEX_MERGER.merge( toMerge, + true, new AggregatorFactory[]{ new LongSumAggregatorFactory("A", "A"), new LongSumAggregatorFactory("B", "B"), @@ -1688,6 +1960,7 @@ public class IndexMergerTest final File merged = INDEX_MERGER.merge( toMerge, + true, new AggregatorFactory[]{ new LongSumAggregatorFactory("B", "B"), new LongSumAggregatorFactory("A", "A"), @@ -1770,6 +2043,7 @@ public class IndexMergerTest .withQueryGranularity(QueryGranularities.NONE) .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null)) .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withRollup(true) .build(); return new OnheapIncrementalIndex(schema, true, 1000); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java index 9a96f669d31..f4a3186f69d 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java +++ 
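The two IndexMergerTest additions above pin down the core of the feature: with rollup disabled in the schema every added row is kept, and a merge invoked with rollup=false likewise preserves duplicate rows in the merged segment. A minimal sketch of creating such an index, using only the builder calls that appear in the tests (the metric name and row limit are illustrative):

    import io.druid.granularity.QueryGranularities;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.segment.incremental.IncrementalIndex;
    import io.druid.segment.incremental.IncrementalIndexSchema;
    import io.druid.segment.incremental.OnheapIncrementalIndex;

    class NoRollupIncrementalIndex
    {
      // Sketch: an in-memory index that keeps every ingested row instead of pre-aggregating.
      static IncrementalIndex create()
      {
        IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
            .withMinTimestamp(0L)
            .withQueryGranularity(QueryGranularities.NONE)
            .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
            .withRollup(false)   // the new builder option added by this patch
            .build();
        return new OnheapIncrementalIndex(schema, true, 1000);
      }
    }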
b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java @@ -489,6 +489,7 @@ public class IndexMergerV9WithSpatialIndexTest INDEX_IO.loadIndex(secondFile), INDEX_IO.loadIndex(thirdFile) ), + true, METRIC_AGGS, mergedFile, indexSpec diff --git a/processing/src/test/java/io/druid/segment/MetadataTest.java b/processing/src/test/java/io/druid/segment/MetadataTest.java index b75269fa900..64398cc3fd6 100644 --- a/processing/src/test/java/io/druid/segment/MetadataTest.java +++ b/processing/src/test/java/io/druid/segment/MetadataTest.java @@ -52,6 +52,7 @@ public class MetadataTest }; metadata.setAggregators(aggregators); metadata.setQueryGranularity(QueryGranularities.ALL); + metadata.setRollup(Boolean.FALSE); Metadata other = jsonMapper.readValue( jsonMapper.writeValueAsString(metadata), @@ -81,12 +82,14 @@ public class MetadataTest m1.setAggregators(aggs); m1.setTimestampSpec(new TimestampSpec("ds", "auto", null)); m1.setQueryGranularity(QueryGranularities.ALL); + m1.setRollup(Boolean.FALSE); Metadata m2 = new Metadata(); m2.put("k", "v"); m2.setAggregators(aggs); m2.setTimestampSpec(new TimestampSpec("ds", "auto", null)); m2.setQueryGranularity(QueryGranularities.ALL); + m2.setRollup(Boolean.FALSE); Metadata merged = new Metadata(); merged.put("k", "v"); @@ -96,6 +99,7 @@ public class MetadataTest } ); merged.setTimestampSpec(new TimestampSpec("ds", "auto", null)); + merged.setRollup(Boolean.FALSE); merged.setQueryGranularity(QueryGranularities.ALL); Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null)); @@ -108,6 +112,7 @@ public class MetadataTest merged.setAggregators(null); merged.setTimestampSpec(null); merged.setQueryGranularity(null); + merged.setRollup(null); Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null)); //merge check with client explicitly providing merged aggregators @@ -123,6 +128,7 @@ public class MetadataTest merged.setTimestampSpec(new TimestampSpec("ds", "auto", null)); merged.setQueryGranularity(QueryGranularities.ALL); + m1.setRollup(Boolean.TRUE); Assert.assertEquals( merged, Metadata.merge(ImmutableList.of(m1, m2), explicitAggs) diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 4c79bb3dd18..ca22a124661 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -197,6 +197,7 @@ public class SchemalessIndex mergedIndex = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(INDEX_IO.loadIndex(topFile), INDEX_IO.loadIndex(bottomFile)), + true, METRIC_AGGS, mergedFile, indexSpec @@ -242,6 +243,7 @@ public class SchemalessIndex QueryableIndex index = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), + true, METRIC_AGGS, mergedFile, indexSpec @@ -280,7 +282,7 @@ public class SchemalessIndex } QueryableIndex index = INDEX_IO.loadIndex( - INDEX_MERGER.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile, indexSpec) + INDEX_MERGER.mergeQueryableIndex(indexesToMerge, true, METRIC_AGGS, mergedFile, indexSpec) ); return index; @@ -533,6 +535,7 @@ public class SchemalessIndex } ) ), + true, METRIC_AGGS, mergedFile, indexSpec diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index f21d88ae3ea..0b3055bcd02 100644 --- 
a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -90,7 +90,9 @@ public class TestIndex } private static IncrementalIndex realtimeIndex = null; + private static IncrementalIndex noRollupRealtimeIndex = null; private static QueryableIndex mmappedIndex = null; + private static QueryableIndex noRollupMmappedIndex = null; private static QueryableIndex mergedRealtime = null; public static IncrementalIndex getIncrementalTestIndex() @@ -104,6 +106,17 @@ public class TestIndex return realtimeIndex = makeRealtimeIndex("druid.sample.tsv"); } + public static IncrementalIndex getNoRollupIncrementalTestIndex() + { + synchronized (log) { + if (noRollupRealtimeIndex != null) { + return noRollupRealtimeIndex; + } + } + + return noRollupRealtimeIndex = makeRealtimeIndex("druid.sample.tsv", false); + } + public static QueryableIndex getMMappedTestIndex() { synchronized (log) { @@ -118,6 +131,20 @@ public class TestIndex return mmappedIndex; } + public static QueryableIndex getNoRollupMMappedTestIndex() + { + synchronized (log) { + if (noRollupMmappedIndex != null) { + return noRollupMmappedIndex; + } + } + + IncrementalIndex incrementalIndex = getNoRollupIncrementalTestIndex(); + noRollupMmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex); + + return noRollupMmappedIndex; + } + public static QueryableIndex mergedRealtimeIndex() { synchronized (log) { @@ -149,6 +176,7 @@ public class TestIndex mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(INDEX_IO.loadIndex(topFile), INDEX_IO.loadIndex(bottomFile)), + true, METRIC_AGGS, mergedFile, indexSpec @@ -164,6 +192,11 @@ public class TestIndex } public static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + { + return makeRealtimeIndex(resourceFilename, true); + } + + public static IncrementalIndex makeRealtimeIndex(final String resourceFilename, boolean rollup) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); if (resource == null) { @@ -171,16 +204,22 @@ public class TestIndex } log.info("Realtime loading index file[%s]", resource); CharSource stream = Resources.asByteSource(resource).asCharSource(Charsets.UTF_8); - return makeRealtimeIndex(stream); + return makeRealtimeIndex(stream, rollup); } public static IncrementalIndex makeRealtimeIndex(final CharSource source) + { + return makeRealtimeIndex(source, true); + } + + public static IncrementalIndex makeRealtimeIndex(final CharSource source, boolean rollup) { final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) .withTimestampSpec(new TimestampSpec("ds", "auto", null)) .withQueryGranularity(QueryGranularities.NONE) .withMetrics(METRIC_AGGS) + .withRollup(rollup) .build(); final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000); @@ -188,7 +227,11 @@ public class TestIndex return loadIncrementalIndex(retVal, source); } catch (Exception e) { - realtimeIndex = null; + if (rollup) { + realtimeIndex = null; + } else { + noRollupRealtimeIndex = null; + } throw Throwables.propagate(e); } } diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index f0a5fd2c722..7a897817a26 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ 
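TestIndex now keeps a no-rollup twin of each shared fixture, so parameterized tests can select either variant. A minimal sketch of how a test might pick them, using only the accessors added above (the wrapper class is illustrative):

    import io.druid.segment.QueryableIndex;
    import io.druid.segment.TestIndex;
    import io.druid.segment.incremental.IncrementalIndex;

    class TestIndexVariants
    {
      // Sketch: choose between the rolled-up and the raw (no-rollup) shared test fixtures.
      static IncrementalIndex incremental(boolean rollup)
      {
        return rollup ? TestIndex.getIncrementalTestIndex() : TestIndex.getNoRollupIncrementalTestIndex();
      }

      static QueryableIndex mmapped(boolean rollup)
      {
        return rollup ? TestIndex.getMMappedTestIndex() : TestIndex.getNoRollupMMappedTestIndex();
      }
    }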
b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -144,6 +144,38 @@ public class IncrementalIndexTest ); } } + }, + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex(AggregatorFactory[] factories) + { + return IncrementalIndexTest.createNoRollupIndex(factories); + } + } + }, + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex(AggregatorFactory[] factories) + { + return new OffheapIncrementalIndex( + 0L, QueryGranularities.NONE, false, factories, 1000000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); + } + } } } @@ -171,6 +203,17 @@ public class IncrementalIndexTest ); } + public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories) + { + if (null == aggregatorFactories) { + aggregatorFactories = defaultAggregatorFactories; + } + + return new OnheapIncrementalIndex( + 0L, QueryGranularities.NONE, false, aggregatorFactories, 1000000 + ); + } + public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException { index.add( @@ -330,7 +373,8 @@ public class IncrementalIndexTest new LinkedList>() ); Result result = Iterables.getOnlyElement(results); - Assert.assertEquals(rows, result.getValue().getLongMetric("rows").intValue()); + boolean isRollup = index.isRollup(); + Assert.assertEquals(rows * (isRollup ? 1 : 2), result.getValue().getLongMetric("rows").intValue()); for (int i = 0; i < dimensionCount; ++i) { Assert.assertEquals( String.format("Failed long sum on dimension %d", i), @@ -545,8 +589,12 @@ public class IncrementalIndexTest runner.run(query, context), new LinkedList>() ); + boolean isRollup = index.isRollup(); for (Result result : results) { - Assert.assertEquals(elementsPerThread, result.getValue().getLongMetric("rows").intValue()); + Assert.assertEquals( + elementsPerThread * (isRollup ? 1 : concurrentThreads), + result.getValue().getLongMetric("rows").intValue() + ); for (int i = 0; i < dimensionCount; ++i) { Assert.assertEquals( String.format("Failed long sum on dimension %d", i), @@ -594,17 +642,18 @@ public class IncrementalIndexTest } Assert.assertTrue(latch.await(60, TimeUnit.SECONDS)); + boolean isRollup = index.isRollup(); Assert.assertEquals(dimensionCount, index.getDimensionNames().size()); - Assert.assertEquals(elementsPerThread, index.size()); + Assert.assertEquals(elementsPerThread * (isRollup ? 1 : threadCount), index.size()); Iterator iterator = index.iterator(); int curr = 0; while (iterator.hasNext()) { Row row = iterator.next(); - Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch()); - Assert.assertEquals(Float.valueOf(threadCount), (Float) row.getFloatMetric("count")); + Assert.assertEquals(timestamp + (isRollup ? curr : curr / threadCount), row.getTimestampFromEpoch()); + Assert.assertEquals(Float.valueOf(isRollup ? threadCount : 1), (Float) row.getFloatMetric("count")); curr++; } - Assert.assertEquals(elementsPerThread, curr); + Assert.assertEquals(elementsPerThread * (isRollup ? 
1 : threadCount), curr); } @Test diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index b7738bab3fa..591056cecb8 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -439,6 +439,7 @@ public class SpatialFilterBonusTest INDEX_IO.loadIndex(secondFile), INDEX_IO.loadIndex(thirdFile) ), + true, METRIC_AGGS, mergedFile, indexSpec diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index 7d5a702abd5..3458559a100 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -493,6 +493,7 @@ public class SpatialFilterTest QueryableIndex mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(INDEX_IO.loadIndex(firstFile), INDEX_IO.loadIndex(secondFile), INDEX_IO.loadIndex(thirdFile)), + true, METRIC_AGGS, mergedFile, indexSpec diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index a9b8252c33c..5ed5bff9372 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -92,6 +92,7 @@ public class IncrementalIndexTest .withQueryGranularity(QueryGranularities.MINUTE) .withDimensionsSpec(dimensions) .withMetrics(metrics) + .withRollup(true) .build(); final List constructors = Lists.newArrayList(); diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 471a43c68be..8fc9c0b9ac5 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -146,7 +146,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark ) throws IndexSizeExceededException { - final Integer priorIdex = getFacts().get(key); + final Integer priorIdex = getFacts().getPriorIndex(key); Aggregator[] aggs; @@ -169,7 +169,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !getFacts().containsKey(key)) { + if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == null) { throw new IndexSizeExceededException("Maximum number of rows reached"); } final Integer prev = getFacts().putIfAbsent(key, rowIndex); diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java index fb0b9abdb2c..d4a864c91f1 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java @@ -28,6 +28,7 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory; import org.junit.Assert; import org.junit.Test; +import java.util.Map; import java.util.Random; import 
java.util.concurrent.atomic.AtomicInteger; @@ -79,8 +80,8 @@ public class OnheapIncrementalIndexTest public void run() { while (!Thread.interrupted()) { - for (int row : index.getFacts().values()) { - if (index.getMetricLongValue(row, 0) != 1) { + for (Map.Entry row : index.getFacts().entrySet()) { + if (index.getMetricLongValue(row.getValue(), 0) != 1) { checkFailedCount.addAndGet(1); } } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index 667ebe355b5..e7294b1bcee 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -41,14 +41,17 @@ public class ArbitraryGranularitySpec implements GranularitySpec { private final TreeSet intervals; private final QueryGranularity queryGranularity; + private final Boolean rollup; @JsonCreator public ArbitraryGranularitySpec( @JsonProperty("queryGranularity") QueryGranularity queryGranularity, + @JsonProperty("rollup") Boolean rollup, @JsonProperty("intervals") List inputIntervals ) { this.queryGranularity = queryGranularity; + this.rollup = rollup == null ? Boolean.TRUE : rollup; this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); if (inputIntervals == null) { @@ -80,6 +83,14 @@ public class ArbitraryGranularitySpec implements GranularitySpec } } + public ArbitraryGranularitySpec( + QueryGranularity queryGranularity, + List inputIntervals + ) + { + this(queryGranularity, true, inputIntervals); + } + @Override @JsonProperty("intervals") public Optional> bucketIntervals() @@ -106,6 +117,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec throw new UnsupportedOperationException(); } + @Override + @JsonProperty("rollup") + public boolean isRollup() + { + return rollup; + } + @Override @JsonProperty("queryGranularity") public QueryGranularity getQueryGranularity() @@ -128,6 +146,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec if (!intervals.equals(that.intervals)) { return false; } + if (!rollup.equals(that.rollup)) { + return false; + } return !(queryGranularity != null ? !queryGranularity.equals(that.queryGranularity) : that.queryGranularity != null); @@ -138,6 +159,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec public int hashCode() { int result = intervals.hashCode(); + result = 31 * result + rollup.hashCode(); result = 31 * result + (queryGranularity != null ? 
queryGranularity.hashCode() : 0); return result; } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java index 703ccf0984e..e7e0c09eedb 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java @@ -57,6 +57,8 @@ public interface GranularitySpec public Granularity getSegmentGranularity(); + public boolean isRollup(); + public QueryGranularity getQueryGranularity(); } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java index 9af0b2f7568..2b5366a104f 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.Granularity; -import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularities; +import io.druid.granularity.QueryGranularity; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -41,6 +41,7 @@ public class UniformGranularitySpec implements GranularitySpec private final Granularity segmentGranularity; private final QueryGranularity queryGranularity; + private final Boolean rollup; private final List inputIntervals; private final ArbitraryGranularitySpec wrappedSpec; @@ -48,12 +49,14 @@ public class UniformGranularitySpec implements GranularitySpec public UniformGranularitySpec( @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("queryGranularity") QueryGranularity queryGranularity, + @JsonProperty("rollup") Boolean rollup, @JsonProperty("intervals") List inputIntervals ) { this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity; this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity; + this.rollup = rollup == null ? 
Boolean.TRUE : rollup; if (inputIntervals != null) { List granularIntervals = Lists.newArrayList(); @@ -61,13 +64,22 @@ public class UniformGranularitySpec implements GranularitySpec Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval)); } this.inputIntervals = ImmutableList.copyOf(inputIntervals); - this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, granularIntervals); + this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals); } else { this.inputIntervals = null; this.wrappedSpec = null; } } + public UniformGranularitySpec( + Granularity segmentGranularity, + QueryGranularity queryGranularity, + List inputIntervals + ) + { + this(segmentGranularity, queryGranularity, true, inputIntervals); + } + @Override public Optional> bucketIntervals() { @@ -91,6 +103,13 @@ public class UniformGranularitySpec implements GranularitySpec return segmentGranularity; } + @Override + @JsonProperty("rollup") + public boolean isRollup() + { + return rollup; + } + @Override @JsonProperty("queryGranularity") public QueryGranularity getQueryGranularity() @@ -122,6 +141,9 @@ public class UniformGranularitySpec implements GranularitySpec if (!queryGranularity.equals(that.queryGranularity)) { return false; } + if (!rollup.equals(that.rollup)) { + return false; + } if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) { return false; } @@ -134,6 +156,7 @@ public class UniformGranularitySpec implements GranularitySpec { int result = segmentGranularity.hashCode(); result = 31 * result + queryGranularity.hashCode(); + result = 31 * result + rollup.hashCode(); result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0); result = 31 * result + (wrappedSpec != null ? 
wrappedSpec.hashCode() : 0); return result; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 3ab523e266c..f2588a9a2e0 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -560,6 +560,7 @@ public class AppenderatorImpl implements Appenderator final File mergedFile; mergedFile = indexMerger.mergeQueryableIndex( indexes, + schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget, tuningConfig.getIndexSpec() diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 81b13145b2d..bb800f1e294 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -412,6 +412,7 @@ public class RealtimePlumber implements Plumber final File mergedFile = indexMerger.mergeQueryableIndex( indexes, + schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget, config.getIndexSpec() diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index ea0141569d9..e34ea81fa6c 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -248,6 +248,7 @@ public class Sink implements Iterable .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) + .withRollup(schema.getGranularitySpec().isRollup()) .build(); final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory); diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index 8b6ca8ba8ba..cdb02c71bcc 100644 --- a/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/io/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -49,6 +49,8 @@ public class ArbitraryGranularityTest new Interval("2012-01-01T00Z/2012-01-03T00Z") )); + Assert.assertTrue(spec.isRollup()); + Assert.assertEquals( Lists.newArrayList( new Interval("2012-01-01T00Z/2012-01-03T00Z"), @@ -122,6 +124,21 @@ public class ArbitraryGranularityTest Assert.assertTrue("Exception thrown", thrown); } + @Test + public void testRollupSetting() + { + List intervals = Lists.newArrayList( + new Interval("2012-01-08T00Z/2012-01-11T00Z"), + new Interval("2012-02-01T00Z/2012-03-01T00Z"), + new Interval("2012-01-07T00Z/2012-01-08T00Z"), + new Interval("2012-01-03T00Z/2012-01-04T00Z"), + new Interval("2012-01-01T00Z/2012-01-03T00Z") + ); + final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals); + + Assert.assertFalse(spec.isRollup()); + } + @Test public void testOverlapViolationSameStartInstant() { diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java index 4d7dd46a35d..c0f616c06e9 100644 --- 
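The granularity-spec changes above are the user-facing switch: both UniformGranularitySpec and ArbitraryGranularitySpec now accept a rollup flag (defaulting to true) and expose it through GranularitySpec.isRollup(), which Sink, RealtimePlumber, and AppenderatorImpl consult when building and merging indexes. A minimal sketch of constructing a spec with rollup disabled, mirroring the constructor exercised in the tests below (the interval is illustrative):

    import com.metamx.common.Granularity;
    import io.druid.granularity.QueryGranularities;
    import io.druid.segment.indexing.granularity.GranularitySpec;
    import io.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.joda.time.Interval;

    import java.util.Arrays;
    import java.util.List;

    class NoRollupGranularitySpec
    {
      // Sketch: daily segments, no query-granularity truncation, and no rollup at index time.
      static GranularitySpec dailyNoRollup()
      {
        List<Interval> intervals = Arrays.asList(new Interval("2012-01-01T00Z/2012-02-01T00Z"));
        return new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals);
      }
    }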
a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java +++ b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java @@ -31,6 +31,8 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.List; + public class UniformGranularityTest { private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); @@ -49,6 +51,8 @@ public class UniformGranularityTest ) ); + Assert.assertTrue(spec.isRollup()); + Assert.assertEquals( Lists.newArrayList( new Interval("2012-01-01T00Z/P1D"), @@ -93,6 +97,20 @@ public class UniformGranularityTest ); } + @Test + public void testRollupSetting() + { + List intervals = Lists.newArrayList( + new Interval("2012-01-08T00Z/2012-01-11T00Z"), + new Interval("2012-01-07T00Z/2012-01-08T00Z"), + new Interval("2012-01-03T00Z/2012-01-04T00Z"), + new Interval("2012-01-01T00Z/2012-01-03T00Z") + ); + final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals); + + Assert.assertFalse(spec.isRollup()); + } + @Test public void testJson() {