From eac776f1a7b3c6a80ccd97a33bf6b35331077ad5 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 2 Dec 2014 22:29:28 +0530 Subject: [PATCH] tests passing with on heap incremental index --- .../ApproximateHistogramAggregator.java | 6 + ...ApproximateHistogramFoldingAggregator.java | 6 + .../io/druid/indexer/IndexGeneratorJob.java | 9 +- .../druid/query/aggregation/Aggregator.java | 2 + .../druid/query/aggregation/Aggregators.java | 6 + .../query/aggregation/CountAggregator.java | 6 + .../aggregation/DoubleSumAggregator.java | 6 + .../query/aggregation/FilteredAggregator.java | 6 + .../aggregation/HistogramAggregator.java | 6 + .../aggregation/JavaScriptAggregator.java | 6 + .../query/aggregation/LongSumAggregator.java | 6 + .../query/aggregation/MaxAggregator.java | 6 + .../query/aggregation/MinAggregator.java | 6 + .../cardinality/CardinalityAggregator.java | 6 + .../hyperloglog/HyperUniquesAggregator.java | 6 + .../query/groupby/GroupByQueryHelper.java | 4 +- .../segment/incremental/IncrementalIndex.java | 840 ++---------------- .../incremental/IncrementalIndexAdapter.java | 3 +- .../IncrementalIndexStorageAdapter.java | 29 +- .../incremental/OffheapIncrementalIndex.java | 661 +++++++++++++- .../incremental/OnheapIncrementalIndex.java | 778 ++++++++++++++++ .../TimeseriesQueryRunnerBonusTest.java | 6 +- .../java/io/druid/segment/EmptyIndexTest.java | 6 +- .../io/druid/segment/IndexMergerTest.java | 7 +- .../io/druid/segment/SchemalessIndex.java | 11 +- .../test/java/io/druid/segment/TestIndex.java | 9 +- .../segment/data/IncrementalIndexTest.java | 11 +- .../filter/SpatialFilterBonusTest.java | 13 +- .../segment/filter/SpatialFilterTest.java | 13 +- .../IncrementalIndexStorageAdapterTest.java | 24 +- .../druid/segment/realtime/plumber/Sink.java | 9 +- 31 files changed, 1627 insertions(+), 886 deletions(-) create mode 100644 processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java diff --git a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java index 7fcac6d7213..aa5df0a7485 100644 --- a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java +++ b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java @@ -89,6 +89,12 @@ public class ApproximateHistogramAggregator implements Aggregator throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()"); } + @Override + public long getLong() + { + throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()"); + } + @Override public String getName() { diff --git a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java index 51dc682de0a..1b234cd8fc5 100644 --- a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java +++ b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingAggregator.java @@ -87,6 +87,12 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()"); } + @Override + 
public long getLong() + { + throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()"); + } + @Override public String getName() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 2c8ab814141..b830a9614d4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -46,6 +46,7 @@ import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OffheapIncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configurable; @@ -645,12 +646,12 @@ public class IndexGeneratorJob implements Jobby if (tuningConfig.isIngestOffheap()) { return new OffheapIncrementalIndex( indexSchema, - new OffheapBufferPool(bufferSize) + new OffheapBufferPool(bufferSize), + true ); } else { - return new IncrementalIndex( - indexSchema, - new OffheapBufferPool(bufferSize) + return new OnheapIncrementalIndex( + indexSchema ); } } diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java index a48feb65e55..2c3ee8cf4c1 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java @@ -39,4 +39,6 @@ public interface Aggregator { float getFloat(); String getName(); void close(); + + long getLong(); } diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregators.java b/processing/src/main/java/io/druid/query/aggregation/Aggregators.java index fb421e5d8d4..d4572aacc7e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Aggregators.java +++ b/processing/src/main/java/io/druid/query/aggregation/Aggregators.java @@ -64,6 +64,12 @@ public class Aggregators { } + + @Override + public long getLong() + { + return 0; + } }; } diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregator.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregator.java index ca934ffad05..86ba5eecc88 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregator.java @@ -64,6 +64,12 @@ public class CountAggregator implements Aggregator return (float) count; } + @Override + public long getLong() + { + return count; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregator.java index b9dcd56dbac..38b01b7cbe8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregator.java @@ -80,6 +80,12 @@ public class DoubleSumAggregator implements Aggregator return (float) sum; } + @Override + public long getLong() + { + return (long) sum; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java index 1c8bf9c76a2..25f97597d35 100644 
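
The hunk above adds long getLong() to the Aggregator interface; every remaining aggregator change in this patch implements it, either returning the accumulator as a long or throwing UnsupportedOperationException for complex types. A minimal sketch of the resulting contract, assuming the other methods of the era's interface (aggregate, reset, get, getFloat, getName, close); the class and names below are illustrative, not part of the patch:

import io.druid.query.aggregation.Aggregator;

// Illustrative aggregator showing the getLong() contract introduced here:
// numeric aggregators return their accumulator (cast to long if needed);
// complex aggregators such as HyperUniquesAggregator throw instead.
public class ExampleCountAggregator implements Aggregator
{
  private volatile long count = 0;

  @Override
  public void aggregate()
  {
    ++count;
  }

  @Override
  public void reset()
  {
    count = 0;
  }

  @Override
  public Object get()
  {
    return count;
  }

  @Override
  public float getFloat()
  {
    return (float) count;
  }

  @Override
  public long getLong()
  {
    return count; // the method this patch adds
  }

  @Override
  public String getName()
  {
    return "exampleCount";
  }

  @Override
  public void close()
  {
    // nothing to release
  }
}

The new accessor lets long-typed read paths (the LongColumnSelector changes later in this patch) skip the lossy float round trip.
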
--- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java @@ -58,6 +58,12 @@ public class FilteredAggregator implements Aggregator return delegate.getFloat(); } + @Override + public long getLong() + { + return delegate.getLong(); + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java index 8f4e9fd576c..83d1b432bb3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java @@ -74,6 +74,12 @@ public class HistogramAggregator implements Aggregator throw new UnsupportedOperationException("HistogramAggregator does not support getFloat()"); } + @Override + public long getLong() + { + throw new UnsupportedOperationException("HistogramAggregator does not support getLong()"); + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java index 4f85c1de81a..912bcb6563e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java @@ -76,6 +76,12 @@ public class JavaScriptAggregator implements Aggregator return (float) current; } + @Override + public long getLong() + { + return (long) current; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregator.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregator.java index d268afe185f..ebee16956d5 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregator.java @@ -79,6 +79,12 @@ public class LongSumAggregator implements Aggregator return (float) sum; } + @Override + public long getLong() + { + return sum; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/MaxAggregator.java b/processing/src/main/java/io/druid/query/aggregation/MaxAggregator.java index f3e40183ad7..dc4f2652b4b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/MaxAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/MaxAggregator.java @@ -71,6 +71,12 @@ public class MaxAggregator implements Aggregator return (float) max; } + @Override + public long getLong() + { + return (long) max; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/MinAggregator.java b/processing/src/main/java/io/druid/query/aggregation/MinAggregator.java index c4e3dc67a31..9c2c08e1723 100644 --- a/processing/src/main/java/io/druid/query/aggregation/MinAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/MinAggregator.java @@ -71,6 +71,12 @@ public class MinAggregator implements Aggregator return (float) min; } + @Override + public long getLong() + { + return (long) min; + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java index afd893afc3f..d844978d540 100644 
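
Note that the double-backed aggregators above (DoubleSumAggregator, JavaScriptAggregator, MaxAggregator, MinAggregator) satisfy getLong() with a narrowing (long) cast, which truncates toward zero rather than rounding; a self-contained demonstration of that Java semantics:

public class NarrowingCastDemo
{
  public static void main(String[] args)
  {
    // (long) drops the fractional part and truncates toward zero,
    // matching what getLong() returns for a double-valued accumulator.
    System.out.println((long) 3.9d);  // prints 3
    System.out.println((long) -3.9d); // prints -3
  }
}
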
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java @@ -126,6 +126,12 @@ public class CardinalityAggregator implements Aggregator throw new UnsupportedOperationException("CardinalityAggregator does not support getFloat()"); } + @Override + public long getLong() + { + throw new UnsupportedOperationException("CardinalityAggregator does not support getLong()"); + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java index 1aa8f6fd6d2..9602c2333ad 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java @@ -66,6 +66,12 @@ public class HyperUniquesAggregator implements Aggregator throw new UnsupportedOperationException(); } + @Override + public long getLong() + { + throw new UnsupportedOperationException(); + } + @Override public String getName() { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index a3ca6dbee72..6ff2b70fa87 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.OffheapIncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; import java.util.List; @@ -86,13 +87,12 @@ public class GroupByQueryHelper false ); } else { - index = new IncrementalIndex( + index = new OnheapIncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart granTimeStart, gran, aggs.toArray(new AggregatorFactory[aggs.size()]), - bufferPool, false ); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index cb2cc7e3b0c..fdfb9519794 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -20,687 +20,93 @@ package io.druid.segment.incremental; import com.google.common.base.Function; -import com.google.common.base.Throwables; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; -import io.druid.data.input.impl.SpatialDimensionSchema; -import io.druid.granularity.QueryGranularity; import 
io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.PostAggregator; -import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ColumnCapabilitiesImpl; -import io.druid.segment.column.ValueType; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.serde.ComplexMetricExtractor; -import io.druid.segment.serde.ComplexMetricSerde; -import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; /** */ -public class IncrementalIndex implements Iterable, Closeable +public interface IncrementalIndex extends Iterable, Closeable { - private final long minTimestamp; - private final QueryGranularity gran; - private final List> rowTransformers; - private final AggregatorFactory[] metrics; - private final Map metricIndexes; - private final Map metricTypes; - private final ImmutableList metricNames; - private final BufferAggregator[] aggs; - private final int[] aggPositionOffsets; - private final int totalAggSize; - private final LinkedHashMap dimensionOrder; - protected final CopyOnWriteArrayList dimensions; - private final DimensionHolder dimValues; - private final Map columnCapabilities; - private final ConcurrentNavigableMap facts; - private final ResourceHolder bufferHolder; - private volatile AtomicInteger numEntries = new AtomicInteger(); - // This is modified on add() in a critical section. - private ThreadLocal in = new ThreadLocal<>(); - /** - * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that - * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. - * - * @param incrementalIndexSchema - * @param bufferPool - * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input - * value for aggregators that return metrics other than float. 
- */ - public IncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool, - final boolean deserializeComplexMetrics - ) + List getDimensions(); + + ConcurrentNavigableMap getFacts(); + + Integer getDimensionIndex(String dimension); + + List getMetricNames(); + + DimDim getDimension(String dimension); + + AggregatorFactory[] getMetricAggs(); + + Interval getInterval(); + + DateTime getMinTime(); + + DateTime getMaxTime(); + + boolean isEmpty(); + + ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end); + + Integer getMetricIndex(String columnName); + + String getMetricType(String metric); + + ColumnCapabilities getCapabilities(String column); + + int size(); + + float getMetricFloatValue(int rowOffset, int aggOffset); + + long getMetricLongValue(int rowOffset, int aggOffset); + + Object getMetricObjectValue(int rowOffset, int aggOffset); + + Iterable iterableWithPostAggregations(List postAggregatorSpecs); + + int add(InputRow inputRow); + + void close(); + + InputRow formatRow(InputRow parse); + + static interface DimDim { - this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); - this.gran = incrementalIndexSchema.getGran(); - this.metrics = incrementalIndexSchema.getMetrics(); - this.rowTransformers = Lists.newCopyOnWriteArrayList(); + public String get(String value); - final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); - final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); - final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); - this.aggs = new BufferAggregator[metrics.length]; - this.aggPositionOffsets = new int[metrics.length]; - int currAggSize = 0; - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorizeBuffered( - new ColumnSelectorFactory() - { - @Override - public LongColumnSelector makeLongColumnSelector(final String columnName) - { - if(columnName.equals(Column.TIME_COLUMN_NAME)){ - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getTimestampFromEpoch(); - } - }; - } - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getLongMetric(columnName); - } - }; - } + public int getId(String value); - @Override - public FloatColumnSelector makeFloatColumnSelector(final String columnName) - { - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(columnName); - } - }; - } + public String getValue(int id); - @Override - public ObjectColumnSelector makeObjectColumnSelector(final String column) - { - final String typeName = agg.getTypeName(); + public boolean contains(String value); - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } + public int size(); - @Override - public Object get() - { - return in.get().getRaw(column); - } - }; + public int add(String value); - if (!deserializeComplexMetrics) { - return rawColumnSelector; - } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } + public int getSortedId(String value); - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } + public String getSortedValue(int index); - final ComplexMetricExtractor extractor = serde.getExtractor(); - return new ObjectColumnSelector() - { - @Override - public Class 
classOfObject() - { - return extractor.extractedClass(); - } + public void sort(); - @Override - public Object get() - { - return extractor.extractValue(in.get(), column); - } - }; - } - } - - @Override - public DimensionSelector makeDimensionSelector(final String dimension) - { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = in.get().getDimension(dimension); - final ArrayList vals = Lists.newArrayList(); - if (dimensionValues != null) { - for (int i = 0; i < dimensionValues.size(); ++i) { - vals.add(i); - } - } - - return new IndexedInts() - { - @Override - public int size() - { - return vals.size(); - } - - @Override - public int get(int index) - { - return vals.get(index); - } - - @Override - public Iterator iterator() - { - return vals.iterator(); - } - }; - } - - @Override - public int getValueCardinality() - { - throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); - } - - @Override - public String lookupName(int id) - { - return in.get().getDimension(dimension).get(id); - } - - @Override - public int lookupId(String name) - { - return in.get().getDimension(dimension).indexOf(name); - } - }; - } - } - ); - aggPositionOffsets[i] = currAggSize; - currAggSize += agg.getMaxIntermediateSize(); - final String metricName = metrics[i].getName(); - metricNamesBuilder.add(metricName); - metricIndexesBuilder.put(metricName, i); - metricTypesBuilder.put(metricName, metrics[i].getTypeName()); - } - metricNames = metricNamesBuilder.build(); - metricIndexes = metricIndexesBuilder.build(); - metricTypes = metricTypesBuilder.build(); - - this.totalAggSize = currAggSize; - - this.dimensionOrder = Maps.newLinkedHashMap(); - this.dimensions = new CopyOnWriteArrayList<>(); - // This should really be more generic - List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); - if (!spatialDimensions.isEmpty()) { - this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); - } - - this.columnCapabilities = Maps.newHashMap(); - for (Map.Entry entry : metricTypes.entrySet()) { - ValueType type; - if (entry.getValue().equalsIgnoreCase("float")) { - type = ValueType.FLOAT; - } else if (entry.getValue().equalsIgnoreCase("long")) { - type = ValueType.LONG; - } else { - type = ValueType.COMPLEX; - } - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(type); - columnCapabilities.put(entry.getKey(), capabilities); - } - for (String dimension : dimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - for (SpatialDimensionSchema spatialDimension : spatialDimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - capabilities.setHasSpatialIndexes(true); - columnCapabilities.put(spatialDimension.getDimName(), capabilities); - } - this.bufferHolder = bufferPool.take(); - this.dimValues = new DimensionHolder(); - this.facts = createFactsTable(); - } - - protected ConcurrentNavigableMap createFactsTable() { - return new ConcurrentSkipListMap<>(); - } - - public IncrementalIndex( - long minTimestamp, - QueryGranularity gran, - final AggregatorFactory[] metrics, - StupidPool bufferPool - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - 
bufferPool, - true - ); - } - - public IncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool - ) - { - this(incrementalIndexSchema, bufferPool, true); - } - - public IncrementalIndex( - long minTimestamp, - QueryGranularity gran, - final AggregatorFactory[] metrics, - StupidPool bufferPool, - boolean deserializeComplexMetrics - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - bufferPool, - deserializeComplexMetrics - ); - } - - public InputRow formatRow(InputRow row) - { - for (Function rowTransformer : rowTransformers) { - row = rowTransformer.apply(row); - } - - if (row == null) { - throw new IAE("Row is null? How can this be?!"); - } - return row; - } - - /** - * Adds a new row. The row might correspond with another row that already exists, in which case this will - * update that row instead of inserting a new one. - *
<p/> - * <p/> - * Calls to add() are thread safe. - * <p/>
- * - * @param row the row of data to add - * - * @return the number of rows in the data set after adding the InputRow - */ - public int add(InputRow row) - { - row = formatRow(row); - if (row.getTimestampFromEpoch() < minTimestamp) { - throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); - } - - final List rowDimensions = row.getDimensions(); - - String[][] dims; - List overflow = null; - synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; - for (String dimension : rowDimensions) { - List dimensionValues = row.getDimension(dimension); - - // Set column capabilities as data is coming in - ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - if (dimensionValues.size() > 1) { - capabilities.setHasMultipleValues(true); - } - - Integer index = dimensionOrder.get(dimension); - if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); - - if (overflow == null) { - overflow = Lists.newArrayList(); - } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); - } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); - } - } - } - - - if (overflow != null) { - // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; - System.arraycopy(dims, 0, newDims, 0, dims.length); - for (int i = 0; i < overflow.size(); ++i) { - newDims[dims.length + i] = overflow.get(i); - } - dims = newDims; - } - - final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - Integer rowOffset; - synchronized (this) { - rowOffset = totalAggSize * numEntries.get(); - final Integer prev = facts.putIfAbsent(key, rowOffset); - if (prev != null) { - rowOffset = prev; - } else { - if (rowOffset + totalAggSize > bufferHolder.get().limit()) { - facts.remove(key); - throw new ISE("Buffer full, cannot add more rows! 
Current rowSize[%,d].", numEntries.get()); - } - numEntries.incrementAndGet(); - for (int i = 0; i < aggs.length; i++) { - aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); - } - } - } - in.set(row); - for (int i = 0; i < aggs.length; i++) { - synchronized (aggs[i]) { - aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); - } - } - in.set(null); - return numEntries.get(); - } - - public boolean isEmpty() - { - return numEntries.get() == 0; - } - - public int size() - { - return numEntries.get(); - } - - public long getMinTimeMillis() - { - return facts.firstKey().getTimestamp(); - } - - public long getMaxTimeMillis() - { - return facts.lastKey().getTimestamp(); - } - - private String[] getDimVals(final DimDim dimLookup, final List dimValues) - { - final String[] retVal = new String[dimValues.size()]; - - int count = 0; - for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.get(dimValue); - if (!dimLookup.contains(canonicalDimValue)) { - dimLookup.add(dimValue); - } - retVal[count] = canonicalDimValue; - count++; - } - Arrays.sort(retVal); - - return retVal; - } - - public AggregatorFactory[] getMetricAggs() - { - return metrics; - } - - public List getDimensions() - { - return dimensions; - } - - public String getMetricType(String metric) - { - return metricTypes.get(metric); - } - - public long getMinTimestamp() - { - return minTimestamp; - } - - public QueryGranularity getGranularity() - { - return gran; - } - - public Interval getInterval() - { - return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); - } - - public DateTime getMinTime() - { - return isEmpty() ? null : new DateTime(getMinTimeMillis()); - } - - public DateTime getMaxTime() - { - return isEmpty() ? null : new DateTime(getMaxTimeMillis()); - } - - DimDim getDimension(String dimension) - { - return isEmpty() ? null : dimValues.get(dimension); - } - - Integer getDimensionIndex(String dimension) - { - return dimensionOrder.get(dimension); - } - - List getMetricNames() - { - return metricNames; - } - - Integer getMetricIndex(String metricName) - { - return metricIndexes.get(metricName); - } - - int getMetricPosition(int rowOffset, int metricIndex) - { - return rowOffset + aggPositionOffsets[metricIndex]; - } - - ByteBuffer getMetricBuffer() - { - return bufferHolder.get(); - } - - BufferAggregator getAggregator(int metricIndex) - { - return aggs[metricIndex]; - } - - ColumnCapabilities getCapabilities(String column) - { - return columnCapabilities.get(column); - } - - ConcurrentNavigableMap getFacts() - { - return facts; - } - - ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) - { - return facts.subMap(start, end); - } - - @Override - public Iterator iterator() - { - return iterableWithPostAggregations(null).iterator(); - } - - public Iterable iterableWithPostAggregations(final List postAggs) - { - return new Iterable() - { - @Override - public Iterator iterator() - { - return Iterators.transform( - facts.entrySet().iterator(), - new Function, Row>() - { - @Override - public Row apply(final Map.Entry input) - { - final TimeAndDims timeAndDims = input.getKey(); - final int rowOffset = input.getValue(); - - String[][] theDims = timeAndDims.getDims(); - - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; - if (dim != null && dim.length != 0) { - theVals.put(dimensions.get(i), dim.length == 1 ? 
dim[0] : Arrays.asList(dim)); - } - } - - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i))); - } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(timeAndDims.getTimestamp(), theVals); - } - } - ); - } - }; - } - - @Override - public void close() - { - try { - bufferHolder.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - class DimensionHolder - { - private final Map dimensions; - - DimensionHolder() - { - dimensions = Maps.newConcurrentMap(); - } - - void reset() - { - dimensions.clear(); - } - - DimDim add(String dimension) - { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = createDimDim(dimension); - dimensions.put(dimension, holder); - } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); - } - return holder; - } - - DimDim get(String dimension) - { - return dimensions.get(dimension); - } - } - - protected DimDim createDimDim(String dimension){ - return new DimDimImpl(); + public boolean compareCannonicalValues(String s1, String s2); } static class TimeAndDims implements Comparable @@ -773,134 +179,18 @@ public class IncrementalIndex implements Iterable, Closeable "timestamp=" + new DateTime(timestamp) + ", dims=" + Lists.transform( Arrays.asList(dims), new Function() - { - @Override - public Object apply(@Nullable String[] input) - { - if (input == null || input.length == 0) { - return Arrays.asList("null"); + { + @Override + public Object apply(@Nullable String[] input) + { + if (input == null || input.length == 0) { + return Arrays.asList("null"); + } + return Arrays.asList(input); + } } - return Arrays.asList(input); - } - } ) + '}'; } } - - static interface DimDim - { - public String get(String value); - - public int getId(String value); - - public String getValue(int id); - - public boolean contains(String value); - - public int size(); - - public int add(String value); - - public int getSortedId(String value); - - public String getSortedValue(int index); - - public void sort(); - - public boolean compareCannonicalValues(String s1, String s2); - } - - private static class DimDimImpl implements DimDim{ - private final Map falseIds; - private final Map falseIdsReverse; - private volatile String[] sortedVals = null; - final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); - - - public DimDimImpl() - { - BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); - falseIds = biMap; - falseIdsReverse = biMap.inverse(); - } - - /** - * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` - * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) - */ - public String get(String str) - { - String prev = poorMansInterning.putIfAbsent(str, str); - return prev != null ? prev : str; - } - - public int getId(String value) - { - if (value == null) { - value = ""; - } - final Integer id = falseIds.get(value); - return id == null ? 
-1 : id; - } - - public String getValue(int id) - { - return falseIdsReverse.get(id); - } - - public boolean contains(String value) - { - return falseIds.containsKey(value); - } - - public int size() - { - return falseIds.size(); - } - - public synchronized int add(String value) - { - int id = falseIds.size(); - falseIds.put(value, id); - return id; - } - - public int getSortedId(String value) - { - assertSorted(); - return Arrays.binarySearch(sortedVals, value); - } - - public String getSortedValue(int index) - { - assertSorted(); - return sortedVals[index]; - } - - public void sort() - { - if (sortedVals == null) { - sortedVals = new String[falseIds.size()]; - - int index = 0; - for (String value : falseIds.keySet()) { - sortedVals[index++] = value; - } - Arrays.sort(sortedVals); - } - } - - private void assertSorted() - { - if (sortedVals == null) { - throw new ISE("Call sort() before calling the getSorted* methods."); - } - } - - public boolean compareCannonicalValues(String s1, String s2) - { - return s1 ==s2; - } - } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index f37a9962fb3..77d0d9e0c62 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -205,8 +205,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter Object[] metrics = new Object[index.getMetricAggs().length]; for (int i = 0; i < metrics.length; i++) { - metrics[i] = index.getAggregator(i) - .get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i)); + metrics[i] = index.getMetricObjectValue(rowOffset, i); } return new Rowboat( diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index c36f8820aa0..6eb3404a15f 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -1,4 +1,3 @@ - /* * Druid - a distributed column store. * Copyright (C) 2012, 2013 Metamarkets Group Inc. 
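
The hunks that follow decouple IncrementalIndexStorageAdapter from BufferAggregator: instead of computing buffer positions itself via agg.getFloat(index.getMetricBuffer(), index.getMetricPosition(...)), the adapter now asks the index for values through getMetricFloatValue, getMetricLongValue and getMetricObjectValue, so on-heap and off-heap indexes can each answer from their own storage. A toy model of that indirection, with simplified names and offset math (the real code uses per-aggregator size offsets, not fixed-width slots):

import java.nio.ByteBuffer;

// Toy model of the accessor indirection introduced below; not Druid code.
interface MetricSource
{
  float getMetricFloatValue(int rowOffset, int aggOffset);
}

// Off-heap flavor: values live at computed offsets in a shared direct buffer.
class OffheapMetricSource implements MetricSource
{
  private final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

  @Override
  public float getMetricFloatValue(int rowOffset, int aggOffset)
  {
    return buffer.getFloat(rowOffset + aggOffset * Float.BYTES);
  }
}

// On-heap flavor: values live in ordinary heap structures, no buffer at all.
class OnheapMetricSource implements MetricSource
{
  private final float[][] rows = new float[16][4];

  @Override
  public float getMetricFloatValue(int rowOffset, int aggOffset)
  {
    return rows[rowOffset][aggOffset];
  }
}
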
@@ -30,7 +29,6 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; import io.druid.query.QueryInterruptedException; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherFactory; @@ -352,17 +350,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int metricIndex = metricIndexInt; - final BufferAggregator agg = index.getAggregator(metricIndex); - return new FloatColumnSelector() { @Override public float get() { - return agg.getFloat( - index.getMetricBuffer(), - index.getMetricPosition(currEntry.getValue(), metricIndex) - ); + return index.getMetricFloatValue(currEntry.getValue(), metricIndex); } }; } @@ -370,7 +363,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public LongColumnSelector makeLongColumnSelector(String columnName) { - if(columnName.equals(Column.TIME_COLUMN_NAME)){ + if (columnName.equals(Column.TIME_COLUMN_NAME)) { return new LongColumnSelector() { @Override @@ -393,16 +386,15 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int metricIndex = metricIndexInt; - final BufferAggregator agg = index.getAggregator(metricIndex); return new LongColumnSelector() { @Override public long get() { - return agg.getLong( - index.getMetricBuffer(), - index.getMetricPosition(currEntry.getValue(), metricIndex) + return index.getMetricLongValue( + currEntry.getValue(), + metricIndex ); } }; @@ -417,7 +409,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final int metricIndex = metricIndexInt; final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column)); - final BufferAggregator agg = index.getAggregator(metricIndex); return new ObjectColumnSelector() { @Override @@ -429,9 +420,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - return agg.get( - index.getMetricBuffer(), - index.getMetricPosition(currEntry.getValue(), metricIndex) + return index.getMetricObjectValue( + currEntry.getValue(), + metricIndex ); } }; @@ -453,7 +444,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter public Object get() { final String[][] dims = currEntry.getKey().getDims(); - if(dimensionIndex >= dims.length) { + if (dimensionIndex >= dims.length) { return null; } final String[] dimVals = dims[dimensionIndex]; @@ -562,7 +553,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } for (String dimVal : dims[dimIndex]) { - if (dimDim.compareCannonicalValues(id,dimVal)) { + if (dimDim.compareCannonicalValues(id, dimVal)) { return true; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 7e048c37e70..038668bec30 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -20,84 +20,693 @@ package io.druid.segment.incremental; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import 
com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.metamx.common.IAE; import com.metamx.common.ISE; +import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.ComplexMetrics; +import org.joda.time.DateTime; +import org.joda.time.Interval; import org.mapdb.BTreeKeySerializer; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; +import javax.annotation.Nullable; +import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; -public class OffheapIncrementalIndex extends IncrementalIndex +public class OffheapIncrementalIndex implements IncrementalIndex { - private volatile DB db; - private volatile DB factsDb; + private final long minTimestamp; + private final QueryGranularity gran; + private final List> rowTransformers; + private final AggregatorFactory[] metrics; + private final Map metricIndexes; + private final Map metricTypes; + private final ImmutableList metricNames; + private final BufferAggregator[] aggs; + private final int[] aggPositionOffsets; + private final int totalAggSize; + private final LinkedHashMap dimensionOrder; + protected final CopyOnWriteArrayList dimensions; + private final DimensionHolder dimValues; + private final Map columnCapabilities; + private final ConcurrentNavigableMap facts; + private final ResourceHolder bufferHolder; + private final DB db; + private final DB factsDb; + private volatile AtomicInteger numEntries = new AtomicInteger(); + // This is modified on add() in a critical section. + private ThreadLocal in = new ThreadLocal<>(); + /** + * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that + * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. 
+ * + * @param incrementalIndexSchema + * @param bufferPool + * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input + * value for aggregators that return metrics other than float. + */ public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool + StupidPool bufferPool, + final boolean deserializeComplexMetrics ) { - super(incrementalIndexSchema, bufferPool); + this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); + this.gran = incrementalIndexSchema.getGran(); + this.metrics = incrementalIndexSchema.getMetrics(); + this.rowTransformers = Lists.newCopyOnWriteArrayList(); + + final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); + final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); + final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); + this.aggs = new BufferAggregator[metrics.length]; + this.aggPositionOffsets = new int[metrics.length]; + int currAggSize = 0; + for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + aggs[i] = agg.factorizeBuffered( + new ColumnSelectorFactory() + { + @Override + public LongColumnSelector makeLongColumnSelector(final String columnName) + { + if(columnName.equals(Column.TIME_COLUMN_NAME)){ + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getTimestampFromEpoch(); + } + }; + } + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getLongMetric(columnName); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(final String columnName) + { + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(columnName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(final String column) + { + final String typeName = agg.getTypeName(); + + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + return in.get().getRaw(column); + } + }; + + if (!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; + } + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in.get(), column); + } + }; + } + } + + @Override + public DimensionSelector makeDimensionSelector(final String dimension) + { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = in.get().getDimension(dimension); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } + + return new IndexedInts() + { + @Override + public int size() + { + return vals.size(); + } + + @Override + public int get(int index) + { + return vals.get(index); + } + + @Override + public Iterator iterator() + { + return vals.iterator(); + } + }; + } + + @Override + public int getValueCardinality() + { + throw new 
UnsupportedOperationException("value cardinality is unknown in incremental index"); + } + + @Override + public String lookupName(int id) + { + return in.get().getDimension(dimension).get(id); + } + + @Override + public int lookupId(String name) + { + return in.get().getDimension(dimension).indexOf(name); + } + }; + } + } + ); + aggPositionOffsets[i] = currAggSize; + currAggSize += agg.getMaxIntermediateSize(); + final String metricName = metrics[i].getName(); + metricNamesBuilder.add(metricName); + metricIndexesBuilder.put(metricName, i); + metricTypesBuilder.put(metricName, metrics[i].getTypeName()); + } + metricNames = metricNamesBuilder.build(); + metricIndexes = metricIndexesBuilder.build(); + metricTypes = metricTypesBuilder.build(); + + this.totalAggSize = currAggSize; + + this.dimensionOrder = Maps.newLinkedHashMap(); + this.dimensions = new CopyOnWriteArrayList<>(); + // This should really be more generic + List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); + if (!spatialDimensions.isEmpty()) { + this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); + } + + this.columnCapabilities = Maps.newHashMap(); + for (Map.Entry entry : metricTypes.entrySet()) { + ValueType type; + if (entry.getValue().equalsIgnoreCase("float")) { + type = ValueType.FLOAT; + } else if (entry.getValue().equalsIgnoreCase("long")) { + type = ValueType.LONG; + } else { + type = ValueType.COMPLEX; + } + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(type); + columnCapabilities.put(entry.getKey(), capabilities); + } + for (String dimension : dimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + for (SpatialDimensionSchema spatialDimension : spatialDimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + capabilities.setHasSpatialIndexes(true); + columnCapabilities.put(spatialDimension.getDimName(), capabilities); + } + this.bufferHolder = bufferPool.take(); + this.dimValues = new DimensionHolder(); + final DBMaker dbMaker = DBMaker.newMemoryDirectDB() + .transactionDisable() + .asyncWriteEnable() + .cacheSoftRefEnable(); + factsDb = dbMaker.make(); + db = dbMaker.make(); + final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); + this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); } + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, final AggregatorFactory[] metrics, StupidPool bufferPool, boolean deserializeComplexMetrics - ) { - super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics); + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + bufferPool, + deserializeComplexMetrics + ); } - @Override - protected synchronized ConcurrentNavigableMap createFactsTable() + public InputRow formatRow(InputRow row) { - if (factsDb == null) { - final DBMaker dbMaker = DBMaker.newMemoryDirectDB() - .transactionDisable() - .asyncWriteEnable() - .cacheSoftRefEnable(); - factsDb = dbMaker.make(); - db = dbMaker.make(); + for (Function rowTransformer : rowTransformers) { + row = rowTransformer.apply(row); } - 
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); - return factsDb.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); + + if (row == null) { + throw new IAE("Row is null? How can this be?!"); + } + return row; + } + + /** + * Adds a new row. The row might correspond with another row that already exists, in which case this will + * update that row instead of inserting a new one. + *
<p/> + * <p/> + * Calls to add() are thread safe. + * <p/>
+ * + * @param row the row of data to add + * + * @return the number of rows in the data set after adding the InputRow + */ + public int add(InputRow row) + { + row = formatRow(row); + if (row.getTimestampFromEpoch() < minTimestamp) { + throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); + } + + final List rowDimensions = row.getDimensions(); + + String[][] dims; + List overflow = null; + synchronized (dimensionOrder) { + dims = new String[dimensionOrder.size()][]; + for (String dimension : rowDimensions) { + List dimensionValues = row.getDimension(dimension); + + // Set column capabilities as data is coming in + ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + if (dimensionValues.size() > 1) { + capabilities.setHasMultipleValues(true); + } + + Integer index = dimensionOrder.get(dimension); + if (index == null) { + dimensionOrder.put(dimension, dimensionOrder.size()); + dimensions.add(dimension); + + if (overflow == null) { + overflow = Lists.newArrayList(); + } + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else { + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + } + } + } + + + if (overflow != null) { + // Merge overflow and non-overflow + String[][] newDims = new String[dims.length + overflow.size()][]; + System.arraycopy(dims, 0, newDims, 0, dims.length); + for (int i = 0; i < overflow.size(); ++i) { + newDims[dims.length + i] = overflow.get(i); + } + dims = newDims; + } + + final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); + Integer rowOffset; + synchronized (this) { + rowOffset = totalAggSize * numEntries.get(); + final Integer prev = facts.putIfAbsent(key, rowOffset); + if (prev != null) { + rowOffset = prev; + } else { + if (rowOffset + totalAggSize > bufferHolder.get().limit()) { + facts.remove(key); + throw new ISE("Buffer full, cannot add more rows! 
Current rowSize[%,d].", numEntries.get()); + } + numEntries.incrementAndGet(); + for (int i = 0; i < aggs.length; i++) { + aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); + } + } + } + in.set(row); + for (int i = 0; i < aggs.length; i++) { + synchronized (aggs[i]) { + aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); + } + } + in.set(null); + return numEntries.get(); + } + + public boolean isEmpty() + { + return numEntries.get() == 0; + } + + public int size() + { + return numEntries.get(); + } + + public long getMinTimeMillis() + { + return facts.firstKey().getTimestamp(); + } + + public long getMaxTimeMillis() + { + return facts.lastKey().getTimestamp(); + } + + private String[] getDimVals(final DimDim dimLookup, final List dimValues) + { + final String[] retVal = new String[dimValues.size()]; + + int count = 0; + for (String dimValue : dimValues) { + String canonicalDimValue = dimLookup.get(dimValue); + if (!dimLookup.contains(canonicalDimValue)) { + dimLookup.add(dimValue); + } + retVal[count] = canonicalDimValue; + count++; + } + Arrays.sort(retVal); + + return retVal; + } + + public AggregatorFactory[] getMetricAggs() + { + return metrics; + } + + public List getDimensions() + { + return dimensions; + } + + public String getMetricType(String metric) + { + return metricTypes.get(metric); + } + + public long getMinTimestamp() + { + return minTimestamp; + } + + public QueryGranularity getGranularity() + { + return gran; + } + + public Interval getInterval() + { + return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); + } + + public DateTime getMinTime() + { + return isEmpty() ? null : new DateTime(getMinTimeMillis()); + } + + public DateTime getMaxTime() + { + return isEmpty() ? null : new DateTime(getMaxTimeMillis()); + } + + public DimDim getDimension(String dimension) + { + return isEmpty() ? 
null : dimValues.get(dimension); + } + + public Integer getDimensionIndex(String dimension) + { + return dimensionOrder.get(dimension); + } + + public List getMetricNames() + { + return metricNames; + } + + public Integer getMetricIndex(String metricName) + { + return metricIndexes.get(metricName); + } + + private int getMetricPosition(int rowOffset, int metricIndex) + { + return rowOffset + aggPositionOffsets[metricIndex]; } @Override - protected DimDim createDimDim(String dimension) + public float getMetricFloatValue(int rowOffset, int aggOffset) { - return new OffheapDimDim(dimension); + return aggs[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); + } + + @Override + public long getMetricLongValue(int rowOffset, int aggOffset) + { + return aggs[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); + } + + @Override + public Object getMetricObjectValue(int rowOffset, int aggOffset) + { + return aggs[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); + } + + public ColumnCapabilities getCapabilities(String column) + { + return columnCapabilities.get(column); + } + + public ConcurrentNavigableMap getFacts() + { + return facts; + } + + public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) + { + return facts.subMap(start, end); + } + + @Override + public Iterator iterator() + { + return iterableWithPostAggregations(null).iterator(); + } + + public Iterable iterableWithPostAggregations(final List postAggs) + { + return new Iterable() + { + @Override + public Iterator iterator() + { + return Iterators.transform( + facts.entrySet().iterator(), + new Function, Row>() + { + @Override + public Row apply(final Map.Entry input) + { + final TimeAndDims timeAndDims = input.getKey(); + final int rowOffset = input.getValue(); + + String[][] theDims = timeAndDims.getDims(); + + Map theVals = Maps.newLinkedHashMap(); + for (int i = 0; i < theDims.length; ++i) { + String[] dim = theDims[i]; + if (dim != null && dim.length != 0) { + theVals.put(dimensions.get(i), dim.length == 1 ? 
dim[0] : Arrays.asList(dim)); + } + } + + for (int i = 0; i < aggs.length; ++i) { + theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i))); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(timeAndDims.getTimestamp(), theVals); + } + } + ); + } + }; + } + + @Override + public void close() + { + try { + bufferHolder.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + class DimensionHolder + { + private final Map dimensions; + + DimensionHolder() + { + dimensions = Maps.newConcurrentMap(); + } + + void reset() + { + dimensions.clear(); + } + + DimDim add(String dimension) + { + DimDim holder = dimensions.get(dimension); + if (holder == null) { + holder = new OffheapDimDim(dimension); + dimensions.put(dimension, holder); + } else { + throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + } + return holder; + } + + DimDim get(String dimension) + { + return dimensions.get(dimension); + } } public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable { private final TimeAndDimsComparator comparator; - private final transient IncrementalIndex incrementalIndex; + private final transient OffheapIncrementalIndex incrementalIndex; - TimeAndDimsSerializer(IncrementalIndex incrementalIndex) + TimeAndDimsSerializer(OffheapIncrementalIndex incrementalIndex) { this.comparator = new TimeAndDimsComparator(); this.incrementalIndex = incrementalIndex; diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java new file mode 100644 index 00000000000..83e652f1e66 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -0,0 +1,778 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.segment.incremental; + +import com.google.common.base.Function; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import io.druid.collections.StupidPool; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.ComplexMetrics; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class OnheapIncrementalIndex implements IncrementalIndex +{ + private final long minTimestamp; + private final QueryGranularity gran; + private final List> rowTransformers; + private final AggregatorFactory[] metrics; + private final Map metricIndexes; + private final Map metricTypes; + private final ImmutableList metricNames; + private final LinkedHashMap dimensionOrder; + protected final CopyOnWriteArrayList dimensions; + private final DimensionHolder dimValues; + private final Map columnCapabilities; + private final ConcurrentNavigableMap facts; + private final List aggList; + private volatile AtomicInteger numEntries = new AtomicInteger(); + // This is modified on add() in a critical section. + private ThreadLocal in = new ThreadLocal<>(); + private final boolean deserializeComplexMetrics; + + /** + * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that + * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. + * + * @param incrementalIndexSchema + * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input + * value for aggregators that return metrics other than float. 
+ */ + public OnheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + final boolean deserializeComplexMetrics + ) + { + this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); + this.gran = incrementalIndexSchema.getGran(); + this.metrics = incrementalIndexSchema.getMetrics(); + this.rowTransformers = Lists.newCopyOnWriteArrayList(); + + final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); + final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); + final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); + this.aggList = Lists.newArrayList(); + + for (int i = 0; i < metrics.length; i++) { + final String metricName = metrics[i].getName(); + metricNamesBuilder.add(metricName); + metricIndexesBuilder.put(metricName, i); + metricTypesBuilder.put(metricName, metrics[i].getTypeName()); + } + + metricNames = metricNamesBuilder.build(); + metricIndexes = metricIndexesBuilder.build(); + metricTypes = metricTypesBuilder.build(); + + this.dimensionOrder = Maps.newLinkedHashMap(); + this.dimensions = new CopyOnWriteArrayList<>(); + // This should really be more generic + List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); + if (!spatialDimensions.isEmpty()) { + this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); + } + + this.columnCapabilities = Maps.newHashMap(); + for (Map.Entry entry : metricTypes.entrySet()) { + ValueType type; + if (entry.getValue().equalsIgnoreCase("float")) { + type = ValueType.FLOAT; + } else if (entry.getValue().equalsIgnoreCase("long")) { + type = ValueType.LONG; + } else { + type = ValueType.COMPLEX; + } + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(type); + columnCapabilities.put(entry.getKey(), capabilities); + } + for (String dimension : dimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + for (SpatialDimensionSchema spatialDimension : spatialDimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + capabilities.setHasSpatialIndexes(true); + columnCapabilities.put(spatialDimension.getDimName(), capabilities); + } + this.dimValues = new DimensionHolder(); + this.facts = createFactsTable(); + this.deserializeComplexMetrics = deserializeComplexMetrics; + } + + protected ConcurrentNavigableMap createFactsTable() { + return new ConcurrentSkipListMap<>(); + } + + public OnheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + true + ); + } + + public OnheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema + ) + { + this(incrementalIndexSchema, true); + } + + public OnheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + boolean deserializeComplexMetrics + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + deserializeComplexMetrics + ); + } + + public InputRow formatRow(InputRow row) + { + for (Function rowTransformer : rowTransformers) { + row = rowTransformer.apply(row); + } + + if (row == null) { + throw new IAE("Row is 
null? How can this be?!"); + } + return row; + } + + /** + * Adds a new row. The row might correspond with another row that already exists, in which case this will + * update that row instead of inserting a new one. + *
<p/>
+ *
<p/>
+ * Calls to add() are thread safe. + *
<p/>
+ * + * @param row the row of data to add + * + * @return the number of rows in the data set after adding the InputRow + */ + public int add(InputRow row) + { + row = formatRow(row); + if (row.getTimestampFromEpoch() < minTimestamp) { + throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); + } + + final List rowDimensions = row.getDimensions(); + + String[][] dims; + List overflow = null; + synchronized (dimensionOrder) { + dims = new String[dimensionOrder.size()][]; + for (String dimension : rowDimensions) { + List dimensionValues = row.getDimension(dimension); + + // Set column capabilities as data is coming in + ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + if (dimensionValues.size() > 1) { + capabilities.setHasMultipleValues(true); + } + + Integer index = dimensionOrder.get(dimension); + if (index == null) { + dimensionOrder.put(dimension, dimensionOrder.size()); + dimensions.add(dimension); + + if (overflow == null) { + overflow = Lists.newArrayList(); + } + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else { + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + } + } + } + + + if (overflow != null) { + // Merge overflow and non-overflow + String[][] newDims = new String[dims.length + overflow.size()][]; + System.arraycopy(dims, 0, newDims, 0, dims.length); + for (int i = 0; i < overflow.size(); ++i) { + newDims[dims.length + i] = overflow.get(i); + } + dims = newDims; + } + + final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); + Integer rowOffset; + synchronized (this) { + rowOffset = numEntries.get(); + final Integer prev = facts.putIfAbsent(key, rowOffset); + if (prev != null) { + rowOffset = prev; + } else { + Aggregator[] aggs = new Aggregator[metrics.length]; + for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + aggs[i] = agg.factorize( + new ColumnSelectorFactory() + { + @Override + public LongColumnSelector makeLongColumnSelector(final String columnName) + { + if(columnName.equals(Column.TIME_COLUMN_NAME)){ + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getTimestampFromEpoch(); + } + }; + } + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getLongMetric(columnName); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(final String columnName) + { + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(columnName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(final String column) + { + final String typeName = agg.getTypeName(); + + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + return in.get().getRaw(column); + } + }; + + if (!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; + } + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final 
ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in.get(), column); + } + }; + } + } + + @Override + public DimensionSelector makeDimensionSelector(final String dimension) + { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = in.get().getDimension(dimension); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } + + return new IndexedInts() + { + @Override + public int size() + { + return vals.size(); + } + + @Override + public int get(int index) + { + return vals.get(index); + } + + @Override + public Iterator iterator() + { + return vals.iterator(); + } + }; + } + + @Override + public int getValueCardinality() + { + throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + } + + @Override + public String lookupName(int id) + { + return in.get().getDimension(dimension).get(id); + } + + @Override + public int lookupId(String name) + { + return in.get().getDimension(dimension).indexOf(name); + } + }; + } + } + ); + } + aggList.add(aggs); + numEntries.incrementAndGet(); + } + } + in.set(row); + Aggregator[] aggs = aggList.get(rowOffset); + for (int i = 0; i < aggs.length; i++) { + synchronized (aggs[i]) { + aggs[i].aggregate(); + } + } + in.set(null); + return numEntries.get(); + } + + public boolean isEmpty() + { + return numEntries.get() == 0; + } + + public int size() + { + return numEntries.get(); + } + + @Override + public float getMetricFloatValue(int rowOffset, int aggOffset) + { + return aggList.get(rowOffset)[aggOffset].getFloat(); + } + + @Override + public long getMetricLongValue(int rowOffset, int aggOffset) + { + return aggList.get(rowOffset)[aggOffset].getLong(); + } + + @Override + public Object getMetricObjectValue(int rowOffset, int aggOffset) + { + return aggList.get(rowOffset)[aggOffset].get(); + } + + public long getMinTimeMillis() + { + return facts.firstKey().getTimestamp(); + } + + public long getMaxTimeMillis() + { + return facts.lastKey().getTimestamp(); + } + + private String[] getDimVals(final DimDim dimLookup, final List dimValues) + { + final String[] retVal = new String[dimValues.size()]; + + int count = 0; + for (String dimValue : dimValues) { + String canonicalDimValue = dimLookup.get(dimValue); + if (!dimLookup.contains(canonicalDimValue)) { + dimLookup.add(dimValue); + } + retVal[count] = canonicalDimValue; + count++; + } + Arrays.sort(retVal); + + return retVal; + } + + public AggregatorFactory[] getMetricAggs() + { + return metrics; + } + + public List getDimensions() + { + return dimensions; + } + + public String getMetricType(String metric) + { + return metricTypes.get(metric); + } + + public long getMinTimestamp() + { + return minTimestamp; + } + + public QueryGranularity getGranularity() + { + return gran; + } + + public Interval getInterval() + { + return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); + } + + public DateTime getMinTime() + { + return isEmpty() ? null : new DateTime(getMinTimeMillis()); + } + + public DateTime getMaxTime() + { + return isEmpty() ? null : new DateTime(getMaxTimeMillis()); + } + + public DimDim getDimension(String dimension) + { + return isEmpty() ? 
null : dimValues.get(dimension); + } + + public Integer getDimensionIndex(String dimension) + { + return dimensionOrder.get(dimension); + } + + public List getMetricNames() + { + return metricNames; + } + + public Integer getMetricIndex(String metricName) + { + return metricIndexes.get(metricName); + } + + Aggregator getAggregator(int rowOffset, int metricIndex) + { + return aggList.get(rowOffset)[metricIndex]; + } + + public ColumnCapabilities getCapabilities(String column) + { + return columnCapabilities.get(column); + } + + public ConcurrentNavigableMap getFacts() + { + return facts; + } + + public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) + { + return facts.subMap(start, end); + } + + @Override + public Iterator iterator() + { + return iterableWithPostAggregations(null).iterator(); + } + + public Iterable iterableWithPostAggregations(final List postAggs) + { + return new Iterable() + { + @Override + public Iterator iterator() + { + return Iterators.transform( + facts.entrySet().iterator(), + new Function, Row>() + { + @Override + public Row apply(final Map.Entry input) + { + final TimeAndDims timeAndDims = input.getKey(); + final int rowOffset = input.getValue(); + + String[][] theDims = timeAndDims.getDims(); + + Map theVals = Maps.newLinkedHashMap(); + for (int i = 0; i < theDims.length; ++i) { + String[] dim = theDims[i]; + if (dim != null && dim.length != 0) { + theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); + } + } + Aggregator[] aggs = aggList.get(rowOffset); + for (int i = 0; i < aggs.length; ++i) { + theVals.put(metrics[i].getName(), aggs[i].get()); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(timeAndDims.getTimestamp(), theVals); + } + } + ); + } + }; + } + + @Override + public void close() + { + } + + class DimensionHolder + { + private final Map dimensions; + + DimensionHolder() + { + dimensions = Maps.newConcurrentMap(); + } + + void reset() + { + dimensions.clear(); + } + + DimDim add(String dimension) + { + DimDim holder = dimensions.get(dimension); + if (holder == null) { + holder = new DimDimImpl(); + dimensions.put(dimension, holder); + } else { + throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + } + return holder; + } + + DimDim get(String dimension) + { + return dimensions.get(dimension); + } + } + + private static class DimDimImpl implements DimDim{ + private final Map falseIds; + private final Map falseIdsReverse; + private volatile String[] sortedVals = null; + final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); + + + public DimDimImpl() + { + BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); + falseIds = biMap; + falseIdsReverse = biMap.inverse(); + } + + /** + * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` + * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) + */ + public String get(String str) + { + String prev = poorMansInterning.putIfAbsent(str, str); + return prev != null ? prev : str; + } + + public int getId(String value) + { + if (value == null) { + value = ""; + } + final Integer id = falseIds.get(value); + return id == null ? 
-1 : id; + } + + public String getValue(int id) + { + return falseIdsReverse.get(id); + } + + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + + public int size() + { + return falseIds.size(); + } + + public synchronized int add(String value) + { + int id = falseIds.size(); + falseIds.put(value, id); + return id; + } + + public int getSortedId(String value) + { + assertSorted(); + return Arrays.binarySearch(sortedVals, value); + } + + public String getSortedValue(int index) + { + assertSorted(); + return sortedVals[index]; + } + + public void sort() + { + if (sortedVals == null) { + sortedVals = new String[falseIds.size()]; + + int index = 0; + for (String value : falseIds.keySet()) { + sortedVals[index++] = value; + } + Arrays.sort(sortedVals); + } + } + + private void assertSorted() + { + if (sortedVals == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + public boolean compareCannonicalValues(String s1, String s2) + { + return s1 ==s2; + } + } +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index 95d654b9153..d4bd729a607 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -39,6 +39,7 @@ import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import junit.framework.Assert; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -52,9 +53,8 @@ public class TimeseriesQueryRunnerBonusTest @Test public void testOneRowAtATime() throws Exception { - final IncrementalIndex oneRowIndex = new IncrementalIndex( - new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}, - TestQueryRunners.pool + final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex( + new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{} ); List> results; diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index ae781ca626c..17a1df19cb6 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -28,6 +28,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.Column; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; +import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -48,11 +49,10 @@ public class EmptyIndexTest } tmpDir.deleteOnExit(); - IncrementalIndex emptyIndex = new IncrementalIndex( + IncrementalIndex emptyIndex = new OnheapIncrementalIndex( 0, QueryGranularity.NONE, - new AggregatorFactory[0], - TestQueryRunners.pool + new AggregatorFactory[0] ); IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter( new Interval("2012-08-01/P3D"), diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 
c01c11500bb..8dd608f7fe5 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -30,6 +30,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.Column; import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import junit.framework.Assert; import org.apache.commons.io.FileUtils; import org.junit.Test; @@ -71,7 +72,7 @@ public class IndexMergerTest final long timestamp = System.currentTimeMillis(); IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp); - IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); toPersist2.add( new MapBasedInputRow( @@ -127,8 +128,8 @@ public class IndexMergerTest @Test public void testPersistEmptyColumn() throws Exception { - final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); - final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool); + final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); + final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); final File tmpDir1 = Files.createTempDir(); final File tmpDir2 = Files.createTempDir(); final File tmpDir3 = Files.createTempDir(); diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 84283c8082e..df234cf8c7e 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -40,6 +40,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -134,7 +135,7 @@ public class SchemalessIndex final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis(); if (theIndex == null) { - theIndex = new IncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool); + theIndex = new OnheapIncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS); } final List dims = Lists.newArrayList(); @@ -330,8 +331,8 @@ public class SchemalessIndex } } - final IncrementalIndex rowIndex = new IncrementalIndex( - timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool + final IncrementalIndex rowIndex = new OnheapIncrementalIndex( + timestamp, QueryGranularity.MINUTE, METRIC_AGGS ); rowIndex.add( @@ -360,8 +361,8 @@ public class SchemalessIndex String filename = resource.getFile(); log.info("Realtime loading index file[%s]", filename); - final IncrementalIndex retVal = new IncrementalIndex( - new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs, 
TestQueryRunners.pool + final IncrementalIndex retVal = new OnheapIncrementalIndex( + new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs ); try { diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 35e5630c7ab..75bf9a4502b 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -39,6 +39,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OffheapIncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -164,12 +165,12 @@ public class TestIndex if (useOffheap) { retVal = new OffheapIncrementalIndex( schema, - TestQueryRunners.pool + TestQueryRunners.pool, + true ); } else { - retVal = new IncrementalIndex( - schema, - TestQueryRunners.pool + retVal = new OnheapIncrementalIndex( + schema ); } diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 410c9d3ce6c..f2795ca604a 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -27,6 +27,7 @@ import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import junit.framework.Assert; import org.junit.Test; @@ -46,9 +47,8 @@ public class IncrementalIndexTest public static IncrementalIndex createCaseInsensitiveIndex(long timestamp) { - IncrementalIndex index = new IncrementalIndex( - 0L, QueryGranularity.NONE, new AggregatorFactory[]{}, - TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0L, QueryGranularity.NONE, new AggregatorFactory[]{} ); index.add( @@ -106,11 +106,10 @@ public class IncrementalIndexTest @Test public void testConcurrentAdd() throws Exception { - final IncrementalIndex index = new IncrementalIndex( + final IncrementalIndex index = new OnheapIncrementalIndex( 0L, QueryGranularity.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - TestQueryRunners.pool + new AggregatorFactory[]{new CountAggregatorFactory("count")} ); final int threadCount = 10; final int elementsPerThread = 200; diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index b4c94e18dfb..d91ac51f7b2 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -53,6 +53,7 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; @@ -108,7 +109,7 @@ public class SpatialFilterBonusTest private static IncrementalIndex 
makeIncrementalIndex() throws IOException { - IncrementalIndex theIndex = new IncrementalIndex( + IncrementalIndex theIndex = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -124,7 +125,6 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool, false ); theIndex.add( @@ -239,7 +239,7 @@ public class SpatialFilterBonusTest private static QueryableIndex makeMergedQueryableIndex() { try { - IncrementalIndex first = new IncrementalIndex( + IncrementalIndex first = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -256,10 +256,9 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool, false ); - IncrementalIndex second = new IncrementalIndex( + IncrementalIndex second = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -275,10 +274,9 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool, false ); - IncrementalIndex third = new IncrementalIndex( + IncrementalIndex third = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -295,7 +293,6 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool, false ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index f3397d3b4a5..75c7c8cc8ee 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -54,6 +54,7 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; @@ -104,7 +105,7 @@ public class SpatialFilterTest private static IncrementalIndex makeIncrementalIndex() throws IOException { - IncrementalIndex theIndex = new IncrementalIndex( + IncrementalIndex theIndex = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -125,7 +126,6 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool, false ); theIndex.add( @@ -267,7 +267,7 @@ public class SpatialFilterTest private static QueryableIndex makeMergedQueryableIndex() { try { - IncrementalIndex first = new IncrementalIndex( + IncrementalIndex first = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -288,10 +288,9 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool, false ); - IncrementalIndex second = new IncrementalIndex( + IncrementalIndex second = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) 
.withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -312,10 +311,9 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool, false ); - IncrementalIndex third = new IncrementalIndex( + IncrementalIndex third = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) .withQueryGranularity(QueryGranularity.DAY) .withMetrics(METRIC_AGGS) @@ -336,7 +334,6 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool, false ); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index bc15e5b930e..d65da87cbf6 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -63,9 +63,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testSanity() throws Exception { - IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); index.add( @@ -111,8 +110,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception { - IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); index.add( @@ -197,9 +196,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testResetSanity() { - IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); @@ -250,9 +248,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testSingleValueTopN() { - IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); DateTime t = DateTime.now(); @@ -306,9 +303,8 @@ public class IncrementalIndexStorageAdapterTest @Test public void testFilterByNull() throws Exception { - IncrementalIndex index = new IncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool + IncrementalIndex index = new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); index.add( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index d3e9a0b26a2..f145d6c8660 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -32,6 +32,7 @@ import 
io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OffheapIncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireHydrant; @@ -191,12 +192,12 @@ public class Sink implements Iterable if (config.isIngestOffheap()) { newIndex = new OffheapIncrementalIndex( indexSchema, - new OffheapBufferPool(bufferSize) + new OffheapBufferPool(bufferSize), + true ); } else { - newIndex = new IncrementalIndex( - indexSchema, - new OffheapBufferPool(bufferSize) + newIndex = new OnheapIncrementalIndex( + indexSchema ); }
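
A quick usage sketch for reviewers, mirroring the constructor and call sites the tests above migrate to. The class name and row values are invented for illustration; the three-argument constructor, add(), row iteration, and close() follow the signatures introduced in this patch.

    import com.google.common.collect.ImmutableMap;
    import io.druid.data.input.MapBasedInputRow;
    import io.druid.data.input.Row;
    import io.druid.granularity.QueryGranularity;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.segment.incremental.OnheapIncrementalIndex;

    import java.util.Arrays;

    public class OnheapIndexExample
    {
      public static void main(String[] args)
      {
        // No buffer pool argument: aggregation state lives on the JVM heap as
        // plain Aggregator objects instead of off-heap ByteBuffer slices.
        OnheapIncrementalIndex index = new OnheapIncrementalIndex(
            0L,
            QueryGranularity.NONE,
            new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
        );

        // add() rolls the row up into an existing (time, dims) entry if one
        // exists and returns the number of distinct rows afterwards.
        int numRows = index.add(
            new MapBasedInputRow(
                System.currentTimeMillis(),
                Arrays.asList("dim1"),
                ImmutableMap.<String, Object>of("dim1", "value1")
            )
        );
        System.out.println(numRows + " row(s) after add");

        // The index iterates as rolled-up rows (MapBasedRow under the hood).
        for (Row row : index) {
          System.out.println(row);
        }

        index.close(); // a no-op for the on-heap variant, unlike off-heap
      }
    }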
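
add() takes the index lock while registering a new (time, dims) key and synchronizes on each Aggregator while updating it, which is what the new testConcurrentAdd exercises. A minimal sketch of that concurrent pattern, with the thread and row counts taken from the test and everything else illustrative:

    import com.google.common.collect.ImmutableMap;
    import io.druid.data.input.MapBasedInputRow;
    import io.druid.granularity.QueryGranularity;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.segment.incremental.OnheapIncrementalIndex;

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ConcurrentAddSketch
    {
      public static void main(String[] args) throws InterruptedException
      {
        final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
            0L,
            QueryGranularity.NONE,
            new AggregatorFactory[]{new CountAggregatorFactory("count")}
        );
        final int threadCount = 10;
        final int elementsPerThread = 200;
        final long timestamp = System.currentTimeMillis();

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int t = 0; t < threadCount; t++) {
          executor.submit(
              new Runnable()
              {
                @Override
                public void run()
                {
                  // All threads write the same keys; facts.putIfAbsent() plus
                  // the per-aggregator synchronized blocks keep counts exact.
                  for (int i = 0; i < elementsPerThread; i++) {
                    index.add(
                        new MapBasedInputRow(
                            timestamp + i,
                            Arrays.asList("dim"),
                            ImmutableMap.<String, Object>of("dim", "value" + i)
                        )
                    );
                  }
                }
              }
          );
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        // Expect elementsPerThread distinct rows, each with count == threadCount.
        System.out.println(index.size() + " rows (expected " + elementsPerThread + ")");
      }
    }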
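
DimDimImpl's poorMansInterning map returns one canonical String instance per distinct value, which is what lets compareCannonicalValues() and the storage adapter's value matcher use == instead of equals(). A standalone sketch of the idiom, with class and method names invented for illustration:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class PoorMansInterner
    {
      private final ConcurrentMap<String, String> interned = new ConcurrentHashMap<String, String>();

      // The first thread to insert a value wins; every later call returns that
      // same canonical instance, so callers may safely compare with ==.
      public String intern(String value)
      {
        String prev = interned.putIfAbsent(value, value);
        return prev != null ? prev : value;
      }

      public static void main(String[] args)
      {
        PoorMansInterner interner = new PoorMansInterner();
        String a = interner.intern(new String("druid"));
        String b = interner.intern(new String("druid"));
        System.out.println(a == b); // true: b is the canonical instance a
      }
    }

Interning trades a map lookup at ingest time for cheap reference-equality comparisons on the read path; unlike String.intern(), the map's lifetime is tied to the index, so the entries become reclaimable when the index is dropped.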