Merge pull request #2325 from himanshug/gp_by_merge

new OffHeapIncrementalIndex that does only aggregations off-heap
This commit is contained in:
Charles Allen 2016-02-05 13:47:12 -08:00
commit 08802c345d
12 changed files with 546 additions and 38 deletions

View File

@ -38,11 +38,24 @@ public class StupidPool<T>
private final Queue<T> objects = new ConcurrentLinkedQueue<>();
//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
private final int objectsCacheMaxCount;
public StupidPool(
Supplier<T> generator
)
{
this.generator = generator;
this.objectsCacheMaxCount = Integer.MAX_VALUE;
}
public StupidPool(
Supplier<T> generator,
int objectsCacheMaxCount
)
{
this.generator = generator;
this.objectsCacheMaxCount = objectsCacheMaxCount;
}
public ResourceHolder<T> take()
@ -80,8 +93,12 @@ public class StupidPool<T>
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
if (objects.size() < objectsCacheMaxCount) {
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
}
} else {
log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount);
}
}

View File

@ -55,6 +55,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|

View File

@ -53,6 +53,7 @@ Druid uses Jetty to serve HTTP requests.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|

View File

@ -31,6 +31,12 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
return 1024 * 1024 * 1024;
}
@Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
public int poolCacheMaxCount()
{
return Integer.MAX_VALUE;
}
@Override @Config(value = "${base_path}.numThreads")
public int getNumThreads()
{

View File

@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
@ -45,7 +46,6 @@ public class GroupByQueryHelper
final GroupByQuery query,
final GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool
)
{
final QueryGranularity gran = query.getGranularity();
@ -77,15 +77,30 @@ public class GroupByQueryHelper
}
}
);
final IncrementalIndex index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
final IncrementalIndex index;
if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults(),
bufferPool
);
} else {
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
}
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{

View File

@ -0,0 +1,349 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
private final StupidPool<ByteBuffer> bufferPool;
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
private final List<int[]> indexAndOffsets = new ArrayList<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
//given a ByteBuffer and an offset where all aggregates for a row are stored
//offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
//is stored
private volatile int[] aggOffsetInBuffer;
private volatile int aggsTotalSize;
private String outOfRowsReason = null;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();
if (bb.get().capacity() < aggsTotalSize) {
RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize);
try {
bb.close();
} catch(IOException ioe){
ex.addSuppressed(ioe);
}
throw ex;
}
aggBuffers.add(bb);
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
deserializeComplexMetrics,
maxRowCount,
bufferPool
);
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
maxRowCount,
bufferPool
);
}
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
@Override
protected DimDim makeDimDim(String dimension, Object lock)
{
return new OnheapIncrementalIndex.OnHeapDimDim(lock);
}
@Override
protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
)
{
selectors = Maps.newHashMap();
aggOffsetInBuffer = new int[metrics.length];
BufferAggregator[] aggregators = new BufferAggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
AggregatorFactory agg = metrics[i];
ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory(
agg,
rowSupplier,
deserializeComplexMetrics
);
selectors.put(
agg.getName(),
new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory)
);
aggregators[i] = agg.factorizeBuffered(columnSelectorFactory);
if (i == 0) {
aggOffsetInBuffer[i] = 0;
} else {
aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize();
}
}
aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize();
return aggregators;
}
@Override
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
ByteBuffer aggBuffer;
int bufferIndex;
int bufferOffset;
synchronized (this) {
final Integer priorIndex = facts.get(key);
if (null != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
bufferIndex = indexAndOffset[0];
bufferOffset = indexAndOffset[1];
aggBuffer = aggBuffers.get(bufferIndex).get();
} else {
bufferIndex = aggBuffers.size() - 1;
ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty()
? null
: indexAndOffsets.get(indexAndOffsets.size() - 1);
if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) {
throw new ISE("last row's aggregate's buffer and last buffer index must be same");
}
bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
if (lastBuffer != null &&
lastBuffer.capacity() - bufferOffset >= aggsTotalSize) {
aggBuffer = lastBuffer;
} else {
ResourceHolder<ByteBuffer> bb = bufferPool.take();
aggBuffers.add(bb);
bufferIndex = aggBuffers.size() - 1;
bufferOffset = 0;
aggBuffer = bb.get();
}
for (int i = 0; i < metrics.length; i++) {
getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
}
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
final Integer rowIndex = indexIncrement.getAndIncrement();
final Integer prev = facts.putIfAbsent(key, rowIndex);
if (null == prev) {
numEntries.incrementAndGet();
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
} else {
throw new ISE("WTF! we are in sychronized block.");
}
}
}
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final BufferAggregator agg = getAggs()[i];
synchronized (agg) {
agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
}
}
rowContainer.set(null);
return numEntries.get();
}
@Override
public boolean canAppendRow()
{
final boolean canAdd = size() < maxRowCount;
if (!canAdd) {
outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
}
return canAdd;
}
@Override
public String getOutOfRowsReason()
{
return outOfRowsReason;
}
@Override
protected BufferAggregator[] getAggsForRow(int rowOffset)
{
return getAggs();
}
@Override
protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
int[] indexAndOffset = indexAndOffsets.get(rowOffset);
ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]);
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
BufferAggregator agg = getAggs()[aggOffset];
int[] indexAndOffset = indexAndOffsets.get(rowOffset);
ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
BufferAggregator agg = getAggs()[aggOffset];
int[] indexAndOffset = indexAndOffsets.get(rowOffset);
ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
BufferAggregator agg = getAggs()[aggOffset];
int[] indexAndOffset = indexAndOffsets.get(rowOffset);
ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
}
/**
* NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing
*/
@Override
public void close()
{
super.close();
facts.clear();
indexAndOffsets.clear();
if (selectors != null) {
selectors.clear();
}
RuntimeException ex = null;
for (ResourceHolder<ByteBuffer> buffHolder : aggBuffers) {
try {
buffHolder.close();
} catch(IOException ioe) {
if (ex == null) {
ex = Throwables.propagate(ioe);
} else {
ex.addSuppressed(ioe);
}
}
}
aggBuffers.clear();
if (ex != null) {
throw ex;
}
}
}

