Tests passing with on-heap incremental index

nishantmonu51 2014-12-02 22:29:28 +05:30
parent 59542c41f8
commit eac776f1a7
31 changed files with 1627 additions and 886 deletions

View File

@@ -89,6 +89,12 @@ public class ApproximateHistogramAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@@ -87,6 +87,12 @@ public class ApproximateHistogramFoldingAggregator implements Aggregator
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("ApproximateHistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@@ -46,6 +46,7 @@ import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
@@ -645,12 +646,12 @@ public class IndexGeneratorJob implements Jobby
if (tuningConfig.isIngestOffheap()) {
return new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
new OffheapBufferPool(bufferSize),
true
);
} else {
return new IncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
return new OnheapIncrementalIndex(
indexSchema
);
}
}
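
Taken together, this hunk swaps the buffer-backed IncrementalIndex out of the default ingestion path: only the ingestOffheap branch keeps a direct-memory pool, while the common case becomes a plain OnheapIncrementalIndex. A minimal sketch of the resulting selection logic, reusing the tuningConfig, indexSchema, and bufferSize names from the hunk above:

private IncrementalIndex makeIncrementalIndex()
{
  if (tuningConfig.isIngestOffheap()) {
    // Off-heap path: aggregation state lives in pooled direct ByteBuffers.
    return new OffheapIncrementalIndex(
        indexSchema,
        new OffheapBufferPool(bufferSize),
        true // deserializeComplexMetrics
    );
  } else {
    // On-heap path: no buffer pool is needed any more.
    return new OnheapIncrementalIndex(indexSchema);
  }
}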

View File

@@ -39,4 +39,6 @@ public interface Aggregator {
float getFloat();
String getName();
void close();
long getLong();
}
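
This is the root of the change set: Aggregator gains a getLong() accessor next to getFloat(). The per-aggregator hunks in this commit follow one pattern: numeric aggregators return or cast their accumulated value, while aggregators with non-numeric state (histogram, cardinality, hyperUnique) throw UnsupportedOperationException. A hedged sketch of a conforming implementation, modeled on the LongSumAggregator hunk below; the aggregate()/reset()/get() members belong to the existing interface, not this hunk:

public class SketchLongSumAggregator implements Aggregator
{
  private final String name;
  private final LongColumnSelector selector;
  private long sum = 0;

  public SketchLongSumAggregator(String name, LongColumnSelector selector)
  {
    this.name = name;
    this.selector = selector;
  }

  @Override public void aggregate() { sum += selector.get(); }
  @Override public void reset() { sum = 0; }
  @Override public Object get() { return sum; }
  @Override public float getFloat() { return (float) sum; }
  @Override public long getLong() { return sum; } // new in this commit
  @Override public String getName() { return name; }
  @Override public void close() { }
}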

View File

@@ -64,6 +64,12 @@ public class Aggregators
{
}
@Override
public long getLong()
{
return 0;
}
};
}

View File

@@ -64,6 +64,12 @@ public class CountAggregator implements Aggregator
return (float) count;
}
@Override
public long getLong()
{
return count;
}
@Override
public String getName()
{

View File

@@ -80,6 +80,12 @@ public class DoubleSumAggregator implements Aggregator
return (float) sum;
}
@Override
public long getLong()
{
return (long) sum;
}
@Override
public String getName()
{

View File

@@ -58,6 +58,12 @@ public class FilteredAggregator implements Aggregator
return delegate.getFloat();
}
@Override
public long getLong()
{
return delegate.getLong();
}
@Override
public String getName()
{

View File

@@ -74,6 +74,12 @@ public class HistogramAggregator implements Aggregator
throw new UnsupportedOperationException("HistogramAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("HistogramAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@@ -76,6 +76,12 @@ public class JavaScriptAggregator implements Aggregator
return (float) current;
}
@Override
public long getLong()
{
return (long) current;
}
@Override
public String getName()
{

View File

@@ -79,6 +79,12 @@ public class LongSumAggregator implements Aggregator
return (float) sum;
}
@Override
public long getLong()
{
return sum;
}
@Override
public String getName()
{

View File

@@ -71,6 +71,12 @@ public class MaxAggregator implements Aggregator
return (float) max;
}
@Override
public long getLong()
{
return (long) max;
}
@Override
public String getName()
{

View File

@@ -71,6 +71,12 @@ public class MinAggregator implements Aggregator
return (float) min;
}
@Override
public long getLong()
{
return (long) min;
}
@Override
public String getName()
{

View File

@@ -126,6 +126,12 @@ public class CardinalityAggregator implements Aggregator
throw new UnsupportedOperationException("CardinalityAggregator does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("CardinalityAggregator does not support getLong()");
}
@Override
public String getName()
{

View File

@@ -66,6 +66,12 @@ public class HyperUniquesAggregator implements Aggregator
throw new UnsupportedOperationException();
}
@Override
public long getLong()
{
throw new UnsupportedOperationException();
}
@Override
public String getName()
{

View File

@@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List;
@@ -86,13 +87,12 @@ public class GroupByQueryHelper
false
);
} else {
index = new IncrementalIndex(
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
);
}
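
For groupBy's intermediate merge index, the on-heap constructor drops the bufferPool argument entirely; deserializeComplexMetrics stays false so complex aggregators keep receiving raw intermediate objects. The new call, condensed from the hunk above:

IncrementalIndex index = new OnheapIncrementalIndex(
    granTimeStart, // granularity-truncated min timestamp
    gran,
    aggs.toArray(new AggregatorFactory[aggs.size()]),
    false          // skip ComplexMetricSerde on intermediate rows
);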

View File

@@ -20,687 +20,93 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class IncrementalIndex implements Iterable<Row>, Closeable
public interface IncrementalIndex extends Iterable<Row>, Closeable
{
private final long minTimestamp;
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final BufferAggregator[] aggs;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final LinkedHashMap<String, Integer> dimensionOrder;
protected final CopyOnWriteArrayList<String> dimensions;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param incrementalIndexSchema
* @param bufferPool
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*/
public IncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
final boolean deserializeComplexMetrics
)
List<String> getDimensions();
ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
Integer getDimensionIndex(String dimension);
List<String> getMetricNames();
DimDim getDimension(String dimension);
AggregatorFactory[] getMetricAggs();
Interval getInterval();
DateTime getMinTime();
DateTime getMaxTime();
boolean isEmpty();
ConcurrentNavigableMap<TimeAndDims,Integer> getSubMap(TimeAndDims start, TimeAndDims end);
Integer getMetricIndex(String columnName);
String getMetricType(String metric);
ColumnCapabilities getCapabilities(String column);
int size();
float getMetricFloatValue(int rowOffset, int aggOffset);
long getMetricLongValue(int rowOffset, int aggOffset);
Object getMetricObjectValue(int rowOffset, int aggOffset);
Iterable<Row> iterableWithPostAggregations(List<PostAggregator> postAggregatorSpecs);
int add(InputRow inputRow);
void close();
InputRow formatRow(InputRow parse);
static interface DimDim
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Lists.newCopyOnWriteArrayList();
public String get(String value);
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = new BufferAggregator[metrics.length];
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if(columnName.equals(Column.TIME_COLUMN_NAME)){
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
public int getId(String value);
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
public String getValue(int id);
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
public boolean contains(String value);
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
public int size();
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
public int add(String value);
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
public int getSortedId(String value);
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
public String getSortedValue(int index);
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
public void sort();
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
}
);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
metricNames = metricNamesBuilder.build();
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.totalAggSize = currAggSize;
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>();
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
}
for (String dimension : dimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
this.facts = createFactsTable();
}
protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
return new ConcurrentSkipListMap<>();
}
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
true
);
}
public IncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool
)
{
this(incrementalIndexSchema, bufferPool, true);
}
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics
);
}
public InputRow formatRow(InputRow row)
{
for (Function<InputRow, InputRow> rowTransformer : rowTransformers) {
row = rowTransformer.apply(row);
}
if (row == null) {
throw new IAE("Row is null? How can this be?!");
}
return row;
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p/>
* <p/>
* Calls to add() are thread safe.
* <p/>
*
* @param row the row of data to add
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
if (dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
}
dims = newDims;
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
Integer rowOffset;
synchronized (this) {
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
facts.remove(key);
throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get());
}
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
}
in.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
return numEntries.get();
}
public boolean isEmpty()
{
return numEntries.get() == 0;
}
public int size()
{
return numEntries.get();
}
public long getMinTimeMillis()
{
return facts.firstKey().getTimestamp();
}
public long getMaxTimeMillis()
{
return facts.lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
public List<String> getDimensions()
{
return dimensions;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGranularity()
{
return gran;
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
}
public DateTime getMinTime()
{
return isEmpty() ? null : new DateTime(getMinTimeMillis());
}
public DateTime getMaxTime()
{
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
List<String> getMetricNames()
{
return metricNames;
}
Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
}
ByteBuffer getMetricBuffer()
{
return bufferHolder.get();
}
BufferAggregator getAggregator(int metricIndex)
{
return aggs[metricIndex];
}
ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts.subMap(start, end);
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
}
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
}
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i)));
}
if (postAggs != null) {
for (PostAggregator postAgg : postAggs) {
theVals.put(postAgg.getName(), postAgg.compute(theVals));
}
}
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);
}
};
}
@Override
public void close()
{
try {
bufferHolder.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = createDimDim(dimension);
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
protected DimDim createDimDim(String dimension){
return new DimDimImpl();
public boolean compareCannonicalValues(String s1, String s2);
}
static class TimeAndDims implements Comparable<TimeAndDims>
@@ -773,134 +179,18 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
"timestamp=" + new DateTime(timestamp) +
", dims=" + Lists.transform(
Arrays.asList(dims), new Function<String[], Object>()
{
@Override
public Object apply(@Nullable String[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
{
@Override
public Object apply(@Nullable String[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
}
return Arrays.asList(input);
}
}
return Arrays.asList(input);
}
}
) +
'}';
}
}
static interface DimDim
{
public String get(String value);
public int getId(String value);
public String getValue(int id);
public boolean contains(String value);
public int size();
public int add(String value);
public int getSortedId(String value);
public String getSortedValue(int index);
public void sort();
public boolean compareCannonicalValues(String s1, String s2);
}
private static class DimDimImpl implements DimDim{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
public DimDimImpl()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
String prev = poorMansInterning.putIfAbsent(str, str);
return prev != null ? prev : str;
}
public int getId(String value)
{
if (value == null) {
value = "";
}
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return falseIds.size();
}
public synchronized int add(String value)
{
int id = falseIds.size();
falseIds.put(value, id);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
public boolean compareCannonicalValues(String s1, String s2)
{
return s1 == s2;
}
}
}
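
The net effect of this file's diff: IncrementalIndex shrinks from a ~700-line buffer-backed class to an interface (plus the nested DimDim and TimeAndDims contracts), and callers read metrics through logical (rowOffset, aggOffset) accessors instead of a shared ByteBuffer. A hypothetical consumer of the new surface, to show the shape of the contract; dumpFirstMetric is illustrative and not part of the diff:

void dumpFirstMetric(IncrementalIndex index)
{
  for (Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry : index.getFacts().entrySet()) {
    final int rowOffset = entry.getValue();
    // aggOffset 0 addresses the first metric, per getMetricIndex()
    System.out.println(entry.getKey() + " -> " + index.getMetricObjectValue(rowOffset, 0));
  }
}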

View File

@@ -205,8 +205,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
Object[] metrics = new Object[index.getMetricAggs().length];
for (int i = 0; i < metrics.length; i++) {
metrics[i] = index.getAggregator(i)
.get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i));
metrics[i] = index.getMetricObjectValue(rowOffset, i);
}
return new Rowboat(

View File

@@ -1,4 +1,3 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
@@ -30,7 +29,6 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
@@ -352,17 +350,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int metricIndex = metricIndexInt;
final BufferAggregator agg = index.getAggregator(metricIndex);
return new FloatColumnSelector()
{
@Override
public float get()
{
return agg.getFloat(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
);
return index.getMetricFloatValue(currEntry.getValue(), metricIndex);
}
};
}
@@ -370,7 +363,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
if(columnName.equals(Column.TIME_COLUMN_NAME)){
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
{
@Override
@@ -393,16 +386,15 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int metricIndex = metricIndexInt;
final BufferAggregator agg = index.getAggregator(metricIndex);
return new LongColumnSelector()
{
@Override
public long get()
{
return agg.getLong(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
return index.getMetricLongValue(
currEntry.getValue(),
metricIndex
);
}
};
@@ -417,7 +409,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column));
final BufferAggregator agg = index.getAggregator(metricIndex);
return new ObjectColumnSelector()
{
@Override
@@ -429,9 +420,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
return agg.get(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
return index.getMetricObjectValue(
currEntry.getValue(),
metricIndex
);
}
};
@@ -453,7 +444,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public Object get()
{
final String[][] dims = currEntry.getKey().getDims();
if(dimensionIndex >= dims.length) {
if (dimensionIndex >= dims.length) {
return null;
}
final String[] dimVals = dims[dimensionIndex];
@@ -562,7 +553,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
for (String dimVal : dims[dimIndex]) {
if (dimDim.compareCannonicalValues(id,dimVal)) {
if (dimDim.compareCannonicalValues(id, dimVal)) {
return true;
}
}
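
With those accessors in place, the storage adapter no longer needs getMetricBuffer(), getMetricPosition(), or a BufferAggregator reference; each selector delegates to the index's typed getters, so the same adapter serves both the on-heap and off-heap implementations. A sketch of the converged pattern; here currEntry is a plain Map.Entry standing in for the adapter's internal cursor holder:

FloatColumnSelector makeFloatSelector(
    final IncrementalIndex index,
    final int metricIndex,
    final Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry
)
{
  return new FloatColumnSelector()
  {
    @Override
    public float get()
    {
      // One typed call replaces buffer + offset arithmetic.
      return index.getMetricFloatValue(currEntry.getValue(), metricIndex);
    }
  };
}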

View File

@@ -20,84 +20,693 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
public class OffheapIncrementalIndex extends IncrementalIndex
public class OffheapIncrementalIndex implements IncrementalIndex
{
private volatile DB db;
private volatile DB factsDb;
private final long minTimestamp;
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final BufferAggregator[] aggs;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final LinkedHashMap<String, Integer> dimensionOrder;
protected final CopyOnWriteArrayList<String> dimensions;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
private final DB factsDb;
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param incrementalIndexSchema
* @param bufferPool
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*/
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool
StupidPool<ByteBuffer> bufferPool,
final boolean deserializeComplexMetrics
)
{
super(incrementalIndexSchema, bufferPool);
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Lists.newCopyOnWriteArrayList();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = new BufferAggregator[metrics.length];
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if(columnName.equals(Column.TIME_COLUMN_NAME)){
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
}
);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
metricNames = metricNamesBuilder.build();
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.totalAggSize = currAggSize;
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>();
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
}
for (String dimension : dimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
factsDb = dbMaker.make();
db = dbMaker.make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
)
{
super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics);
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics
);
}
@Override
protected synchronized ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable()
public InputRow formatRow(InputRow row)
{
if (factsDb == null) {
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
factsDb = dbMaker.make();
db = dbMaker.make();
for (Function<InputRow, InputRow> rowTransformer : rowTransformers) {
row = rowTransformer.apply(row);
}
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
return factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
if (row == null) {
throw new IAE("Row is null? How can this be?!");
}
return row;
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p/>
* <p/>
* Calls to add() are thread safe.
* <p/>
*
* @param row the row of data to add
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
if (dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
}
dims = newDims;
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
Integer rowOffset;
synchronized (this) {
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
facts.remove(key);
throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get());
}
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
}
in.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
return numEntries.get();
}
public boolean isEmpty()
{
return numEntries.get() == 0;
}
public int size()
{
return numEntries.get();
}
public long getMinTimeMillis()
{
return facts.firstKey().getTimestamp();
}
public long getMaxTimeMillis()
{
return facts.lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
public List<String> getDimensions()
{
return dimensions;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGranularity()
{
return gran;
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
}
public DateTime getMinTime()
{
return isEmpty() ? null : new DateTime(getMinTimeMillis());
}
public DateTime getMaxTime()
{
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
public DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
public List<String> getMetricNames()
{
return metricNames;
}
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
private int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
}
@Override
protected DimDim createDimDim(String dimension)
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return new OffheapDimDim(dimension);
return aggs[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return aggs[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return aggs[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
public ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts.subMap(start, end);
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
}
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
}
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i)));
}
if (postAggs != null) {
for (PostAggregator postAgg : postAggs) {
theVals.put(postAgg.getName(), postAgg.compute(theVals));
}
}
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);
}
};
}
@Override
public void close()
{
try {
bufferHolder.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new OffheapDimDim(dimension);
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
private final TimeAndDimsComparator comparator;
private final transient IncrementalIndex incrementalIndex;
private final transient OffheapIncrementalIndex incrementalIndex;
TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
TimeAndDimsSerializer(OffheapIncrementalIndex incrementalIndex)
{
this.comparator = new TimeAndDimsComparator();
this.incrementalIndex = incrementalIndex;
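
With the shared base class gone, OffheapIncrementalIndex now carries the full constructor itself: aggregation state in a pooled direct ByteBuffer, and the facts map as a MapDB off-heap B-tree keyed by TimeAndDims. A minimal construction sketch; the schema values and buffer size are illustrative assumptions, while the builder methods and constructor signature come from the hunks above:

IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
    .withMinTimestamp(0L)
    .withQueryGranularity(QueryGranularity.NONE)
    .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
    .build();

IncrementalIndex index = new OffheapIncrementalIndex(
    schema,
    new OffheapBufferPool(1024 * 1024), // assumed 1 MB aggregation buffer
    true                                // deserializeComplexMetrics
);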

View File

@@ -0,0 +1,778 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class OnheapIncrementalIndex implements IncrementalIndex
{
private final long minTimestamp;
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
protected final CopyOnWriteArrayList<String> dimensions;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final List<Aggregator[]> aggList;
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
private final boolean deserializeComplexMetrics;
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param incrementalIndexSchema
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*/
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Lists.newCopyOnWriteArrayList();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggList = Lists.newArrayList();
for (int i = 0; i < metrics.length; i++) {
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
metricNames = metricNamesBuilder.build();
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>();
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
}
for (String dimension : dimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.dimValues = new DimensionHolder();
this.facts = createFactsTable();
this.deserializeComplexMetrics = deserializeComplexMetrics;
}
protected ConcurrentNavigableMap<TimeAndDims, Integer> createFactsTable() {
return new ConcurrentSkipListMap<>();
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true
);
}
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema
)
{
this(incrementalIndexSchema, true);
}
public OnheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
deserializeComplexMetrics
);
}
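// A hedged usage sketch of the constructors above (illustrative, not part
// of this file): CountAggregatorFactory, QueryGranularity.NONE and
// MapBasedInputRow are standard Druid classes assumed on the classpath.
IncrementalIndex sketchIndex = new OnheapIncrementalIndex(
    0L,                    // minTimestamp
    QueryGranularity.NONE, // no time truncation
    new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
sketchIndex.add(new MapBasedInputRow(
    System.currentTimeMillis(),
    Arrays.asList("dim"),
    ImmutableMap.<String, Object>of("dim", "value")
));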
public InputRow formatRow(InputRow row)
{
for (Function<InputRow, InputRow> rowTransformer : rowTransformers) {
row = rowTransformer.apply(row);
}
if (row == null) {
throw new IAE("Row is null? How can this be?!");
}
return row;
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p/>
* <p/>
* Calls to add() are thread safe.
* <p/>
*
* @param row the row of data to add
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
if (dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
}
dims = newDims;
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
Integer rowOffset;
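// The check-then-act against the facts table happens under the lock so that
// only one thread creates and registers the aggregators for a new
// (timestamp, dims) key.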
synchronized (this) {
rowOffset = numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
Aggregator[] aggs = new Aggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
}
);
}
aggList.add(aggs);
numEntries.incrementAndGet();
}
}
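// The column selectors built above read the current row through the shared
// `in` holder, so it is set only for the duration of the aggregate() calls
// and cleared again afterwards.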
in.set(row);
Aggregator[] aggs = aggList.get(rowOffset);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate();
}
}
in.set(null);
return numEntries.get();
}
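// A minimal usage sketch (assumes the MapBasedInputRow(timestamp, dimensions,
// event) constructor and Guava's ImmutableList/ImmutableMap, as used by the
// tests touched in this commit):
//
// IncrementalIndex index = new OnheapIncrementalIndex(
// 0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
// );
// int numRows = index.add(
// new MapBasedInputRow(
// System.currentTimeMillis(),
// ImmutableList.of("dim1"),
// ImmutableMap.<String, Object>of("dim1", "value1")
// )
// );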
public boolean isEmpty()
{
return numEntries.get() == 0;
}
public int size()
{
return numEntries.get();
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].getFloat();
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].getLong();
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return aggList.get(rowOffset)[aggOffset].get();
}
public long getMinTimeMillis()
{
return facts.firstKey().getTimestamp();
}
public long getMaxTimeMillis()
{
return facts.lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
int count = 0;
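// get() returns the interned value; contains()/add() then ensure the value
// has been assigned a false id in the lookup.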
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
public List<String> getDimensions()
{
return dimensions;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGranularity()
{
return gran;
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
}
public DateTime getMinTime()
{
return isEmpty() ? null : new DateTime(getMinTimeMillis());
}
public DateTime getMaxTime()
{
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
public DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
public List<String> getMetricNames()
{
return metricNames;
}
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
Aggregator getAggregator(int rowOffset, int metricIndex)
{
return aggList.get(rowOffset)[metricIndex];
}
public ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts.subMap(start, end);
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
}
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
}
Aggregator[] aggs = aggList.get(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), aggs[i].get());
}
if (postAggs != null) {
for (PostAggregator postAgg : postAggs) {
theVals.put(postAgg.getName(), postAgg.compute(theVals));
}
}
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);
}
};
}
@Override
public void close()
{
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new DimDimImpl();
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
private static class DimDimImpl implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
public DimDimImpl()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
String prev = poorMansInterning.putIfAbsent(str, str);
return prev != null ? prev : str;
}
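// For example, once get("foo") has been called for two different rows, both
// rows share the same String instance, so the value matcher referenced above
// can compare dimension values with == rather than .equals().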
public int getId(String value)
{
if (value == null) {
value = "";
}
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return falseIds.size();
}
public synchronized int add(String value)
{
int id = falseIds.size();
falseIds.put(value, id);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
public boolean compareCannonicalValues(String s1, String s2)
{
// Values are interned via get(), so reference equality suffices here.
return s1 == s2;
}
}
}

View File

@ -39,6 +39,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -52,9 +53,8 @@ public class TimeseriesQueryRunnerBonusTest
@Test
public void testOneRowAtATime() throws Exception
{
final IncrementalIndex oneRowIndex = new IncrementalIndex(
new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{},
TestQueryRunners.pool
final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex(
new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{}
);
List<Result<TimeseriesResultValue>> results;

View File

@ -28,6 +28,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -48,11 +49,10 @@ public class EmptyIndexTest
}
tmpDir.deleteOnExit();
IncrementalIndex emptyIndex = new IncrementalIndex(
IncrementalIndex emptyIndex = new OnheapIncrementalIndex(
0,
QueryGranularity.NONE,
new AggregatorFactory[0],
TestQueryRunners.pool
new AggregatorFactory[0]
);
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
new Interval("2012-08-01/P3D"),

View File

@ -30,6 +30,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
@ -71,7 +72,7 @@ public class IndexMergerTest
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(timestamp);
IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
toPersist2.add(
new MapBasedInputRow(
@ -127,8 +128,8 @@ public class IndexMergerTest
@Test
public void testPersistEmptyColumn() throws Exception
{
final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, TestQueryRunners.pool);
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final File tmpDir1 = Files.createTempDir();
final File tmpDir2 = Files.createTempDir();
final File tmpDir3 = Files.createTempDir();

View File

@ -40,6 +40,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -134,7 +135,7 @@ public class SchemalessIndex
final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis();
if (theIndex == null) {
theIndex = new IncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool);
theIndex = new OnheapIncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS);
}
final List<String> dims = Lists.newArrayList();
@ -330,8 +331,8 @@ public class SchemalessIndex
}
}
final IncrementalIndex rowIndex = new IncrementalIndex(
timestamp, QueryGranularity.MINUTE, METRIC_AGGS, TestQueryRunners.pool
final IncrementalIndex rowIndex = new OnheapIncrementalIndex(
timestamp, QueryGranularity.MINUTE, METRIC_AGGS
);
rowIndex.add(
@ -360,8 +361,8 @@ public class SchemalessIndex
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
final IncrementalIndex retVal = new IncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs, TestQueryRunners.pool
final IncrementalIndex retVal = new OnheapIncrementalIndex(
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs
);
try {

View File

@ -39,6 +39,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -164,12 +165,12 @@ public class TestIndex
if (useOffheap) {
retVal = new OffheapIncrementalIndex(
schema,
TestQueryRunners.pool
TestQueryRunners.pool,
true
);
} else {
retVal = new IncrementalIndex(
schema,
TestQueryRunners.pool
retVal = new OnheapIncrementalIndex(
schema
);
}

View File

@ -27,6 +27,7 @@ import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.junit.Test;
@ -46,9 +47,8 @@ public class IncrementalIndexTest
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
{
IncrementalIndex index = new IncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{},
TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{}
);
index.add(
@ -106,11 +106,10 @@ public class IncrementalIndexTest
@Test
public void testConcurrentAdd() throws Exception
{
final IncrementalIndex index = new IncrementalIndex(
final IncrementalIndex index = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
final int threadCount = 10;
final int elementsPerThread = 200;

View File

@ -53,6 +53,7 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -108,7 +109,7 @@ public class SpatialFilterBonusTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -124,7 +125,6 @@ public class SpatialFilterBonusTest
)
)
).build(),
TestQueryRunners.pool,
false
);
theIndex.add(
@ -239,7 +239,7 @@ public class SpatialFilterBonusTest
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -256,10 +256,9 @@ public class SpatialFilterBonusTest
)
).build(),
TestQueryRunners.pool,
false
);
IncrementalIndex second = new IncrementalIndex(
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -275,10 +274,9 @@ public class SpatialFilterBonusTest
)
)
).build(),
TestQueryRunners.pool,
false
);
IncrementalIndex third = new IncrementalIndex(
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -295,7 +293,6 @@ public class SpatialFilterBonusTest
)
).build(),
TestQueryRunners.pool,
false
);

View File

@ -54,6 +54,7 @@ import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
@ -104,7 +105,7 @@ public class SpatialFilterTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
IncrementalIndex theIndex = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -125,7 +126,6 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
);
theIndex.add(
@ -267,7 +267,7 @@ public class SpatialFilterTest
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
IncrementalIndex first = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -288,10 +288,9 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
);
IncrementalIndex second = new IncrementalIndex(
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -312,10 +311,9 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
);
IncrementalIndex third = new IncrementalIndex(
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
@ -336,7 +334,6 @@ public class SpatialFilterTest
)
)
).build(),
TestQueryRunners.pool,
false
);

View File

@ -63,9 +63,8 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testSanity() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
index.add(
@ -111,8 +110,8 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
index.add(
@ -197,9 +196,8 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
@ -250,9 +248,8 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testSingleValueTopN()
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
DateTime t = DateTime.now();
@ -306,9 +303,8 @@ public class IncrementalIndexStorageAdapterTest
@Test
public void testFilterByNull() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
index.add(

View File

@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireHydrant;
@ -191,12 +192,12 @@ public class Sink implements Iterable<FireHydrant>
if (config.isIngestOffheap()) {
newIndex = new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
new OffheapBufferPool(bufferSize),
true
);
} else {
newIndex = new IncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
newIndex = new OnheapIncrementalIndex(
indexSchema
);
}