OffheapIncrementalIndex updates to do the aggregation merging off-heap

Author: Himanshu Gupta 2016-02-03 10:47:05 -06:00
parent 907dd77483
commit 72a1e730a2
4 changed files with 237 additions and 292 deletions

io/druid/segment/incremental/OffheapIncrementalIndex.java

@@ -20,56 +20,85 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
+import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
+import com.metamx.common.IAE;
import com.metamx.common.ISE;
+import io.druid.collections.ResourceHolder;
+import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
-import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
-import io.druid.segment.DimensionSelector;
-import io.druid.segment.FloatColumnSelector;
-import io.druid.segment.LongColumnSelector;
-import io.druid.segment.ObjectColumnSelector;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
-public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
+public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
-private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
+private final StupidPool<ByteBuffer> bufferPool;
+private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
+private final List<int[]> indexAndOffsets = new ArrayList<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
+//given a ByteBuffer and an offset where all aggregates for a row are stored
+//offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
+//is stored
+private volatile int[] aggOffsetInBuffer;
+private volatile int aggsTotalSize;
private String outOfRowsReason = null;
-public OnheapIncrementalIndex(
+public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
-int maxRowCount
+int maxRowCount,
+StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.maxRowCount = maxRowCount;
+this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
+//check that stupid pool gives buffers that can hold at least one row's aggregators
+ResourceHolder<ByteBuffer> bb = bufferPool.take();
+if (bb.get().capacity() < aggsTotalSize) {
+RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize);
+try {
+bb.close();
+} catch(IOException ioe){
+ex.addSuppressed(ioe);
+}
+throw ex;
+}
+aggBuffers.add(bb);
}
-public OnheapIncrementalIndex(
+public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
-int maxRowCount
+int maxRowCount,
+StupidPool<ByteBuffer> bufferPool
)
{
this(
@@ -78,15 +107,17 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
.withMetrics(metrics)
.build(),
deserializeComplexMetrics,
-maxRowCount
+maxRowCount,
+bufferPool
);
}
-public OnheapIncrementalIndex(
+public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
-int maxRowCount
+int maxRowCount,
+StupidPool<ByteBuffer> bufferPool
)
{
this(
@@ -95,18 +126,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
.withMetrics(metrics)
.build(),
true,
-maxRowCount
+maxRowCount,
+bufferPool
);
}
-public OnheapIncrementalIndex(
-IncrementalIndexSchema incrementalIndexSchema,
-int maxRowCount
-)
-{
-this(incrementalIndexSchema, true, maxRowCount);
-}
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
@@ -116,23 +140,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
protected DimDim makeDimDim(String dimension, Object lock)
{
-return new OnHeapDimDim(lock);
+return new OnheapIncrementalIndex.OnHeapDimDim(lock);
}
@Override
-protected Aggregator[] initAggs(
+protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
)
{
selectors = Maps.newHashMap();
-for (AggregatorFactory agg : metrics) {
+aggOffsetInBuffer = new int[metrics.length];
+BufferAggregator[] aggregators = new BufferAggregator[metrics.length];
+for (int i = 0; i < metrics.length; i++) {
+AggregatorFactory agg = metrics[i];
+ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory(
+agg,
+rowSupplier,
+deserializeComplexMetrics
+);
selectors.put(
agg.getName(),
-new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
+new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory)
);
+aggregators[i] = agg.factorizeBuffered(columnSelectorFactory);
+if (i == 0) {
+aggOffsetInBuffer[i] = 0;
+} else {
+aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize();
+}
}
-return new Aggregator[metrics.length];
+aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize();
+return aggregators;
}
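
A note on the layout computed in initAggs() above: aggOffsetInBuffer[i] is the running sum of the preceding aggregators' getMaxIntermediateSize() values, and aggsTotalSize is the size of one row's whole block. A minimal sketch of the same arithmetic, with hypothetical intermediate sizes of 8, 4 and 8 bytes:

int[] sizes = {8, 4, 8}; // hypothetical getMaxIntermediateSize() values
int[] aggOffsetInBuffer = new int[sizes.length];
for (int i = 0; i < sizes.length; i++) {
  aggOffsetInBuffer[i] = (i == 0) ? 0 : aggOffsetInBuffer[i - 1] + sizes[i - 1];
}
int aggsTotalSize = aggOffsetInBuffer[sizes.length - 1] + sizes[sizes.length - 1];
// aggOffsetInBuffer is {0, 8, 12} and aggsTotalSize is 20, so a row whose block starts
// at bufferOffset keeps its i-th aggregate at bufferOffset + aggOffsetInBuffer[i].
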
@Override
@@ -146,71 +191,73 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
-final Integer priorIndex = facts.get(key);
+ByteBuffer aggBuffer;
+int bufferIndex;
+int bufferOffset;
-Aggregator[] aggs;
-if (null != priorIndex) {
-aggs = concurrentGet(priorIndex);
-} else {
-aggs = new Aggregator[metrics.length];
-for (int i = 0; i < metrics.length; i++) {
-final AggregatorFactory agg = metrics[i];
-aggs[i] = agg.factorize(
-selectors.get(agg.getName())
-);
-}
-final Integer rowIndex = indexIncrement.getAndIncrement();
-concurrentSet(rowIndex, aggs);
-// Last ditch sanity checks
-if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
-throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
-}
-final Integer prev = facts.putIfAbsent(key, rowIndex);
-if (null == prev) {
-numEntries.incrementAndGet();
+synchronized (this) {
+final Integer priorIndex = facts.get(key);
+if (null != priorIndex) {
+final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
+bufferIndex = indexAndOffset[0];
+bufferOffset = indexAndOffset[1];
+aggBuffer = aggBuffers.get(bufferIndex).get();
+} else {
-// We lost a race
-aggs = concurrentGet(prev);
-// Free up the misfire
-concurrentRemove(rowIndex);
-// This is expected to occur ~80% of the time in the worst scenarios
+bufferIndex = aggBuffers.size() - 1;
+ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
+int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty()
+? null
+: indexAndOffsets.get(indexAndOffsets.size() - 1);
+if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) {
+throw new ISE("last row's aggregate's buffer and last buffer index must be the same");
+}
+bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
+if (lastBuffer != null &&
+lastBuffer.capacity() - bufferOffset >= aggsTotalSize) {
+aggBuffer = lastBuffer;
+} else {
+ResourceHolder<ByteBuffer> bb = bufferPool.take();
+aggBuffers.add(bb);
+bufferIndex = aggBuffers.size() - 1;
+bufferOffset = 0;
+aggBuffer = bb.get();
+}
+for (int i = 0; i < metrics.length; i++) {
+getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
+}
+// Last ditch sanity checks
+if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
+throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
+}
+final Integer rowIndex = indexIncrement.getAndIncrement();
+final Integer prev = facts.putIfAbsent(key, rowIndex);
+if (null == prev) {
+numEntries.incrementAndGet();
+indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
+} else {
+throw new ISE("WTF! we are in synchronized block.");
}
}
}
rowContainer.set(row);
-for (Aggregator agg : aggs) {
+for (int i = 0; i < metrics.length; i++) {
+final BufferAggregator agg = getAggs()[i];
synchronized (agg) {
-agg.aggregate();
+agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
}
}
rowContainer.set(null);
return numEntries.get();
}
-protected Aggregator[] concurrentGet(int offset)
-{
-// All get operations should be fine
-return aggregators.get(offset);
-}
-protected void concurrentSet(int offset, Aggregator[] value)
-{
-aggregators.put(offset, value);
-}
-protected void concurrentRemove(int offset)
-{
-aggregators.remove(offset);
-}
@Override
public boolean canAppendRow()
{
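
To summarize the rewritten addToFacts(): every insert now takes the instance lock; a key already present in facts reuses its recorded (bufferIndex, bufferOffset) pair, while a new key is placed one aggsTotalSize block after the previous row, or at offset 0 of a fresh buffer from the pool when the current buffer cannot fit another block, and each aggregator slot is init()ed before the location is published in indexAndOffsets. A condensed sketch of just the placement step, assuming the fields defined earlier (a paraphrase, not the verbatim method):

// choose a location for one new row; caller holds synchronized (this)
int bufferIndex = aggBuffers.size() - 1;
int[] last = indexAndOffsets.isEmpty() ? null : indexAndOffsets.get(indexAndOffsets.size() - 1);
int bufferOffset = aggsTotalSize + (last != null ? last[1] : 0); // block right after the previous row
ByteBuffer aggBuffer = aggBuffers.get(bufferIndex).get();
if (aggBuffer.capacity() - bufferOffset < aggsTotalSize) { // no room left: take a fresh buffer
  aggBuffers.add(bufferPool.take());
  bufferIndex = aggBuffers.size() - 1;
  bufferOffset = 0;
  aggBuffer = aggBuffers.get(bufferIndex).get();
}
for (int i = 0; i < getAggs().length; i++) {
  getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); // seed this row's slots
}
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});

Note the trade-off against the on-heap version: the lock-free fast path is gone, row placement serializes on the index, and the per-aggregator synchronized block stays because BufferAggregator implementations are not expected to be thread-safe.
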
@@ -228,229 +275,75 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
@Override
-protected Aggregator[] getAggsForRow(int rowOffset)
+protected BufferAggregator[] getAggsForRow(int rowOffset)
{
-return concurrentGet(rowOffset);
+return getAggs();
}
@Override
-protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition)
+protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
-return agg.get();
+int[] indexAndOffset = indexAndOffsets.get(rowOffset);
+ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]);
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
-return concurrentGet(rowOffset)[aggOffset].getFloat();
+BufferAggregator agg = getAggs()[aggOffset];
+int[] indexAndOffset = indexAndOffsets.get(rowOffset);
+ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
-return concurrentGet(rowOffset)[aggOffset].getLong();
+BufferAggregator agg = getAggs()[aggOffset];
+int[] indexAndOffset = indexAndOffsets.get(rowOffset);
+ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
-return concurrentGet(rowOffset)[aggOffset].get();
+BufferAggregator agg = getAggs()[aggOffset];
+int[] indexAndOffset = indexAndOffsets.get(rowOffset);
+ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
/**
* Clear out maps to allow GC
* NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing
*/
@Override
public void close()
{
super.close();
-aggregators.clear();
facts.clear();
+indexAndOffsets.clear();
if (selectors != null) {
selectors.clear();
}
}
-private static class OnHeapDimDim implements DimDim
-{
-private final Map<String, Integer> valueToId = Maps.newHashMap();
-private final List<String> idToValue = Lists.newArrayList();
-private final Object lock;
-public OnHeapDimDim(Object lock)
-{
-this.lock = lock;
-}
-public int getId(String value)
-{
-synchronized (lock) {
-final Integer id = valueToId.get(value);
-return id == null ? -1 : id;
-}
-}
-public String getValue(int id)
-{
-synchronized (lock) {
-return idToValue.get(id);
-}
-}
-public boolean contains(String value)
-{
-synchronized (lock) {
-return valueToId.containsKey(value);
-}
-}
-public int size()
-{
-synchronized (lock) {
-return valueToId.size();
-}
-}
-public int add(String value)
-{
-synchronized (lock) {
-Integer prev = valueToId.get(value);
-if (prev != null) {
-return prev;
+RuntimeException ex = null;
+for (ResourceHolder<ByteBuffer> buffHolder : aggBuffers) {
+try {
+buffHolder.close();
+} catch(IOException ioe) {
+if (ex == null) {
+ex = Throwables.propagate(ioe);
+} else {
+ex.addSuppressed(ioe);
+}
-final int index = size();
-valueToId.put(value, index);
-idToValue.add(value);
-return index;
}
}
-public OnHeapDimLookup sort()
-{
-synchronized (lock) {
-return new OnHeapDimLookup(idToValue, size());
}
+aggBuffers.clear();
+if (ex != null) {
+throw ex;
}
}
-private static class OnHeapDimLookup implements SortedDimLookup
-{
-private final String[] sortedVals;
-private final int[] idToIndex;
-private final int[] indexToId;
-public OnHeapDimLookup(List<String> idToValue, int length)
-{
-Map<String, Integer> sortedMap = Maps.newTreeMap();
-for (int id = 0; id < length; id++) {
-sortedMap.put(idToValue.get(id), id);
-}
-this.sortedVals = sortedMap.keySet().toArray(new String[length]);
-this.idToIndex = new int[length];
-this.indexToId = new int[length];
-int index = 0;
-for (Integer id : sortedMap.values()) {
-idToIndex[id] = index;
-indexToId[index] = id;
-index++;
-}
-}
-@Override
-public int size()
-{
-return sortedVals.length;
-}
-@Override
-public int indexToId(int index)
-{
-return indexToId[index];
-}
-@Override
-public String getValue(int index)
-{
-return sortedVals[index];
-}
-@Override
-public int idToIndex(int id)
-{
-return idToIndex[id];
-}
-}
-// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
-// In general the selectorFactory need not to thread-safe.
-// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
-private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
-{
-private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
-private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
-private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
-private final ColumnSelectorFactory delegate;
-public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
-{
-this.delegate = delegate;
-}
-@Override
-public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
-{
-return delegate.makeDimensionSelector(dimensionSpec);
-}
-@Override
-public FloatColumnSelector makeFloatColumnSelector(String columnName)
-{
-FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
-if (existing != null) {
-return existing;
-} else {
-FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
-FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
-columnName,
-newSelector
-);
-return prev != null ? prev : newSelector;
-}
-}
-@Override
-public LongColumnSelector makeLongColumnSelector(String columnName)
-{
-LongColumnSelector existing = longColumnSelectorMap.get(columnName);
-if (existing != null) {
-return existing;
-} else {
-LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
-LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
-columnName,
-newSelector
-);
-return prev != null ? prev : newSelector;
-}
-}
-@Override
-public ObjectColumnSelector makeObjectColumnSelector(String columnName)
-{
-ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
-if (existing != null) {
-return existing;
-} else {
-ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
-ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
-columnName,
-newSelector
-);
-return prev != null ? prev : newSelector;
-}
-}
-}
}
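
Stepping back from this file: getAggsForRow() can return the single shared getAggs() array because a BufferAggregator holds no per-row state; the state lives in the row's ByteBuffer slot, whereas an on-heap Aggregator instance is itself the state. Roughly, the two lifecycles compare as follows (a sketch; factory, selectorFactory, buf and position are stand-ins):

// On-heap: one Aggregator per (row, metric); the instance carries the running value.
Aggregator onHeap = factory.factorize(selectorFactory);
onHeap.aggregate();            // mutates the instance
Object a = onHeap.get();

// Off-heap: one BufferAggregator per metric, shared by all rows; state lives in the buffer.
BufferAggregator offHeap = factory.factorizeBuffered(selectorFactory);
offHeap.init(buf, position);   // seed this row's slot
offHeap.aggregate(buf, position);
Object b = offHeap.get(buf, position);
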

io/druid/segment/incremental/OnheapIncrementalIndex.java

@@ -272,7 +272,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
-private static class OnHeapDimDim implements DimDim
+static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> valueToId = Maps.newHashMap();
@@ -335,7 +335,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
-private static class OnHeapDimLookup implements SortedDimLookup
+static class OnHeapDimLookup implements SortedDimLookup
{
private final String[] sortedVals;
private final int[] idToIndex;
@@ -386,7 +386,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not be thread-safe; here it is made thread-safe to support the special
// case of groupBy, where multiple threads can add to the IncrementalIndex concurrently.
-private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
+static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();

io/druid/segment/data/IncrementalIndexTest.java

@@ -19,6 +19,7 @@
package io.druid.segment.data;
import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -32,6 +33,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
+import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
@@ -52,19 +54,23 @@ import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.CloserRule;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -92,6 +98,9 @@ public class IncrementalIndexTest
private final IndexCreator indexCreator;
+@Rule
+public final CloserRule closer = new CloserRule(false);
public IncrementalIndexTest(
IndexCreator indexCreator
)
@@ -120,7 +129,19 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
-return IncrementalIndexTest.createIndex(factories);
+return new OffheapIncrementalIndex(
+0L, QueryGranularity.NONE, factories, 1000000,
+new StupidPool<ByteBuffer>(
+new Supplier<ByteBuffer>()
+{
+@Override
+public ByteBuffer get()
+{
+return ByteBuffer.allocate(256 * 1024);
+}
+}
+)
+);
}
}
}
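
The test supplies a StupidPool whose buffers are 256 KB. The OffheapIncrementalIndex constructor rejects a pool whose buffers cannot hold even one row's aggregates (capacity < aggsTotalSize); otherwise each buffer is packed with whole rows. With, say, two 8-byte aggregates per row (hypothetical sizes):

int aggsTotalSize = 8 + 8;                        // hypothetical per-row block size
int rowsPerBuffer = (256 * 1024) / aggsTotalSize; // 16384 rows before another bufferPool.take()
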
@@ -207,7 +228,8 @@ public class IncrementalIndexTest
public void testCaseSensitivity() throws Exception
{
long timestamp = System.currentTimeMillis();
-IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
+IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
Assert.assertEquals(2, index.size());
@@ -222,8 +244,6 @@ public class IncrementalIndexTest
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
-index.close();
}
@Test
@@ -247,11 +267,14 @@
);
}
-final IncrementalIndex index = indexCreator.createIndex(
-ingestAggregatorFactories.toArray(
-new AggregatorFactory[ingestAggregatorFactories.size()]
+final IncrementalIndex index = closer.closeLater(
+indexCreator.createIndex(
+ingestAggregatorFactories.toArray(
+new AggregatorFactory[ingestAggregatorFactories.size()]
+)
+)
);
final long timestamp = System.currentTimeMillis();
final int rows = 50;
@@ -320,8 +343,6 @@
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
);
}
-index.close();
}
@Test(timeout = 60_000L)
@@ -363,7 +384,9 @@
}
-final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]));
+final IncrementalIndex index = closer.closeLater(
+indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]))
+);
final int concurrentThreads = 2;
final int elementsPerThread = 10_000;
final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
@@ -537,14 +560,12 @@
);
}
}
-index.close();
}
@Test
public void testConcurrentAdd() throws Exception
{
-final IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
+final IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
@@ -584,8 +605,6 @@
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
-index.close();
}
@Test
@@ -611,8 +630,8 @@
true,
1000000
);
-Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
+closer.closeLater(incrementalIndex);
-incrementalIndex.close();
+Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
}
}
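
Taken together, the new fixtures show the intended wiring: a Supplier-backed pool of fixed-size ByteBuffers plus the usual IncrementalIndex arguments. A minimal standalone sketch along the same lines (row contents and sizes arbitrary; add() can throw IndexSizeExceededException once maxRowCount is reached):

StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
    new Supplier<ByteBuffer>()
    {
      @Override
      public ByteBuffer get()
      {
        return ByteBuffer.allocate(256 * 1024); // must hold at least one row's aggsTotalSize
      }
    }
);
IncrementalIndex index = new OffheapIncrementalIndex(
    0L,
    QueryGranularity.NONE,
    new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
    1000000,
    pool
);
try {
  index.add(
      new MapBasedInputRow(
          System.currentTimeMillis(),
          Arrays.asList("dim1"),
          ImmutableMap.<String, Object>of("dim1", "a")
      )
  );
}
finally {
  index.close(); // returns pooled buffers; not safe to race with add()
}
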

io/druid/segment/incremental/IncrementalIndexTest.java

@@ -19,19 +19,24 @@
package io.druid.segment.incremental;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
+import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.segment.CloserRule;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
@@ -46,6 +51,9 @@ public class IncrementalIndexTest
IncrementalIndex createIndex();
}
+@Rule
+public final CloserRule closer = new CloserRule(false);
private final IndexCreator indexCreator;
public IncrementalIndexTest(IndexCreator IndexCreator)
@@ -70,6 +78,31 @@ public class IncrementalIndexTest
}
}
},
+{
+new IndexCreator()
+{
+@Override
+public IncrementalIndex createIndex()
+{
+return new OffheapIncrementalIndex(
+0L,
+QueryGranularity.NONE,
+new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
+1000000,
+new StupidPool<ByteBuffer>(
+new Supplier<ByteBuffer>()
+{
+@Override
+public ByteBuffer get()
+{
+return ByteBuffer.allocate(256 * 1024);
+}
+}
+)
+);
+}
+}
+}
}
@@ -79,7 +112,7 @@
@Test(expected = ISE.class)
public void testDuplicateDimensions() throws IndexSizeExceededException
{
-IncrementalIndex index = indexCreator.createIndex();
+IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
@@ -99,7 +132,7 @@
@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException
{
-IncrementalIndex index = indexCreator.createIndex();
+IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
@@ -112,7 +145,7 @@
@Test
public void controlTest() throws IndexSizeExceededException
{
-IncrementalIndex index = indexCreator.createIndex();
+IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),