View File

@ -272,7 +272,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
private static class OnHeapDimDim implements DimDim
static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> valueToId = Maps.newHashMap();
@ -335,7 +335,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
private static class OnHeapDimLookup implements SortedDimLookup
static class OnHeapDimLookup implements SortedDimLookup
{
private final String[] sortedVals;
private final int[] idToIndex;
@ -386,7 +386,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.config.Config;
import org.junit.Assert;
import org.junit.Test;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
/**
*/
public class DruidProcessingConfigTest
{
@Test
public void testDeserialization() throws Exception
{
ConfigurationObjectFactory factory = Config.createFactory(new Properties());
//with defaults
DruidProcessingConfig config = factory.build(DruidProcessingConfig.class);
Assert.assertEquals(1024 * 1024 * 1024, config.intermediateComputeSizeBytes());
Assert.assertEquals(Integer.MAX_VALUE, config.poolCacheMaxCount());
Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
Assert.assertEquals(0, config.columnCacheSizeBytes());
Assert.assertFalse(config.isFifo());
//with non-defaults
Properties props = new Properties();
props.setProperty("druid.processing.buffer.sizeBytes", "1");
props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
props.setProperty("druid.processing.numThreads", "5");
props.setProperty("druid.processing.columnCache.sizeBytes", "1");
props.setProperty("druid.processing.fifo", "true");
factory = Config.createFactory(props);
config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
Assert.assertEquals(1, config.intermediateComputeSizeBytes());
Assert.assertEquals(1, config.poolCacheMaxCount());
Assert.assertEquals(5, config.getNumThreads());
Assert.assertEquals(1, config.columnCacheSizeBytes());
Assert.assertTrue(config.isFifo());
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.data;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -32,6 +33,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
@ -52,19 +54,23 @@ import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.CloserRule;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -92,6 +98,9 @@ public class IncrementalIndexTest
private final IndexCreator indexCreator;
@Rule
public final CloserRule closer = new CloserRule(false);
public IncrementalIndexTest(
IndexCreator indexCreator
)
@ -120,7 +129,19 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex(AggregatorFactory[] factories)
{
return IncrementalIndexTest.createIndex(factories);
return new OffheapIncrementalIndex(
0L, QueryGranularity.NONE, factories, 1000000,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
}
@ -207,7 +228,8 @@ public class IncrementalIndexTest
public void testCaseSensitivity() throws Exception
{
long timestamp = System.currentTimeMillis();
IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
Assert.assertEquals(2, index.size());
@ -222,8 +244,6 @@ public class IncrementalIndexTest
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
index.close();
}
@Test
@ -247,11 +267,14 @@ public class IncrementalIndexTest
);
}
final IncrementalIndex index = indexCreator.createIndex(
ingestAggregatorFactories.toArray(
new AggregatorFactory[ingestAggregatorFactories.size()]
final IncrementalIndex index = closer.closeLater(
indexCreator.createIndex(
ingestAggregatorFactories.toArray(
new AggregatorFactory[ingestAggregatorFactories.size()]
)
)
);
final long timestamp = System.currentTimeMillis();
final int rows = 50;
@ -320,8 +343,6 @@ public class IncrementalIndexTest
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
);
}
index.close();
}
@Test(timeout = 60_000L)
@ -363,7 +384,9 @@ public class IncrementalIndexTest
}
final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]));
final IncrementalIndex index = closer.closeLater(
indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount]))
);
final int concurrentThreads = 2;
final int elementsPerThread = 10_000;
final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator(
@ -537,14 +560,12 @@ public class IncrementalIndexTest
);
}
}
index.close();
}
@Test
public void testConcurrentAdd() throws Exception
{
final IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
final IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories));
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
@ -584,8 +605,6 @@ public class IncrementalIndexTest
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
index.close();
}
@Test
@ -611,8 +630,8 @@ public class IncrementalIndexTest
true,
1000000
);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
closer.closeLater(incrementalIndex);
incrementalIndex.close();
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
}
}

