ability to not rollup at index time, make pre aggregation an option (#3020)

* ability to not rollup at index time, make pre aggregation an option

* rename getRowIndexForRollup to getPriorIndex

* fix doc misspelling

* test query using no-rollup indexes

* fix benchmark fail due to jmh bug
kaijianding 2016-08-03 02:13:05 +08:00 committed by Fangjin Yang
parent 0bdaaa224b
commit 50d52a24fc
50 changed files with 1261 additions and 168 deletions

View File

@ -82,6 +82,9 @@ public class IncrementalIndexReadBenchmark
@Param({"basic"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
@ -125,6 +128,7 @@ public class IncrementalIndexReadBenchmark
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,

View File

@ -63,6 +63,9 @@ public class IndexIngestionBenchmark
@Param({"basic"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
private static final Logger log = new Logger(IndexIngestionBenchmark.class);
private static final int RNG_SEED = 9999;
@ -107,11 +110,12 @@ public class IndexIngestionBenchmark
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
true,
rowsPerSegment
rowsPerSegment * 2
);
}

View File

@ -75,6 +75,9 @@ public class IndexMergeBenchmark
@Param({"basic"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMerger INDEX_MERGER;
@ -155,6 +158,7 @@ public class IndexMergeBenchmark
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
@ -174,7 +178,7 @@ public class IndexMergeBenchmark
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();
File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
blackhole.consume(mergedFile);
@ -192,7 +196,7 @@ public class IndexMergeBenchmark
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
blackhole.consume(mergedFile);

View File

@ -72,6 +72,9 @@ public class IndexPersistBenchmark
@Param({"basic"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;
@ -156,6 +159,7 @@ public class IndexPersistBenchmark
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,

View File

@ -186,6 +186,7 @@ This spec is used to generate segments with uniform intervals.
| type | string | The type of granularity spec. | no (default == 'uniform') |
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | Whether to roll up (pre-aggregate) data at ingestion time. | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
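For illustration, a uniform granularity spec that disables rollup might look like the following sketch (the interval value is just an example):

```json
{
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "NONE",
  "rollup": false,
  "intervals": ["2016-08-01/2016-08-02"]
}
```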
### Arbitrary Granularity Spec
@ -196,6 +197,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | Whether to roll up (pre-aggregate) data at ingestion time. | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
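An arbitrary granularity spec with rollup disabled might look like this sketch (the "arbitrary" type value and the interval are illustrative):

```json
{
  "type": "arbitrary",
  "queryGranularity": "NONE",
  "rollup": false,
  "intervals": ["2016-08-01/2016-09-01"]
}
```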
# IO Config

View File

@ -159,7 +159,10 @@ Append tasks append a list of segments together into a single segment (one after
### Merge Task
Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is:
Merge tasks merge a list of segments together. Any common timestamps are merged.
If rollup was disabled during ingestion, common timestamps are not merged and rows are simply reordered by their timestamp.
The grammar is:
```json
{
@ -167,6 +170,7 @@ Merge tasks merge a list of segments together. Any common timestamps are merged.
"id": <task_id>,
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
}
```
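For example, a merge task that keeps the ingested rows un-rolled-up could be specified roughly as below; the task id, data source, and aggregator are illustrative, and the segments list (DataSegment objects) is left empty here for brevity:

```json
{
  "type": "merge",
  "id": "merge_example_no_rollup",
  "dataSource": "wikipedia",
  "aggregations": [
    { "type": "longSum", "name": "count", "fieldName": "count" }
  ],
  "rollup": false,
  "segments": []
}
```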

View File

@ -11,6 +11,7 @@ Segment metadata queries return per-segment information about:
* Interval the segment covers
* Column type of all the columns in the segment
* Estimated total segment byte size, as if it were stored in a flat format
* Whether the segment is rolled up
* Segment id
```json
@ -143,6 +144,11 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).
* The form of the result is a map of column name to aggregator.
#### rollup
* `rollup` in the result is true, false, or null.
* When merging is enabled, the merged result is null if some segments are rolled up and others are not.
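For example, a segment metadata query that asks only for the rollup analysis and merges per-segment results might look roughly like this (the data source, interval, and the lowercase "rollup" analysis type name are assumptions based on the query builder usage in the tests below):

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "testing",
  "intervals": ["2013-01-01/2014-01-01"],
  "analysisTypes": ["rollup"],
  "merge": true
}
```

A null `rollup` in the merged result then means the segments disagree (or predate the feature).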
### lenientAggregatorMerge
Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if

View File

@ -142,6 +142,7 @@ public class DetermineHashedPartitionsJob implements Jobby
new UniformGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
intervals
)
);

View File

@ -226,6 +226,7 @@ public class IndexGeneratorJob implements Jobby
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();
OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
@ -514,13 +515,14 @@ public class IndexGeneratorJob implements Jobby
ProgressIndicator progressIndicator
) throws IOException
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
}
}

View File

@ -112,6 +112,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
new UniformGranularitySpec(
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
Lists.newArrayList(bucketsToRun)
)
);

View File

@ -188,7 +188,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
}
// Map merged segment so we can extract dimensions

View File

@ -44,6 +44,7 @@ public class MergeTask extends MergeTaskBase
{
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;
@JsonCreator
@ -52,12 +53,14 @@ public class MergeTask extends MergeTaskBase
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments, context);
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}
@ -82,6 +85,7 @@ public class MergeTask extends MergeTaskBase
}
}
),
rollup,
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
outDir,
indexSpec

View File

@ -176,6 +176,7 @@ public class TaskSerdeTest
"foo",
segments,
aggregators,
true,
indexSpec,
null
);

View File

@ -355,6 +355,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
mergedId = "merged";
}
final Boolean rollup;
if (arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup())) {
rollup = arg1.isRollup();
} else {
rollup = null;
}
return new SegmentAnalysis(
mergedId,
newIntervals,
@ -363,7 +371,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
);
}
@ -378,7 +387,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
analysis.getNumRows(),
analysis.getAggregators(),
analysis.getTimestampSpec(),
analysis.getQueryGranularity()
analysis.getQueryGranularity(),
analysis.isRollup()
);
}
}

View File

@ -148,6 +148,19 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
queryGranularity = null;
}
Boolean rollup = null;
if (query.hasRollup()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
rollup = metadata != null ? metadata.isRollup() : null;
if (rollup == null) {
// in this case, the segment was built before the no-rollup feature existed,
// so it was built with rollup enabled
rollup = Boolean.TRUE;
}
}
return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
@ -158,7 +171,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
numRows,
aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
)
)
);

View File

@ -40,6 +40,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
private final Map<String, AggregatorFactory> aggregators;
private final TimestampSpec timestampSpec;
private final QueryGranularity queryGranularity;
private final Boolean rollup;
@JsonCreator
public SegmentAnalysis(
@ -50,7 +51,8 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
@JsonProperty("numRows") long numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.id = id;
@ -61,6 +63,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
this.aggregators = aggregators;
this.timestampSpec = timestampSpec;
this.queryGranularity = queryGranularity;
this.rollup = rollup;
}
@JsonProperty
@ -105,6 +108,12 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
return queryGranularity;
}
@JsonProperty
public Boolean isRollup()
{
return rollup;
}
@JsonProperty
public Map<String, AggregatorFactory> getAggregators()
{
@ -123,6 +132,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
", aggregators=" + aggregators +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}
@ -141,6 +151,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
SegmentAnalysis that = (SegmentAnalysis) o;
return size == that.size &&
numRows == that.numRows &&
rollup == that.rollup &&
Objects.equals(id, that.id) &&
Objects.equals(interval, that.interval) &&
Objects.equals(columns, that.columns) &&
@ -156,7 +167,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
@Override
public int hashCode()
{
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity, rollup);
}
@Override

View File

@ -58,7 +58,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
AGGREGATORS,
MINMAX,
TIMESTAMPSPEC,
QUERYGRANULARITY;
QUERYGRANULARITY,
ROLLUP;
@JsonValue
@Override
@ -199,6 +200,11 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
}
public boolean hasRollup()
{
return analysisTypes.contains(AnalysisType.ROLLUP);
}
public boolean hasMinMax()
{
return analysisTypes.contains(AnalysisType.MINMAX);

View File

@ -38,6 +38,7 @@ import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
@ -200,6 +201,11 @@ public class IndexMerger
indexSpec.getBitmapSerdeFactory().getBitmapFactory()
)
),
// if the index is not rolled up, it should not be rolled up here either;
// if the index is already rolled up, there is no need to roll it up again.
// Either way, the rollup flag does not cause re-ordering in the merge stage
// when merging a single iterable
false,
index.getMetricAggs(),
outDir,
indexSpec,
@ -209,16 +215,18 @@ public class IndexMerger
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
@ -243,6 +251,7 @@ public class IndexMerger
);
return merge(
indexAdapteres,
rollup,
metricAggs,
outDir,
indexSpec,
@ -252,12 +261,13 @@ public class IndexMerger
public File merge(
List<IndexableAdapter> indexes,
boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
return merge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
private static List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
@ -328,6 +338,7 @@ public class IndexMerger
public File merge(
List<IndexableAdapter> indexes,
final boolean rollup,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
@ -409,14 +420,28 @@ public class IndexMerger
@Nullable ArrayList<Iterable<Rowboat>> boats
)
{
return CombiningIterable.create(
new MergeIterable<Rowboat>(
Ordering.<Rowboat>natural().nullsFirst(),
boats
),
Ordering.<Rowboat>natural().nullsFirst(),
new RowboatMergeFunction(sortedMetricAggs)
);
if (rollup) {
return CombiningIterable.create(
new MergeIterable<Rowboat>(
Ordering.<Rowboat>natural().nullsFirst(),
boats
),
Ordering.<Rowboat>natural().nullsFirst(),
new RowboatMergeFunction(sortedMetricAggs)
);
} else {
return new MergeIterable<Rowboat>(
new Ordering<Rowboat>()
{
@Override
public int compare(Rowboat left, Rowboat right)
{
return Longs.compare(left.getTimestamp(), right.getTimestamp());
}
}.nullsFirst(),
boats
);
}
}
};

View File

@ -49,6 +49,9 @@ public class Metadata
@JsonProperty
private QueryGranularity queryGranularity;
@JsonProperty
private Boolean rollup;
public Metadata()
{
container = new ConcurrentHashMap<>();
@ -87,6 +90,17 @@ public class Metadata
return this;
}
public Boolean isRollup()
{
return rollup;
}
public Metadata setRollup(Boolean rollup)
{
this.rollup = rollup;
return this;
}
public Metadata putAll(Map<String, Object> other)
{
if (other != null) {
@ -128,6 +142,7 @@ public class Metadata
List<TimestampSpec> timestampSpecsToMerge = new ArrayList<>();
List<QueryGranularity> gransToMerge = new ArrayList<>();
List<Boolean> rollupToMerge = new ArrayList<>();
for (Metadata metadata : toBeMerged) {
if (metadata != null) {
@ -143,6 +158,10 @@ public class Metadata
if (gransToMerge != null) {
gransToMerge.add(metadata.getQueryGranularity());
}
if (rollupToMerge != null) {
rollupToMerge.add(metadata.isRollup());
}
mergedContainer.putAll(metadata.container);
} else {
//if metadata and hence aggregators and queryGranularity for some segment being merged are unknown then
@ -150,6 +169,7 @@ public class Metadata
aggregatorsToMerge = null;
timestampSpecsToMerge = null;
gransToMerge = null;
rollupToMerge = null;
}
}
@ -172,6 +192,23 @@ public class Metadata
result.setQueryGranularity(QueryGranularity.mergeQueryGranularities(gransToMerge));
}
Boolean rollup = null;
if (rollupToMerge != null && !rollupToMerge.isEmpty()) {
rollup = rollupToMerge.get(0);
for (Boolean r : rollupToMerge) {
if (r == null) {
rollup = null;
break;
} else if (!r.equals(rollup)) {
rollup = null;
break;
} else {
rollup = r;
}
}
}
result.setRollup(rollup);
result.container.putAll(mergedContainer);
return result;
@ -199,6 +236,9 @@ public class Metadata
if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) {
return false;
}
if (rollup != null ? !rollup.equals(metadata.rollup) : metadata.rollup != null) {
return false;
}
return queryGranularity != null
? queryGranularity.equals(metadata.queryGranularity)
: metadata.queryGranularity == null;
@ -212,6 +252,7 @@ public class Metadata
result = 31 * result + Arrays.hashCode(aggregators);
result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0);
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
result = 31 * result + (rollup != null ? rollup.hashCode() : 0);
return result;
}
@ -223,6 +264,7 @@ public class Metadata
", aggregators=" + Arrays.toString(aggregators) +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -70,11 +71,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@ -353,12 +358,12 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final long minTimestamp;
private final QueryGranularity gran;
private final boolean rollup;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final boolean sortFacts;
private final Metadata metadata;
private final Map<String, MetricDesc> metricDescs;
@ -396,22 +401,22 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions,
final boolean sortFacts
final boolean reportParseExceptions
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.rollup = incrementalIndexSchema.isRollup();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;
this.sortFacts = sortFacts;
this.metadata = new Metadata()
.setAggregators(getCombiningAggregators(metrics))
.setTimestampSpec(incrementalIndexSchema.getTimestampSpec())
.setQueryGranularity(this.gran);
.setQueryGranularity(this.gran)
.setRollup(this.rollup);
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
this.columnCapabilities = Maps.newHashMap();
@ -452,7 +457,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
}
private DimDim newDimDim(String dimension, ValueType type) {
private DimDim newDimDim(String dimension, ValueType type)
{
DimDim newDimDim;
switch (type) {
case LONG:
@ -473,7 +479,12 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);
public abstract ConcurrentMap<TimeAndDims, Integer> getFacts();
public boolean isRollup()
{
return rollup;
}
public abstract FactsHolder getFacts();
public abstract boolean canAppendRow();
@ -579,7 +590,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row) throws IndexSizeExceededException {
public int add(InputRow row) throws IndexSizeExceededException
{
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(
metrics,
@ -694,20 +706,12 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private long getMinTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
return getFacts().getMinTimeMillis();
}
private long getMaxTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
return getFacts().getMaxTimeMillis();
}
private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValues)
@ -858,15 +862,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return columnCapabilities.get(column);
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).subMap(start, end);
} else {
throw new UnsupportedOperationException("can't get subMap from unsorted facts data.");
}
}
public Metadata getMetadata()
{
return metadata;
@ -896,15 +891,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
{
final List<DimensionDesc> dimensions = getDimensions();
Map<TimeAndDims, Integer> facts = null;
if (descending && sortFacts) {
facts = ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap();
} else {
facts = getFacts();
}
return Iterators.transform(
facts.entrySet().iterator(),
getFacts().iterator(descending),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
@ -1320,4 +1308,313 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return retVal;
}
}
public static class FactsEntry implements Map.Entry<TimeAndDims, Integer>
{
TimeAndDims key = null;
Integer value = null;
public FactsEntry(TimeAndDims key, Integer value)
{
this.key = key;
this.value = value;
}
public TimeAndDims getKey()
{
return key;
}
public Integer getValue()
{
return value;
}
@Override
public Integer setValue(Integer value)
{
return value;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FactsEntry that = (FactsEntry) o;
if (key != null ? !key.equals(that.key) : that.key != null) {
return false;
}
return value != null ? value.equals(that.value) : that.value == null;
}
@Override
public int hashCode()
{
int result = key != null ? key.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}
interface FactsHolder
{
/**
* @return the row index associated with the specified key, or
* {@code null} if there is no mapping for the key.
*/
Integer getPriorIndex(TimeAndDims key);
long getMinTimeMillis();
long getMaxTimeMillis();
Iterable<Map.Entry<TimeAndDims, Integer>> entrySet();
Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending);
Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd);
Iterable<TimeAndDims> keySet();
/**
* @return the previous value associated with the specified key, or
* {@code null} if there was no mapping for the key.
*/
Integer putIfAbsent(TimeAndDims key, Integer rowIndex);
void clear();
}
static class RollupFactsHolder implements FactsHolder
{
private final boolean sortFacts;
private final ConcurrentMap<TimeAndDims, Integer> facts;
public RollupFactsHolder(boolean sortFacts, Comparator<TimeAndDims> timeAndDimsComparator)
{
this.sortFacts = sortFacts;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(timeAndDimsComparator);
} else {
this.facts = new ConcurrentHashMap<>();
}
}
@Override
public Integer getPriorIndex(TimeAndDims key)
{
return facts.get(key);
}
@Override
public long getMinTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}
@Override
public long getMaxTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}
public Iterable<Map.Entry<TimeAndDims, Integer>> entrySet()
{
return facts.entrySet();
}
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending)
{
if (descending && sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).descendingMap().entrySet().iterator();
}
return entrySet().iterator();
}
@Override
public Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{
if (!sortFacts) {
throw new UnsupportedOperationException("can't get timeRange from unsorted facts data.");
}
TimeAndDims start = new TimeAndDims(timeStart, new int[][]{});
TimeAndDims end = new TimeAndDims(timeEnd, new int[][]{});
ConcurrentNavigableMap<TimeAndDims, Integer> subMap =
((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).subMap(start, end);
final Map<TimeAndDims, Integer> rangeMap = descending ? subMap.descendingMap() : subMap;
return rangeMap.entrySet();
}
@Override
public Iterable<TimeAndDims> keySet()
{
return facts.keySet();
}
@Override
public Integer putIfAbsent(TimeAndDims key, Integer rowIndex)
{
return facts.putIfAbsent(key, rowIndex);
}
@Override
public void clear()
{
facts.clear();
}
}
static class PlainFactsHolder implements FactsHolder
{
private final boolean sortFacts;
private final ConcurrentMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>> facts;
public PlainFactsHolder(boolean sortFacts)
{
this.sortFacts = sortFacts;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(new Comparator<Long>()
{
@Override
public int compare(Long lhs, Long rhs)
{
return Longs.compare(lhs, rhs);
}
});
} else {
this.facts = new ConcurrentHashMap<>();
}
}
@Override
public Integer getPriorIndex(TimeAndDims key)
{
// always return null to indicate there is no prior key, because we always add a new row
return null;
}
@Override
public long getMinTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).firstKey();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}
@Override
public long getMaxTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).lastKey();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}
public Iterable<Map.Entry<TimeAndDims, Integer>> entrySet()
{
return concat(facts.values(), false);
}
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending)
{
if (descending && sortFacts) {
return concat(((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts)
.descendingMap().values(), true).iterator();
}
return concat(facts.values(), false).iterator();
}
@Override
public Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{
ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>> subMap =
((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).subMap(timeStart, timeEnd);
final Map<Long, Deque<Map.Entry<TimeAndDims, Integer>>> rangeMap = descending ? subMap.descendingMap() : subMap;
return concat(rangeMap.values(), descending);
}
private Iterable<Map.Entry<TimeAndDims, Integer>> concat(
final Iterable<Deque<Map.Entry<TimeAndDims, Integer>>> iterable,
final boolean descending
)
{
return new Iterable<Map.Entry<TimeAndDims, Integer>>()
{
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator()
{
return Iterators.concat(
Iterators.transform(
iterable.iterator(),
new Function<Deque<Map.Entry<TimeAndDims, Integer>>, Iterator<Map.Entry<TimeAndDims, Integer>>>()
{
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> apply(Deque<Map.Entry<TimeAndDims, Integer>> input)
{
return descending ? input.descendingIterator() : input.iterator();
}
}
)
);
}
};
}
@Override
public Iterable<TimeAndDims> keySet()
{
return Iterables.transform(
entrySet(),
new Function<Map.Entry<TimeAndDims, Integer>, TimeAndDims>()
{
@Override
public TimeAndDims apply(Map.Entry<TimeAndDims, Integer> input)
{
return input.getKey();
}
}
);
}
@Override
public Integer putIfAbsent(TimeAndDims key, Integer rowIndex)
{
Long time = key.getTimestamp();
Deque<Map.Entry<TimeAndDims, Integer>> rows = facts.get(time);
if (rows == null) {
facts.putIfAbsent(time, new ConcurrentLinkedDeque<Map.Entry<TimeAndDims, Integer>>());
// under a race condition another thread may have put the deque already, so always re-read the latest value from facts
rows = facts.get(time);
}
rows.add(new FactsEntry(key, rowIndex));
// always return null to indicate that a new row was added
return null;
}
@Override
public void clear()
{
facts.clear();
}
}
}

View File

@ -30,18 +30,21 @@ import io.druid.query.aggregation.AggregatorFactory;
*/
public class IncrementalIndexSchema
{
public static final boolean DEFAULT_ROLLUP = true;
private final long minTimestamp;
private final TimestampSpec timestampSpec;
private final QueryGranularity gran;
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] metrics;
private final boolean rollup;
public IncrementalIndexSchema(
long minTimestamp,
TimestampSpec timestampSpec,
QueryGranularity gran,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] metrics
AggregatorFactory[] metrics,
boolean rollup
)
{
this.minTimestamp = minTimestamp;
@ -49,6 +52,7 @@ public class IncrementalIndexSchema
this.gran = gran;
this.dimensionsSpec = dimensionsSpec;
this.metrics = metrics;
this.rollup = rollup;
}
public long getMinTimestamp()
@ -76,6 +80,11 @@ public class IncrementalIndexSchema
return metrics;
}
public boolean isRollup()
{
return rollup;
}
public static class Builder
{
private long minTimestamp;
@ -83,6 +92,7 @@ public class IncrementalIndexSchema
private QueryGranularity gran;
private DimensionsSpec dimensionsSpec;
private AggregatorFactory[] metrics;
private boolean rollup;
public Builder()
{
@ -90,6 +100,7 @@ public class IncrementalIndexSchema
this.gran = QueryGranularities.NONE;
this.dimensionsSpec = new DimensionsSpec(null, null, null);
this.metrics = new AggregatorFactory[]{};
this.rollup = true;
}
public Builder withMinTimestamp(long minTimestamp)
@ -147,10 +158,16 @@ public class IncrementalIndexSchema
return this;
}
public Builder withRollup(boolean rollup)
{
this.rollup = rollup;
return this;
}
public IncrementalIndexSchema build()
{
return new IncrementalIndexSchema(
minTimestamp, timestampSpec, gran, dimensionsSpec, metrics
minTimestamp, timestampSpec, gran, dimensionsSpec, metrics, rollup
);
}
}

View File

@ -69,7 +69,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
/**
*/
@ -228,22 +227,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this, currEntry);
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Integer> cursorMap;
private Iterable<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> cursorIterable;
private boolean emptyRange;
final DateTime time;
int numAdvanced = -1;
boolean done;
{
cursorMap = index.getSubMap(
new IncrementalIndex.TimeAndDims(timeStart, new int[][]{}),
new IncrementalIndex.TimeAndDims(
Math.min(actualInterval.getEndMillis(), gran.next(input)),
new int[][]{}
)
cursorIterable = index.getFacts().timeRangeIterable(
descending,
timeStart,
Math.min(actualInterval.getEndMillis(), gran.next(input))
);
if (descending) {
cursorMap = cursorMap.descendingMap();
}
emptyRange = !cursorIterable.iterator().hasNext();
time = gran.toDateTime(input);
reset();
@ -299,7 +295,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public void reset()
{
baseIter = cursorMap.entrySet().iterator();
baseIter = cursorIterable.iterator();
if (numAdvanced == -1) {
numAdvanced = 0;
@ -322,7 +318,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
numAdvanced++;
}
done = !foundMatched && (cursorMap.size() == 0 || !baseIter.hasNext());
done = !foundMatched && (emptyRange || !baseIter.hasNext());
}
@Override

View File

@ -20,7 +20,6 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
@ -34,14 +33,10 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -55,7 +50,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
private final List<int[]> indexAndOffsets = new ArrayList<>();
private final ConcurrentMap<TimeAndDims, Integer> facts;
private final FactsHolder facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
@ -80,15 +75,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator())
: new PlainFactsHolder(sortFacts);
//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();
@ -114,6 +106,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP)
.build(),
deserializeComplexMetrics,
reportParseExceptions,
@ -123,6 +116,29 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
);
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
boolean rollup,
final AggregatorFactory[] metrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(rollup)
.build(),
true,
true,
true,
maxRowCount,
bufferPool
);
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
@ -132,20 +148,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
true,
true,
minTimestamp,
gran,
IncrementalIndexSchema.DEFAULT_ROLLUP,
metrics,
maxRowCount,
bufferPool
);
}
@Override
public ConcurrentMap<TimeAndDims, Integer> getFacts()
public FactsHolder getFacts()
{
return facts;
}
@ -207,7 +220,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
int bufferOffset;
synchronized (this) {
final Integer priorIndex = facts.get(key);
final Integer priorIndex = facts.getPriorIndex(key);
if (null != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
bufferIndex = indexAndOffset[0];
@ -254,7 +267,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}

View File

@ -40,7 +40,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -50,7 +49,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private static final Logger log = new Logger(OnheapIncrementalIndex.class);
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final ConcurrentMap<TimeAndDims, Integer> facts;
private final FactsHolder facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
@ -65,14 +64,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
this.maxRowCount = maxRowCount;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator())
: new PlainFactsHolder(sortFacts);
}
public OnheapIncrementalIndex(
@ -89,6 +85,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP)
.build(),
deserializeComplexMetrics,
reportParseExceptions,
@ -97,6 +94,27 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
);
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
boolean rollup,
final AggregatorFactory[] metrics,
int maxRowCount
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.withRollup(rollup)
.build(),
true,
true,
true,
maxRowCount
);
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
@ -105,13 +123,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
true,
true,
minTimestamp,
gran,
IncrementalIndexSchema.DEFAULT_ROLLUP,
metrics,
maxRowCount
);
}
@ -126,7 +141,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
@Override
public ConcurrentMap<TimeAndDims, Integer> getFacts()
public FactsHolder getFacts()
{
return facts;
}
@ -165,7 +180,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
final Integer priorIndex = facts.get(key);
final Integer priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
@ -181,7 +196,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
concurrentSet(rowIndex, aggs);
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
final Integer prev = facts.putIfAbsent(key, rowIndex);

View File

@ -323,11 +323,15 @@ public class QueryRunnerTestHelper
throws IOException
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final IncrementalIndex noRollupRtIndex = TestIndex.getNoRollupIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex noRollupMMappedTestIndex = TestIndex.getNoRollupMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
return ImmutableList.of(
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, segmentId)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, noRollupMMappedTestIndex)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
);
}

View File

@ -28,21 +28,16 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
@ -56,16 +51,12 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.select.SelectQueryEngine;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.segment.AbstractSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
@ -75,7 +66,6 @@ import io.druid.segment.Segment;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.junit.rules.TemporaryFolder;
@ -84,7 +74,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -334,7 +323,7 @@ public class AggregationTestHelper
for (File file : toMerge) {
indexes.add(indexIO.loadIndex(file));
}
indexMerger.mergeQueryableIndex(indexes, metrics, outDir, new IndexSpec());
indexMerger.mergeQueryableIndex(indexes, true, metrics, outDir, new IndexSpec());
for (QueryableIndex qi : indexes) {
qi.close();

View File

@ -87,6 +87,7 @@ public class SegmentMetadataQueryQueryToolChestTest
100,
null,
null,
null,
null
);
@ -117,6 +118,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"baz", new DoubleSumAggregatorFactory("baz", "baz")
),
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
@ -130,6 +132,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
@ -162,6 +165,7 @@ public class SegmentMetadataQueryQueryToolChestTest
0,
null,
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
@ -175,6 +179,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
@ -199,6 +204,7 @@ public class SegmentMetadataQueryQueryToolChestTest
0,
null,
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
@ -209,6 +215,7 @@ public class SegmentMetadataQueryQueryToolChestTest
0,
null,
null,
null,
null
);
@ -230,6 +237,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"bar", new DoubleSumAggregatorFactory("bar", "bar")
),
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
@ -244,6 +252,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"baz", new LongMaxAggregatorFactory("baz", "baz")
),
null,
null,
null
);
@ -264,6 +273,72 @@ public class SegmentMetadataQueryQueryToolChestTest
);
}
@Test
public void testMergeRollup()
{
final SegmentAnalysis analysis1 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null,
null,
null,
null
);
final SegmentAnalysis analysis2 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null,
null,
null,
false
);
final SegmentAnalysis analysis3 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null,
null,
null,
false
);
final SegmentAnalysis analysis4 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null,
null,
null,
true
);
final SegmentAnalysis analysis5 = new SegmentAnalysis(
"id",
null,
Maps.<String, ColumnAnalysis>newHashMap(),
0,
0,
null,
null,
null,
true
);
Assert.assertNull(mergeStrict(analysis1, analysis2).isRollup());
Assert.assertNull(mergeStrict(analysis1, analysis4).isRollup());
Assert.assertNull(mergeStrict(analysis2, analysis4).isRollup());
Assert.assertFalse(mergeStrict(analysis2, analysis3).isRollup());
Assert.assertTrue(mergeStrict(analysis4, analysis5).isRollup());
}
private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2)
{
return SegmentMetadataQueryQueryToolChest.finalizeAnalysis(

View File

@ -47,10 +47,12 @@ import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.TestIndex;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.timeline.LogicalSegment;
import org.joda.time.Interval;
import org.junit.Assert;
@ -78,26 +80,30 @@ public class SegmentMetadataQueryTest
@SuppressWarnings("unchecked")
public static QueryRunner makeMMappedQueryRunner(
String segmentId,
boolean rollup,
QueryRunnerFactory factory
)
{
QueryableIndex index = rollup ? TestIndex.getMMappedTestIndex() : TestIndex.getNoRollupMMappedTestIndex();
return QueryRunnerTestHelper.makeQueryRunner(
factory,
segmentId,
new QueryableIndexSegment(segmentId, TestIndex.getMMappedTestIndex())
new QueryableIndexSegment(segmentId, index)
);
}
@SuppressWarnings("unchecked")
public static QueryRunner makeIncrementalIndexQueryRunner(
String segmentId,
boolean rollup,
QueryRunnerFactory factory
)
{
IncrementalIndex index = rollup ? TestIndex.getIncrementalTestIndex() : TestIndex.getNoRollupIncrementalTestIndex();
return QueryRunnerTestHelper.makeQueryRunner(
factory,
segmentId,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId)
new IncrementalIndexSegment(index, segmentId)
);
}
@ -105,35 +111,42 @@ public class SegmentMetadataQueryTest
private final QueryRunner runner2;
private final boolean mmap1;
private final boolean mmap2;
private final boolean rollup1;
private final boolean rollup2;
private final boolean differentIds;
private final SegmentMetadataQuery testQuery;
private final SegmentAnalysis expectedSegmentAnalysis1;
private final SegmentAnalysis expectedSegmentAnalysis2;
@Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}, differentIds = {2}")
@Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}, rollup1 = {2}, rollup2 = {3}, differentIds = {4}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{true, true, false},
new Object[]{true, false, false},
new Object[]{false, true, false},
new Object[]{false, false, false},
new Object[]{false, false, true}
new Object[]{true, true, true, true, false},
new Object[]{true, false, true, false, false},
new Object[]{false, true, true, false, false},
new Object[]{false, false, false, false, false},
new Object[]{false, false, true, true, false},
new Object[]{false, false, false, true, true}
);
}
public SegmentMetadataQueryTest(
boolean mmap1,
boolean mmap2,
boolean rollup1,
boolean rollup2,
boolean differentIds
)
{
final String id1 = differentIds ? "testSegment1" : "testSegment";
final String id2 = differentIds ? "testSegment2" : "testSegment";
this.runner1 = mmap1 ? makeMMappedQueryRunner(id1, FACTORY) : makeIncrementalIndexQueryRunner(id1, FACTORY);
this.runner2 = mmap2 ? makeMMappedQueryRunner(id2, FACTORY) : makeIncrementalIndexQueryRunner(id2, FACTORY);
this.runner1 = mmap1 ? makeMMappedQueryRunner(id1, rollup1, FACTORY) : makeIncrementalIndexQueryRunner(id1, rollup1, FACTORY);
this.runner2 = mmap2 ? makeMMappedQueryRunner(id2, rollup2, FACTORY) : makeIncrementalIndexQueryRunner(id2, rollup2, FACTORY);
this.mmap1 = mmap1;
this.mmap2 = mmap2;
this.rollup1 = rollup1;
this.rollup2 = rollup2;
this.differentIds = differentIds;
testQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
@ -183,6 +196,7 @@ public class SegmentMetadataQueryTest
1209,
null,
null,
null,
null
);
expectedSegmentAnalysis2 = new SegmentAnalysis(
@ -226,6 +240,7 @@ public class SegmentMetadataQueryTest
1209,
null,
null,
null,
null
);
}
@ -242,6 +257,75 @@ public class SegmentMetadataQueryTest
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis1), results);
}
@Test
public void testSegmentMetadataQueryWithRollupMerge()
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
differentIds ? "merged" : "testSegment",
null,
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
0,
0,
null,
null,
null
),
"placementish",
new ColumnAnalysis(
ValueType.STRING.toString(),
true,
0,
0,
null,
null,
null
)
),
0,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
null,
rollup1 != rollup2 ? null : rollup1
);
QueryToolChest toolChest = FACTORY.getToolchest();
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
);
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement", "placementish")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.ROLLUP)
.merge(true)
.build(),
Maps.newHashMap()
),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testSegmentMetadataQueryWithHasMultipleValuesMerge()
{
@ -274,6 +358,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
null,
null
);
@ -342,6 +427,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
null,
null
);
@ -459,6 +545,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
null,
null
);
@ -510,6 +597,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
null,
null
);
@ -572,6 +660,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
expectedAggregators,
null,
null,
null
);
@ -630,6 +719,7 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
new TimestampSpec("ds", "auto", null),
null,
null
);
@ -688,7 +778,8 @@ public class SegmentMetadataQueryTest
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null,
null,
QueryGranularities.NONE
QueryGranularities.NONE,
null
);
QueryToolChest toolChest = FACTORY.getToolchest();

View File

@ -19,25 +19,15 @@
package io.druid.query.metadata;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -47,18 +37,13 @@ import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.TestIndex;
import io.druid.segment.column.ValueType;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class SegmentMetadataUnionQueryTest
@ -127,6 +112,7 @@ public class SegmentMetadataUnionQueryTest
4836,
null,
null,
null,
null
);
SegmentMetadataQuery query = new Druids.SegmentMetadataQueryBuilder()

View File

@ -61,6 +61,7 @@ public class EmptyIndexTest
);
TestHelper.getTestIndexMerger().merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
true,
new AggregatorFactory[0],
tmpDir,
new IndexSpec()

View File

@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
@ -33,7 +32,6 @@ import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@ -161,6 +159,7 @@ public class IndexBuilder
}
}
),
true,
Iterables.toArray(
Iterables.transform(
Arrays.asList(schema.getMetrics()),

View File

@ -273,6 +273,7 @@ public class IndexMergerTest
IncrementalIndexTest.getDefaultCombiningAggregatorFactories()
)
.setQueryGranularity(QueryGranularities.NONE)
.setRollup(Boolean.TRUE)
.putAll(metadataElems),
index.getMetadata()
);
@ -347,6 +348,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2),
true,
mergedAggregators,
mergedDir,
indexSpec
@ -424,6 +426,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2),
true,
new AggregatorFactory[]{},
tmpDir3,
indexSpec
@ -485,6 +488,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
@ -543,6 +547,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
true,
mergedAggregators,
mergedDir,
indexSpec
@ -611,6 +616,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
true,
mergedAggregators,
mergedDir,
newSpec
@ -841,6 +847,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -940,6 +947,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -1020,6 +1028,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -1031,6 +1040,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB2),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -1178,6 +1188,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -1228,6 +1239,261 @@ public class IndexMergerTest
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921"));
}
@Test
public void testNoRollupMergeWithoutDuplicateRow() throws Exception
{
// (d1, d2, d3) come from only one index, and their dim values are ('empty', 'has null', 'no null')

// (d4, d5, d6, d7, d8, d9) are from both indexes
// d4: 'empty' join 'empty'
// d5: 'empty' join 'has null'
// d6: 'empty' join 'no null'
// d7: 'has null' join 'has null'
// d8: 'has null' join 'no null'
// d9: 'no null' join 'no null'
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(false)
.build();
IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000);
toPersistA.add(
new MapBasedInputRow(
1,
Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910"
)
)
);
toPersistA.add(
new MapBasedInputRow(
2,
Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d2", "210", "d3", "311", "d7", "710", "d8", "810", "d9", "911"
)
)
);
IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000);
toPersistB.add(
new MapBasedInputRow(
3,
Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d5", "520", "d6", "620", "d7", "720", "d8", "820", "d9", "920"
)
)
);
toPersistB.add(
new MapBasedInputRow(
4,
Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d5", "", "d6", "621", "d7", "", "d8", "821", "d9", "921"
)
)
);
final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
indexSpec
)
)
);
QueryableIndex indexB = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(
ImmutableList.of("d2", "d3", "d5", "d6", "d7", "d8", "d9"),
ImmutableList.copyOf(adapter.getDimensionNames())
);
Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims());
checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));
checkBitmapIndex(Lists.newArrayList(2, 3), adapter.getBitmapIndex("d3", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d3", "310"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d3", "311"));
checkBitmapIndex(Lists.newArrayList(0, 1, 3), adapter.getBitmapIndex("d5", ""));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d5", "520"));
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("d6", ""));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d6", "620"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d6", "621"));
checkBitmapIndex(Lists.newArrayList(0, 3), adapter.getBitmapIndex("d7", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d7", "710"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d7", "720"));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d8", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d8", "810"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d8", "820"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d8", "821"));
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d9", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d9", "910"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d9", "911"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d9", "920"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921"));
}
@Test
public void testNoRollupMergeWithDuplicateRow() throws Exception
{
// (d3, d6, d8, d9) are the actual data from index1 and index2
// index1 has two duplicate rows
// index2 has one row that matches the index1 rows, plus one different row
// then we can test
// 1. incrementalIndex with duplicate rows
// 2. incrementalIndex without duplicate rows
// 3. merge 2 indexes with duplicate rows
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(false)
.build();
IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000);
toPersistA.add(
new MapBasedInputRow(
1,
Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910"
)
)
);
toPersistA.add(
new MapBasedInputRow(
1,
Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910"
)
)
);
IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000);
toPersistB.add(
new MapBasedInputRow(
1,
Arrays.asList("d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d1", "", "d2", "", "d3", "310", "d7", "", "d9", "910"
)
)
);
toPersistB.add(
new MapBasedInputRow(
4,
Arrays.asList("d4", "d5", "d6", "d7", "d8", "d9"),
ImmutableMap.<String, Object>of(
"d5", "", "d6", "621", "d7", "", "d8", "821", "d9", "921"
)
)
);
final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
indexSpec
)
)
);
QueryableIndex indexB = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB),
false,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(
ImmutableList.of("d3", "d6", "d8", "d9"),
ImmutableList.copyOf(adapter.getDimensionNames())
);
Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}, {0}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {1}, {1}}, boatList.get(3).getDims());
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d3", ""));
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d3", "310"));
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d6", ""));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d6", "621"));
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d8", ""));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d8", "821"));
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d9", ""));
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("d9", "910"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921"));
}
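Both tests above hinge on the new withRollup(false) switch on IncrementalIndexSchema.Builder: with rollup disabled, identical rows are kept as separate physical rows instead of being collapsed into one aggregated row. A minimal, self-contained sketch of that behaviour (dimension names and sizes are illustrative, imports match those already used in this test class, and the adds run in a context that declares IndexSizeExceededException):

IncrementalIndexSchema noRollupSchema = new IncrementalIndexSchema.Builder()
    .withMinTimestamp(0L)
    .withQueryGranularity(QueryGranularities.NONE)
    .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
    .withRollup(false)
    .build();
IncrementalIndex index = new OnheapIncrementalIndex(noRollupSchema, true, 1000);

// Same timestamp and dimension values twice; with rollup disabled each add()
// is expected to keep its own row, so the index holds 2 rows rather than 1.
index.add(new MapBasedInputRow(1, Arrays.asList("d1"), ImmutableMap.<String, Object>of("d1", "v1")));
index.add(new MapBasedInputRow(1, Arrays.asList("d1"), ImmutableMap.<String, Object>of("d1", "v1")));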
private void checkBitmapIndex(ArrayList<Integer> expected, IndexedInts real)
{
Assert.assertEquals(expected.size(), real.size());
@ -1335,6 +1601,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexBA2),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
@ -1346,6 +1613,7 @@ public class IndexMergerTest
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexC),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged2,
indexSpec
@ -1463,6 +1731,7 @@ public class IndexMergerTest
INDEX_MERGER.merge(
toMerge,
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C"),
@ -1514,6 +1783,7 @@ public class IndexMergerTest
File merged = INDEX_MERGER.merge(
toMerge,
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
@ -1584,6 +1854,7 @@ public class IndexMergerTest
File merged = INDEX_MERGER.merge(
toMerge,
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
@ -1644,6 +1915,7 @@ public class IndexMergerTest
File merged = INDEX_MERGER.merge(
toMerge,
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("B", "B"),
@ -1688,6 +1960,7 @@ public class IndexMergerTest
final File merged = INDEX_MERGER.merge(
toMerge,
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("A", "A"),
@ -1770,6 +2043,7 @@ public class IndexMergerTest
.withQueryGranularity(QueryGranularities.NONE)
.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(true)
.build();
return new OnheapIncrementalIndex(schema, true, 1000);

View File

@ -489,6 +489,7 @@ public class IndexMergerV9WithSpatialIndexTest
INDEX_IO.loadIndex(secondFile),
INDEX_IO.loadIndex(thirdFile)
),
true,
METRIC_AGGS,
mergedFile,
indexSpec

View File

@ -52,6 +52,7 @@ public class MetadataTest
};
metadata.setAggregators(aggregators);
metadata.setQueryGranularity(QueryGranularities.ALL);
metadata.setRollup(Boolean.FALSE);
Metadata other = jsonMapper.readValue(
jsonMapper.writeValueAsString(metadata),
@ -81,12 +82,14 @@ public class MetadataTest
m1.setAggregators(aggs);
m1.setTimestampSpec(new TimestampSpec("ds", "auto", null));
m1.setQueryGranularity(QueryGranularities.ALL);
m1.setRollup(Boolean.FALSE);
Metadata m2 = new Metadata();
m2.put("k", "v");
m2.setAggregators(aggs);
m2.setTimestampSpec(new TimestampSpec("ds", "auto", null));
m2.setQueryGranularity(QueryGranularities.ALL);
m2.setRollup(Boolean.FALSE);
Metadata merged = new Metadata();
merged.put("k", "v");
@ -96,6 +99,7 @@ public class MetadataTest
}
);
merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
merged.setRollup(Boolean.FALSE);
merged.setQueryGranularity(QueryGranularities.ALL);
Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null));
@ -108,6 +112,7 @@ public class MetadataTest
merged.setAggregators(null);
merged.setTimestampSpec(null);
merged.setQueryGranularity(null);
merged.setRollup(null);
Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null));
//merge check with client explicitly providing merged aggregators
@ -123,6 +128,7 @@ public class MetadataTest
merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
merged.setQueryGranularity(QueryGranularities.ALL);
m1.setRollup(Boolean.TRUE);
Assert.assertEquals(
merged,
Metadata.merge(ImmutableList.of(m1, m2), explicitAggs)

View File

@ -197,6 +197,7 @@ public class SchemalessIndex
mergedIndex = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(INDEX_IO.loadIndex(topFile), INDEX_IO.loadIndex(bottomFile)),
true,
METRIC_AGGS,
mergedFile,
indexSpec
@ -242,6 +243,7 @@ public class SchemalessIndex
QueryableIndex index = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)),
true,
METRIC_AGGS,
mergedFile,
indexSpec
@ -280,7 +282,7 @@ public class SchemalessIndex
}
QueryableIndex index = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile, indexSpec)
INDEX_MERGER.mergeQueryableIndex(indexesToMerge, true, METRIC_AGGS, mergedFile, indexSpec)
);
return index;
@ -533,6 +535,7 @@ public class SchemalessIndex
}
)
),
true,
METRIC_AGGS,
mergedFile,
indexSpec

View File

@ -90,7 +90,9 @@ public class TestIndex
}
private static IncrementalIndex realtimeIndex = null;
private static IncrementalIndex noRollupRealtimeIndex = null;
private static QueryableIndex mmappedIndex = null;
private static QueryableIndex noRollupMmappedIndex = null;
private static QueryableIndex mergedRealtime = null;
public static IncrementalIndex getIncrementalTestIndex()
@ -104,6 +106,17 @@ public class TestIndex
return realtimeIndex = makeRealtimeIndex("druid.sample.tsv");
}
public static IncrementalIndex getNoRollupIncrementalTestIndex()
{
synchronized (log) {
if (noRollupRealtimeIndex != null) {
return noRollupRealtimeIndex;
}
}
return noRollupRealtimeIndex = makeRealtimeIndex("druid.sample.tsv", false);
}
public static QueryableIndex getMMappedTestIndex()
{
synchronized (log) {
@ -118,6 +131,20 @@ public class TestIndex
return mmappedIndex;
}
public static QueryableIndex getNoRollupMMappedTestIndex()
{
synchronized (log) {
if (noRollupMmappedIndex != null) {
return noRollupMmappedIndex;
}
}
IncrementalIndex incrementalIndex = getNoRollupIncrementalTestIndex();
noRollupMmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex);
return noRollupMmappedIndex;
}
public static QueryableIndex mergedRealtimeIndex()
{
synchronized (log) {
@ -149,6 +176,7 @@ public class TestIndex
mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(INDEX_IO.loadIndex(topFile), INDEX_IO.loadIndex(bottomFile)),
true,
METRIC_AGGS,
mergedFile,
indexSpec
@ -164,6 +192,11 @@ public class TestIndex
}
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
{
return makeRealtimeIndex(resourceFilename, true);
}
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename, boolean rollup)
{
final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
if (resource == null) {
@ -171,16 +204,22 @@ public class TestIndex
}
log.info("Realtime loading index file[%s]", resource);
CharSource stream = Resources.asByteSource(resource).asCharSource(Charsets.UTF_8);
return makeRealtimeIndex(stream);
return makeRealtimeIndex(stream, rollup);
}
public static IncrementalIndex makeRealtimeIndex(final CharSource source)
{
return makeRealtimeIndex(source, true);
}
public static IncrementalIndex makeRealtimeIndex(final CharSource source, boolean rollup)
{
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(METRIC_AGGS)
.withRollup(rollup)
.build();
final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000);
@ -188,7 +227,11 @@ public class TestIndex
return loadIncrementalIndex(retVal, source);
}
catch (Exception e) {
realtimeIndex = null;
if (rollup) {
realtimeIndex = null;
} else {
noRollupRealtimeIndex = null;
}
throw Throwables.propagate(e);
}
}
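For other test classes, the new TestIndex helpers make it easy to pick up a no-rollup variant of the standard sample data. A usage sketch built from the methods added above:

// Incremental (on-heap) no-rollup index built from druid.sample.tsv
IncrementalIndex rtIndex = TestIndex.getNoRollupIncrementalTestIndex();

// The same data persisted and memory-mapped, still without rollup
QueryableIndex mmapped = TestIndex.getNoRollupMMappedTestIndex();

// Or build one directly from an arbitrary resource with rollup disabled
IncrementalIndex custom = TestIndex.makeRealtimeIndex("druid.sample.tsv", false);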

View File

@ -144,6 +144,38 @@ public class IncrementalIndexTest
);
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
return IncrementalIndexTest.createNoRollupIndex(factories);
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
return new OffheapIncrementalIndex(
0L, QueryGranularities.NONE, false, factories, 1000000,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
}
}
@ -171,6 +203,17 @@ public class IncrementalIndexTest
);
}
public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories)
{
if (null == aggregatorFactories) {
aggregatorFactories = defaultAggregatorFactories;
}
return new OnheapIncrementalIndex(
0L, QueryGranularities.NONE, false, aggregatorFactories, 1000000
);
}
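createNoRollupIndex above suggests that the boolean third argument of this OnheapIncrementalIndex constructor is the rollup flag, since it is the only thing distinguishing it from the rollup-enabled factory earlier in the file. A hedged sketch of the two variants, assuming that reading is correct (factories is a hypothetical AggregatorFactory[]):

// Assuming the third constructor argument is the rollup switch:
IncrementalIndex rollupIndex = new OnheapIncrementalIndex(0L, QueryGranularities.NONE, true, factories, 1000000);
IncrementalIndex noRollupIndex = new OnheapIncrementalIndex(0L, QueryGranularities.NONE, false, factories, 1000000);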
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
{
index.add(
@ -330,7 +373,8 @@ public class IncrementalIndexTest
new LinkedList<Result<TimeseriesResultValue>>()
);
Result<TimeseriesResultValue> result = Iterables.getOnlyElement(results);
Assert.assertEquals(rows, result.getValue().getLongMetric("rows").intValue());
boolean isRollup = index.isRollup();
Assert.assertEquals(rows * (isRollup ? 1 : 2), result.getValue().getLongMetric("rows").intValue());
for (int i = 0; i < dimensionCount; ++i) {
Assert.assertEquals(
String.format("Failed long sum on dimension %d", i),
@ -545,8 +589,12 @@ public class IncrementalIndexTest
runner.run(query, context),
new LinkedList<Result<TimeseriesResultValue>>()
);
boolean isRollup = index.isRollup();
for (Result<TimeseriesResultValue> result : results) {
Assert.assertEquals(elementsPerThread, result.getValue().getLongMetric("rows").intValue());
Assert.assertEquals(
elementsPerThread * (isRollup ? 1 : concurrentThreads),
result.getValue().getLongMetric("rows").intValue()
);
for (int i = 0; i < dimensionCount; ++i) {
Assert.assertEquals(
String.format("Failed long sum on dimension %d", i),
@ -594,17 +642,18 @@ public class IncrementalIndexTest
}
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
boolean isRollup = index.isRollup();
Assert.assertEquals(dimensionCount, index.getDimensionNames().size());
Assert.assertEquals(elementsPerThread, index.size());
Assert.assertEquals(elementsPerThread * (isRollup ? 1 : threadCount), index.size());
Iterator<Row> iterator = index.iterator();
int curr = 0;
while (iterator.hasNext()) {
Row row = iterator.next();
Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
Assert.assertEquals(Float.valueOf(threadCount), (Float) row.getFloatMetric("count"));
Assert.assertEquals(timestamp + (isRollup ? curr : curr / threadCount), row.getTimestampFromEpoch());
Assert.assertEquals(Float.valueOf(isRollup ? threadCount : 1), (Float) row.getFloatMetric("count"));
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
Assert.assertEquals(elementsPerThread * (isRollup ? 1 : threadCount), curr);
}
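The adjusted assertions follow a simple counting argument: with rollup enabled, identical rows from different writer threads collapse into one aggregated row, so the index size stays at elementsPerThread and each row's count metric equals the number of contributing threads; with rollup disabled every add() keeps its own row. A worked sketch of the expectation, using the test's own variables:

// threadCount writers each add the same elementsPerThread rows
int expectedRows = isRollup ? elementsPerThread : elementsPerThread * threadCount;
float expectedCount = isRollup ? (float) threadCount : 1f;  // per-row "count" metric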
@Test

View File

@ -439,6 +439,7 @@ public class SpatialFilterBonusTest
INDEX_IO.loadIndex(secondFile),
INDEX_IO.loadIndex(thirdFile)
),
true,
METRIC_AGGS,
mergedFile,
indexSpec

View File

@ -493,6 +493,7 @@ public class SpatialFilterTest
QueryableIndex mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(INDEX_IO.loadIndex(firstFile), INDEX_IO.loadIndex(secondFile), INDEX_IO.loadIndex(thirdFile)),
true,
METRIC_AGGS,
mergedFile,
indexSpec

View File

@ -92,6 +92,7 @@ public class IncrementalIndexTest
.withQueryGranularity(QueryGranularities.MINUTE)
.withDimensionsSpec(dimensions)
.withMetrics(metrics)
.withRollup(true)
.build();
final List<Object[]> constructors = Lists.newArrayList();

View File

@ -146,7 +146,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
) throws IndexSizeExceededException
{
final Integer priorIdex = getFacts().get(key);
final Integer priorIdex = getFacts().getPriorIndex(key);
Aggregator[] aggs;
@ -169,7 +169,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && !getFacts().containsKey(key)) {
if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == null) {
throw new IndexSizeExceededException("Maximum number of rows reached");
}
final Integer prev = getFacts().putIfAbsent(key, rowIndex);
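The benchmark now mirrors the facts-table API used by the real index: instead of a plain map get()/containsKey() pair, the prior row index is looked up once and a null result means the key has not been seen yet. A hedged sketch of the pattern used above:

// A null prior index means this TimeAndDims key has no existing row.
final Integer prior = getFacts().getPriorIndex(key);
if (prior != null) {
  // reuse the aggregators of the existing row
} else {
  // allocate a new row index and putIfAbsent() it into the facts table
}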

View File

@ -28,6 +28,7 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@ -79,8 +80,8 @@ public class OnheapIncrementalIndexTest
public void run()
{
while (!Thread.interrupted()) {
for (int row : index.getFacts().values()) {
if (index.getMetricLongValue(row, 0) != 1) {
for (Map.Entry<IncrementalIndex.TimeAndDims, Integer> row : index.getFacts().entrySet()) {
if (index.getMetricLongValue(row.getValue(), 0) != 1) {
checkFailedCount.addAndGet(1);
}
}

View File

@ -41,14 +41,17 @@ public class ArbitraryGranularitySpec implements GranularitySpec
{
private final TreeSet<Interval> intervals;
private final QueryGranularity queryGranularity;
private final Boolean rollup;
@JsonCreator
public ArbitraryGranularitySpec(
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
this.queryGranularity = queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
if (inputIntervals == null) {
@ -80,6 +83,14 @@ public class ArbitraryGranularitySpec implements GranularitySpec
}
}
public ArbitraryGranularitySpec(
QueryGranularity queryGranularity,
List<Interval> inputIntervals
)
{
this(queryGranularity, true, inputIntervals);
}
@Override
@JsonProperty("intervals")
public Optional<SortedSet<Interval>> bucketIntervals()
@ -106,6 +117,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec
throw new UnsupportedOperationException();
}
@Override
@JsonProperty("rollup")
public boolean isRollup()
{
return rollup;
}
@Override
@JsonProperty("queryGranularity")
public QueryGranularity getQueryGranularity()
@ -128,6 +146,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
if (!intervals.equals(that.intervals)) {
return false;
}
if (!rollup.equals(that.rollup)) {
return false;
}
return !(queryGranularity != null
? !queryGranularity.equals(that.queryGranularity)
: that.queryGranularity != null);
@ -138,6 +159,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
public int hashCode()
{
int result = intervals.hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
return result;
}
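The spec stays backward compatible: the two-argument constructor added above delegates with rollup = true, and a null rollup in JSON also falls back to true. A construction sketch (intervals is a hypothetical List<Interval>):

// Existing callers keep their behaviour: rollup defaults to true.
GranularitySpec defaults = new ArbitraryGranularitySpec(QueryGranularities.NONE, intervals);

// New callers can disable pre-aggregation explicitly.
GranularitySpec noRollup = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals);
assert defaults.isRollup() && !noRollup.isRollup();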

View File

@ -57,6 +57,8 @@ public interface GranularitySpec
public Granularity getSegmentGranularity();
public boolean isRollup();
public QueryGranularity getQueryGranularity();
}

View File

@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -41,6 +41,7 @@ public class UniformGranularitySpec implements GranularitySpec
private final Granularity segmentGranularity;
private final QueryGranularity queryGranularity;
private final Boolean rollup;
private final List<Interval> inputIntervals;
private final ArbitraryGranularitySpec wrappedSpec;
@ -48,12 +49,14 @@ public class UniformGranularitySpec implements GranularitySpec
public UniformGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
if (inputIntervals != null) {
List<Interval> granularIntervals = Lists.newArrayList();
@ -61,13 +64,22 @@ public class UniformGranularitySpec implements GranularitySpec
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
}
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, granularIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
} else {
this.inputIntervals = null;
this.wrappedSpec = null;
}
}
public UniformGranularitySpec(
Granularity segmentGranularity,
QueryGranularity queryGranularity,
List<Interval> inputIntervals
)
{
this(segmentGranularity, queryGranularity, true, inputIntervals);
}
@Override
public Optional<SortedSet<Interval>> bucketIntervals()
{
@ -91,6 +103,13 @@ public class UniformGranularitySpec implements GranularitySpec
return segmentGranularity;
}
@Override
@JsonProperty("rollup")
public boolean isRollup()
{
return rollup;
}
@Override
@JsonProperty("queryGranularity")
public QueryGranularity getQueryGranularity()
@ -122,6 +141,9 @@ public class UniformGranularitySpec implements GranularitySpec
if (!queryGranularity.equals(that.queryGranularity)) {
return false;
}
if (!rollup.equals(that.rollup)) {
return false;
}
if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) {
return false;
}
@ -134,6 +156,7 @@ public class UniformGranularitySpec implements GranularitySpec
{
int result = segmentGranularity.hashCode();
result = 31 * result + queryGranularity.hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0);
result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
return result;

View File

@ -560,6 +560,7 @@ public class AppenderatorImpl implements Appenderator
final File mergedFile;
mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
tuningConfig.getIndexSpec()

View File

@ -412,6 +412,7 @@ public class RealtimePlumber implements Plumber
final File mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
config.getIndexSpec()
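Both realtime persist paths (AppenderatorImpl above and RealtimePlumber here) now thread the ingestion spec's rollup flag into the final merge, so the published segment honours the same setting the in-memory indexes were built with. The call shape, sketched with the names already used in the plumber code:

// rollup comes from the dataSchema's granularity spec, not a hard-coded true
boolean rollup = schema.getGranularitySpec().isRollup();
File mergedFile = indexMerger.mergeQueryableIndex(
    indexes,                 // List<QueryableIndex> of the hydrants to merge
    rollup,                  // new parameter: keep or skip pre-aggregation
    schema.getAggregators(),
    mergedTarget,
    config.getIndexSpec()
);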

View File

@ -248,6 +248,7 @@ public class Sink implements Iterable<FireHydrant>
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withDimensionsSpec(schema.getParser())
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory);

View File

@ -49,6 +49,8 @@ public class ArbitraryGranularityTest
new Interval("2012-01-01T00Z/2012-01-03T00Z")
));
Assert.assertTrue(spec.isRollup());
Assert.assertEquals(
Lists.newArrayList(
new Interval("2012-01-01T00Z/2012-01-03T00Z"),
@ -122,6 +124,21 @@ public class ArbitraryGranularityTest
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testRollupSetting()
{
List<Interval> intervals = Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-02-01T00Z/2012-03-01T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
);
final GranularitySpec spec = new ArbitraryGranularitySpec(QueryGranularities.NONE, false, intervals);
Assert.assertFalse(spec.isRollup());
}
@Test
public void testOverlapViolationSameStartInstant()
{

View File

@ -31,6 +31,8 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class UniformGranularityTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@ -49,6 +51,8 @@ public class UniformGranularityTest
)
);
Assert.assertTrue(spec.isRollup());
Assert.assertEquals(
Lists.newArrayList(
new Interval("2012-01-01T00Z/P1D"),
@ -93,6 +97,20 @@ public class UniformGranularityTest
);
}
@Test
public void testRollupSetting()
{
List<Interval> intervals = Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
);
final GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals);
Assert.assertFalse(spec.isRollup());
}
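Because rollup is exposed as a @JsonProperty on both spec implementations, it should also survive a serialization round trip through this test's jsonMapper. A hedged sketch of such a check, reusing the intervals list from the test above and assuming GranularitySpec's usual polymorphic Jackson setup:

GranularitySpec spec = new UniformGranularitySpec(Granularity.DAY, QueryGranularities.NONE, false, intervals);
GranularitySpec roundTripped = jsonMapper.readValue(jsonMapper.writeValueAsString(spec), GranularitySpec.class);
Assert.assertFalse(roundTripped.isRollup());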
@Test
public void testJson()
{