From 72a1e730a2d6df31c07e6643430a224e7994c1ec Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Feb 2016 10:47:05 -0600 Subject: [PATCH] OffheapIncrementalIndex updates to do the aggregation merging off-heap --- .../incremental/OffheapIncrementalIndex.java | 431 +++++++----------- .../incremental/OnheapIncrementalIndex.java | 6 +- .../segment/data/IncrementalIndexTest.java | 53 ++- .../incremental/IncrementalIndexTest.java | 39 +- 4 files changed, 237 insertions(+), 292 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index be1aeb2d5e4..355cf19417b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -20,56 +20,85 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; -import com.google.common.collect.Lists; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.dimension.DimensionSpec; +import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class OnheapIncrementalIndex extends IncrementalIndex +public class OffheapIncrementalIndex extends IncrementalIndex { - private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); + private final StupidPool bufferPool; + + private final List> aggBuffers = new ArrayList<>(); + private final List indexAndOffsets = new ArrayList<>(); + private final ConcurrentNavigableMap facts; + private final AtomicInteger indexIncrement = new AtomicInteger(0); + protected final int maxRowCount; + private volatile Map selectors; + //given a ByteBuffer and an offset where all aggregates for a row are stored + //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate + //is stored + private volatile int[] aggOffsetInBuffer; + private volatile int aggsTotalSize; + private String outOfRowsReason = null; - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { super(incrementalIndexSchema, deserializeComplexMetrics); this.maxRowCount = maxRowCount; + this.bufferPool = bufferPool; this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + //check that stupid pool gives buffers that can hold at least one row's aggregators + ResourceHolder bb = bufferPool.take(); + if (bb.get().capacity() < aggsTotalSize) { + RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); + try { + bb.close(); + } catch(IOException ioe){ + ex.addSuppressed(ioe); + } + throw ex; + } + aggBuffers.add(bb); } - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { this( @@ -78,15 +107,17 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), deserializeComplexMetrics, - maxRowCount + maxRowCount, + bufferPool ); } - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, final AggregatorFactory[] metrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { this( @@ -95,18 +126,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), true, - maxRowCount + maxRowCount, + bufferPool ); } - public OnheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - int maxRowCount - ) - { - this(incrementalIndexSchema, true, maxRowCount); - } - @Override public ConcurrentNavigableMap getFacts() { @@ -116,23 +140,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override protected DimDim makeDimDim(String dimension, Object lock) { - return new OnHeapDimDim(lock); + return new OnheapIncrementalIndex.OnHeapDimDim(lock); } @Override - protected Aggregator[] initAggs( + protected BufferAggregator[] initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics ) { selectors = Maps.newHashMap(); - for (AggregatorFactory agg : metrics) { + aggOffsetInBuffer = new int[metrics.length]; + + BufferAggregator[] aggregators = new BufferAggregator[metrics.length]; + + for (int i = 0; i < metrics.length; i++) { + AggregatorFactory agg = metrics[i]; + + ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( + agg, + rowSupplier, + deserializeComplexMetrics + ); + selectors.put( agg.getName(), - new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) ); + + aggregators[i] = agg.factorizeBuffered(columnSelectorFactory); + if (i == 0) { + aggOffsetInBuffer[i] = 0; + } else { + aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize(); + } } - return new Aggregator[metrics.length]; + aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); + + return aggregators; } @Override @@ -146,71 +191,73 @@ public class OnheapIncrementalIndex extends IncrementalIndex Supplier rowSupplier ) throws IndexSizeExceededException { - final Integer priorIndex = facts.get(key); + ByteBuffer aggBuffer; + int bufferIndex; + int bufferOffset; - Aggregator[] aggs; - - if (null != priorIndex) { - aggs = concurrentGet(priorIndex); - } else { - aggs = new Aggregator[metrics.length]; - - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - selectors.get(agg.getName()) - ); - } - final Integer rowIndex = indexIncrement.getAndIncrement(); - - concurrentSet(rowIndex, aggs); - - // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { - throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); - } - final Integer prev = facts.putIfAbsent(key, rowIndex); - if (null == prev) { - numEntries.incrementAndGet(); + synchronized (this) { + final Integer priorIndex = facts.get(key); + if (null != priorIndex) { + final int[] indexAndOffset = indexAndOffsets.get(priorIndex); + bufferIndex = indexAndOffset[0]; + bufferOffset = indexAndOffset[1]; + aggBuffer = aggBuffers.get(bufferIndex).get(); } else { - // We lost a race - aggs = concurrentGet(prev); - // Free up the misfire - concurrentRemove(rowIndex); - // This is expected to occur ~80% of the time in the worst scenarios + bufferIndex = aggBuffers.size() - 1; + ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); + int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() + ? null + : indexAndOffsets.get(indexAndOffsets.size() - 1); + + if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) { + throw new ISE("last row's aggregate's buffer and last buffer index must be same"); + } + + bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0); + if (lastBuffer != null && + lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { + aggBuffer = lastBuffer; + } else { + ResourceHolder bb = bufferPool.take(); + aggBuffers.add(bb); + bufferIndex = aggBuffers.size() - 1; + bufferOffset = 0; + aggBuffer = bb.get(); + } + + for (int i = 0; i < metrics.length; i++) { + getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); + } + + // Last ditch sanity checks + if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { + throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); + } + + final Integer rowIndex = indexIncrement.getAndIncrement(); + final Integer prev = facts.putIfAbsent(key, rowIndex); + if (null == prev) { + numEntries.incrementAndGet(); + indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); + } else { + throw new ISE("WTF! we are in sychronized block."); + } } } rowContainer.set(row); - for (Aggregator agg : aggs) { + for (int i = 0; i < metrics.length; i++) { + final BufferAggregator agg = getAggs()[i]; + synchronized (agg) { - agg.aggregate(); + agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); } } - rowContainer.set(null); - - return numEntries.get(); } - protected Aggregator[] concurrentGet(int offset) - { - // All get operations should be fine - return aggregators.get(offset); - } - - protected void concurrentSet(int offset, Aggregator[] value) - { - aggregators.put(offset, value); - } - - protected void concurrentRemove(int offset) - { - aggregators.remove(offset); - } - @Override public boolean canAppendRow() { @@ -228,229 +275,75 @@ public class OnheapIncrementalIndex extends IncrementalIndex } @Override - protected Aggregator[] getAggsForRow(int rowOffset) + protected BufferAggregator[] getAggsForRow(int rowOffset) { - return concurrentGet(rowOffset); + return getAggs(); } @Override - protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition) + protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) { - return agg.get(); + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]); } @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getFloat(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getLong(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override public Object getMetricObjectValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].get(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } /** - * Clear out maps to allow GC * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing */ @Override public void close() { super.close(); - aggregators.clear(); facts.clear(); + indexAndOffsets.clear(); + if (selectors != null) { selectors.clear(); } - } - private static class OnHeapDimDim implements DimDim - { - private final Map valueToId = Maps.newHashMap(); - - private final List idToValue = Lists.newArrayList(); - private final Object lock; - - public OnHeapDimDim(Object lock) - { - this.lock = lock; - } - - public int getId(String value) - { - synchronized (lock) { - final Integer id = valueToId.get(value); - return id == null ? -1 : id; - } - } - - public String getValue(int id) - { - synchronized (lock) { - return idToValue.get(id); - } - } - - public boolean contains(String value) - { - synchronized (lock) { - return valueToId.containsKey(value); - } - } - - public int size() - { - synchronized (lock) { - return valueToId.size(); - } - } - - public int add(String value) - { - synchronized (lock) { - Integer prev = valueToId.get(value); - if (prev != null) { - return prev; + RuntimeException ex = null; + for (ResourceHolder buffHolder : aggBuffers) { + try { + buffHolder.close(); + } catch(IOException ioe) { + if (ex == null) { + ex = Throwables.propagate(ioe); + } else { + ex.addSuppressed(ioe); } - final int index = size(); - valueToId.put(value, index); - idToValue.add(value); - return index; } } - - public OnHeapDimLookup sort() - { - synchronized (lock) { - return new OnHeapDimLookup(idToValue, size()); - } + aggBuffers.clear(); + if (ex != null) { + throw ex; } } - - private static class OnHeapDimLookup implements SortedDimLookup - { - private final String[] sortedVals; - private final int[] idToIndex; - private final int[] indexToId; - - public OnHeapDimLookup(List idToValue, int length) - { - Map sortedMap = Maps.newTreeMap(); - for (int id = 0; id < length; id++) { - sortedMap.put(idToValue.get(id), id); - } - this.sortedVals = sortedMap.keySet().toArray(new String[length]); - this.idToIndex = new int[length]; - this.indexToId = new int[length]; - int index = 0; - for (Integer id : sortedMap.values()) { - idToIndex[id] = index; - indexToId[index] = id; - index++; - } - } - - @Override - public int size() - { - return sortedVals.length; - } - - @Override - public int indexToId(int index) - { - return indexToId[index]; - } - - @Override - public String getValue(int index) - { - return sortedVals[index]; - } - - @Override - public int idToIndex(int id) - { - return idToIndex[id]; - } - } - - // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. - // In general the selectorFactory need not to thread-safe. - // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. - private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory - { - private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap objectColumnSelectorMap = Maps.newConcurrentMap(); - private final ColumnSelectorFactory delegate; - - public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) - { - this.delegate = delegate; - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return delegate.makeDimensionSelector(dimensionSpec); - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); - FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - - @Override - public LongColumnSelector makeLongColumnSelector(String columnName) - { - LongColumnSelector existing = longColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); - LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String columnName) - { - ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); - ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - } - } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index be1aeb2d5e4..9676ee67b32 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -272,7 +272,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - private static class OnHeapDimDim implements DimDim + static class OnHeapDimDim implements DimDim { private final Map valueToId = Maps.newHashMap(); @@ -335,7 +335,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - private static class OnHeapDimLookup implements SortedDimLookup + static class OnHeapDimLookup implements SortedDimLookup { private final String[] sortedVals; private final int[] idToIndex; @@ -386,7 +386,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. // In general the selectorFactory need not to thread-safe. // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. - private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory + static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory { private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index b769c6ba3fb..0658e02fc74 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -19,6 +19,7 @@ package io.druid.segment.data; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,6 +33,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionsSpec; @@ -52,19 +54,23 @@ import io.druid.query.timeseries.TimeseriesQueryEngine; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.CloserRule; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -92,6 +98,9 @@ public class IncrementalIndexTest private final IndexCreator indexCreator; + @Rule + public final CloserRule closer = new CloserRule(false); + public IncrementalIndexTest( IndexCreator indexCreator ) @@ -120,7 +129,19 @@ public class IncrementalIndexTest @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return IncrementalIndexTest.createIndex(factories); + return new OffheapIncrementalIndex( + 0L, QueryGranularity.NONE, factories, 1000000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); } } } @@ -207,7 +228,8 @@ public class IncrementalIndexTest public void testCaseSensitivity() throws Exception { long timestamp = System.currentTimeMillis(); - IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories)); + populateIndex(timestamp, index); Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames()); Assert.assertEquals(2, index.size()); @@ -222,8 +244,6 @@ public class IncrementalIndexTest Assert.assertEquals(timestamp, row.getTimestampFromEpoch()); Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1")); Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); - - index.close(); } @Test @@ -247,11 +267,14 @@ public class IncrementalIndexTest ); } - final IncrementalIndex index = indexCreator.createIndex( - ingestAggregatorFactories.toArray( - new AggregatorFactory[ingestAggregatorFactories.size()] + final IncrementalIndex index = closer.closeLater( + indexCreator.createIndex( + ingestAggregatorFactories.toArray( + new AggregatorFactory[ingestAggregatorFactories.size()] + ) ) ); + final long timestamp = System.currentTimeMillis(); final int rows = 50; @@ -320,8 +343,6 @@ public class IncrementalIndexTest result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue() ); } - - index.close(); } @Test(timeout = 60_000L) @@ -363,7 +384,9 @@ public class IncrementalIndexTest } - final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount])); + final IncrementalIndex index = closer.closeLater( + indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount])) + ); final int concurrentThreads = 2; final int elementsPerThread = 10_000; final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator( @@ -537,14 +560,12 @@ public class IncrementalIndexTest ); } } - - index.close(); } @Test public void testConcurrentAdd() throws Exception { - final IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories); + final IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories)); final int threadCount = 10; final int elementsPerThread = 200; final int dimensionCount = 5; @@ -584,8 +605,6 @@ public class IncrementalIndexTest curr++; } Assert.assertEquals(elementsPerThread, curr); - - index.close(); } @Test @@ -611,8 +630,8 @@ public class IncrementalIndexTest true, 1000000 ); - Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); + closer.closeLater(incrementalIndex); - incrementalIndex.close(); + Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); } } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 7d7e53dabdb..86777202051 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -19,19 +19,24 @@ package io.druid.segment.incremental; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.segment.CloserRule; import org.joda.time.DateTime; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -46,6 +51,9 @@ public class IncrementalIndexTest IncrementalIndex createIndex(); } + @Rule + public final CloserRule closer = new CloserRule(false); + private final IndexCreator indexCreator; public IncrementalIndexTest(IndexCreator IndexCreator) @@ -70,6 +78,31 @@ public class IncrementalIndexTest } } + }, + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex() + { + return new OffheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + 1000000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); + } + } } } @@ -79,7 +112,7 @@ public class IncrementalIndexTest @Test(expected = ISE.class) public void testDuplicateDimensions() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(), @@ -99,7 +132,7 @@ public class IncrementalIndexTest @Test(expected = ISE.class) public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(), @@ -112,7 +145,7 @@ public class IncrementalIndexTest @Test public void controlTest() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(),