View File

@ -19,19 +19,24 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.CloserRule;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
@ -46,6 +51,9 @@ public class IncrementalIndexTest
IncrementalIndex createIndex();
}
@Rule
public final CloserRule closer = new CloserRule(false);
private final IndexCreator indexCreator;
public IncrementalIndexTest(IndexCreator IndexCreator)
@ -70,6 +78,31 @@ public class IncrementalIndexTest
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1000000,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}
}
}
@ -79,7 +112,7 @@ public class IncrementalIndexTest
@Test(expected = ISE.class)
public void testDuplicateDimensions() throws IndexSizeExceededException
{
IncrementalIndex index = indexCreator.createIndex();
IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
@ -99,7 +132,7 @@ public class IncrementalIndexTest
@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException
{
IncrementalIndex index = indexCreator.createIndex();
IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
@ -112,7 +145,7 @@ public class IncrementalIndexTest
@Test
public void controlTest() throws IndexSizeExceededException
{
IncrementalIndex index = indexCreator.createIndex();
IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),

View File

@ -125,7 +125,7 @@ public class DruidProcessingModule implements Module
log.info(e.getMessage());
}
return new OffheapBufferPool(config.intermediateComputeSizeBytes());
return new OffheapBufferPool(config.intermediateComputeSizeBytes(), config.poolCacheMaxCount());
}

View File

@ -31,7 +31,7 @@ public class OffheapBufferPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(OffheapBufferPool.class);
public OffheapBufferPool(final int computationBufferSize)
public OffheapBufferPool(final int computationBufferSize, final int cacheMaxCount)
{
super(
new Supplier<ByteBuffer>()
@ -47,7 +47,8 @@ public class OffheapBufferPool extends StupidPool<ByteBuffer>
);
return ByteBuffer.allocateDirect(computationBufferSize);
}
}
},
cacheMaxCount
);
}
}