From 907dd77483c0cdbe550fbd351b4ea31260866ca3 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Feb 2016 10:26:45 -0600 Subject: [PATCH 1/4] OffheapIncrementalIndex a copy/paste of OnheapIncrementalIndex --- .../incremental/OffheapIncrementalIndex.java | 456 ++++++++++++++++++ 1 file changed, 456 insertions(+) create mode 100644 processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java new file mode 100644 index 00000000000..be1aeb2d5e4 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -0,0 +1,456 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.incremental; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class OnheapIncrementalIndex extends IncrementalIndex +{ + private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); + private final ConcurrentNavigableMap facts; + private final AtomicInteger indexIncrement = new AtomicInteger(0); + protected final int maxRowCount; + private volatile Map selectors; + + private String outOfRowsReason = null; + + public OnheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + boolean deserializeComplexMetrics, + int maxRowCount + ) + { + super(incrementalIndexSchema, deserializeComplexMetrics); + this.maxRowCount = maxRowCount; + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + } + + public OnheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + boolean deserializeComplexMetrics, + int maxRowCount + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + 
deserializeComplexMetrics, + maxRowCount + ); + } + + public OnheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + int maxRowCount + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + true, + maxRowCount + ); + } + + public OnheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + int maxRowCount + ) + { + this(incrementalIndexSchema, true, maxRowCount); + } + + @Override + public ConcurrentNavigableMap getFacts() + { + return facts; + } + + @Override + protected DimDim makeDimDim(String dimension, Object lock) + { + return new OnHeapDimDim(lock); + } + + @Override + protected Aggregator[] initAggs( + AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics + ) + { + selectors = Maps.newHashMap(); + for (AggregatorFactory agg : metrics) { + selectors.put( + agg.getName(), + new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + ); + } + + return new Aggregator[metrics.length]; + } + + @Override + protected Integer addToFacts( + AggregatorFactory[] metrics, + boolean deserializeComplexMetrics, + InputRow row, + AtomicInteger numEntries, + TimeAndDims key, + ThreadLocal rowContainer, + Supplier rowSupplier + ) throws IndexSizeExceededException + { + final Integer priorIndex = facts.get(key); + + Aggregator[] aggs; + + if (null != priorIndex) { + aggs = concurrentGet(priorIndex); + } else { + aggs = new Aggregator[metrics.length]; + + for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + aggs[i] = agg.factorize( + selectors.get(agg.getName()) + ); + } + final Integer rowIndex = indexIncrement.getAndIncrement(); + + concurrentSet(rowIndex, aggs); + + // Last ditch sanity checks + if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { + throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); + } + final Integer prev = facts.putIfAbsent(key, rowIndex); + if (null == prev) { + numEntries.incrementAndGet(); + } else { + // We lost a race + aggs = concurrentGet(prev); + // Free up the misfire + concurrentRemove(rowIndex); + // This is expected to occur ~80% of the time in the worst scenarios + } + } + + rowContainer.set(row); + + for (Aggregator agg : aggs) { + synchronized (agg) { + agg.aggregate(); + } + } + + rowContainer.set(null); + + + return numEntries.get(); + } + + protected Aggregator[] concurrentGet(int offset) + { + // All get operations should be fine + return aggregators.get(offset); + } + + protected void concurrentSet(int offset, Aggregator[] value) + { + aggregators.put(offset, value); + } + + protected void concurrentRemove(int offset) + { + aggregators.remove(offset); + } + + @Override + public boolean canAppendRow() + { + final boolean canAdd = size() < maxRowCount; + if (!canAdd) { + outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount); + } + return canAdd; + } + + @Override + public String getOutOfRowsReason() + { + return outOfRowsReason; + } + + @Override + protected Aggregator[] getAggsForRow(int rowOffset) + { + return concurrentGet(rowOffset); + } + + @Override + protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition) + { + return agg.get(); + } + + @Override + public float getMetricFloatValue(int rowOffset, int aggOffset) + { + return concurrentGet(rowOffset)[aggOffset].getFloat(); + } + + 
@Override + public long getMetricLongValue(int rowOffset, int aggOffset) + { + return concurrentGet(rowOffset)[aggOffset].getLong(); + } + + @Override + public Object getMetricObjectValue(int rowOffset, int aggOffset) + { + return concurrentGet(rowOffset)[aggOffset].get(); + } + + /** + * Clear out maps to allow GC + * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing + */ + @Override + public void close() + { + super.close(); + aggregators.clear(); + facts.clear(); + if (selectors != null) { + selectors.clear(); + } + } + + private static class OnHeapDimDim implements DimDim + { + private final Map valueToId = Maps.newHashMap(); + + private final List idToValue = Lists.newArrayList(); + private final Object lock; + + public OnHeapDimDim(Object lock) + { + this.lock = lock; + } + + public int getId(String value) + { + synchronized (lock) { + final Integer id = valueToId.get(value); + return id == null ? -1 : id; + } + } + + public String getValue(int id) + { + synchronized (lock) { + return idToValue.get(id); + } + } + + public boolean contains(String value) + { + synchronized (lock) { + return valueToId.containsKey(value); + } + } + + public int size() + { + synchronized (lock) { + return valueToId.size(); + } + } + + public int add(String value) + { + synchronized (lock) { + Integer prev = valueToId.get(value); + if (prev != null) { + return prev; + } + final int index = size(); + valueToId.put(value, index); + idToValue.add(value); + return index; + } + } + + public OnHeapDimLookup sort() + { + synchronized (lock) { + return new OnHeapDimLookup(idToValue, size()); + } + } + } + + private static class OnHeapDimLookup implements SortedDimLookup + { + private final String[] sortedVals; + private final int[] idToIndex; + private final int[] indexToId; + + public OnHeapDimLookup(List idToValue, int length) + { + Map sortedMap = Maps.newTreeMap(); + for (int id = 0; id < length; id++) { + sortedMap.put(idToValue.get(id), id); + } + this.sortedVals = sortedMap.keySet().toArray(new String[length]); + this.idToIndex = new int[length]; + this.indexToId = new int[length]; + int index = 0; + for (Integer id : sortedMap.values()) { + idToIndex[id] = index; + indexToId[index] = id; + index++; + } + } + + @Override + public int size() + { + return sortedVals.length; + } + + @Override + public int indexToId(int index) + { + return indexToId[index]; + } + + @Override + public String getValue(int index) + { + return sortedVals[index]; + } + + @Override + public int idToIndex(int id) + { + return idToIndex[id]; + } + } + + // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. + // In general the selectorFactory need not to thread-safe. + // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. 
+ private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory + { + private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); + private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); + private final ConcurrentMap objectColumnSelectorMap = Maps.newConcurrentMap(); + private final ColumnSelectorFactory delegate; + + public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) + { + this.delegate = delegate; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return delegate.makeDimensionSelector(dimensionSpec); + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); + FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? prev : newSelector; + } + } + + @Override + public LongColumnSelector makeLongColumnSelector(String columnName) + { + LongColumnSelector existing = longColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); + LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? prev : newSelector; + } + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String columnName) + { + ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); + ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? 
prev : newSelector; + } + } + } + +} From 72a1e730a2d6df31c07e6643430a224e7994c1ec Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Feb 2016 10:47:05 -0600 Subject: [PATCH 2/4] OffheapIncrementalIndex updates to do the aggregation merging off-heap --- .../incremental/OffheapIncrementalIndex.java | 431 +++++++----------- .../incremental/OnheapIncrementalIndex.java | 6 +- .../segment/data/IncrementalIndexTest.java | 53 ++- .../incremental/IncrementalIndexTest.java | 39 +- 4 files changed, 237 insertions(+), 292 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index be1aeb2d5e4..355cf19417b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -20,56 +20,85 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; -import com.google.common.collect.Lists; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.dimension.DimensionSpec; +import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class OnheapIncrementalIndex extends IncrementalIndex +public class OffheapIncrementalIndex extends IncrementalIndex { - private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); + private final StupidPool bufferPool; + + private final List> aggBuffers = new ArrayList<>(); + private final List indexAndOffsets = new ArrayList<>(); + private final ConcurrentNavigableMap facts; + private final AtomicInteger indexIncrement = new AtomicInteger(0); + protected final int maxRowCount; + private volatile Map selectors; + //given a ByteBuffer and an offset where all aggregates for a row are stored + //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate + //is stored + private volatile int[] aggOffsetInBuffer; + private volatile int aggsTotalSize; + private String outOfRowsReason = null; - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { super(incrementalIndexSchema, deserializeComplexMetrics); this.maxRowCount = maxRowCount; + this.bufferPool = bufferPool; this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + //check that stupid pool gives buffers that can hold at least one row's aggregators + ResourceHolder bb = 
bufferPool.take(); + if (bb.get().capacity() < aggsTotalSize) { + RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); + try { + bb.close(); + } catch(IOException ioe){ + ex.addSuppressed(ioe); + } + throw ex; + } + aggBuffers.add(bb); } - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { this( @@ -78,15 +107,17 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), deserializeComplexMetrics, - maxRowCount + maxRowCount, + bufferPool ); } - public OnheapIncrementalIndex( + public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, final AggregatorFactory[] metrics, - int maxRowCount + int maxRowCount, + StupidPool bufferPool ) { this( @@ -95,18 +126,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), true, - maxRowCount + maxRowCount, + bufferPool ); } - public OnheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - int maxRowCount - ) - { - this(incrementalIndexSchema, true, maxRowCount); - } - @Override public ConcurrentNavigableMap getFacts() { @@ -116,23 +140,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override protected DimDim makeDimDim(String dimension, Object lock) { - return new OnHeapDimDim(lock); + return new OnheapIncrementalIndex.OnHeapDimDim(lock); } @Override - protected Aggregator[] initAggs( + protected BufferAggregator[] initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics ) { selectors = Maps.newHashMap(); - for (AggregatorFactory agg : metrics) { + aggOffsetInBuffer = new int[metrics.length]; + + BufferAggregator[] aggregators = new BufferAggregator[metrics.length]; + + for (int i = 0; i < metrics.length; i++) { + AggregatorFactory agg = metrics[i]; + + ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( + agg, + rowSupplier, + deserializeComplexMetrics + ); + selectors.put( agg.getName(), - new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) ); + + aggregators[i] = agg.factorizeBuffered(columnSelectorFactory); + if (i == 0) { + aggOffsetInBuffer[i] = 0; + } else { + aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize(); + } } - return new Aggregator[metrics.length]; + aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); + + return aggregators; } @Override @@ -146,71 +191,73 @@ public class OnheapIncrementalIndex extends IncrementalIndex Supplier rowSupplier ) throws IndexSizeExceededException { - final Integer priorIndex = facts.get(key); + ByteBuffer aggBuffer; + int bufferIndex; + int bufferOffset; - Aggregator[] aggs; - - if (null != priorIndex) { - aggs = concurrentGet(priorIndex); - } else { - aggs = new Aggregator[metrics.length]; - - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - selectors.get(agg.getName()) - ); - } - final Integer rowIndex = indexIncrement.getAndIncrement(); - - concurrentSet(rowIndex, aggs); - - // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { - throw new 
IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); - } - final Integer prev = facts.putIfAbsent(key, rowIndex); - if (null == prev) { - numEntries.incrementAndGet(); + synchronized (this) { + final Integer priorIndex = facts.get(key); + if (null != priorIndex) { + final int[] indexAndOffset = indexAndOffsets.get(priorIndex); + bufferIndex = indexAndOffset[0]; + bufferOffset = indexAndOffset[1]; + aggBuffer = aggBuffers.get(bufferIndex).get(); } else { - // We lost a race - aggs = concurrentGet(prev); - // Free up the misfire - concurrentRemove(rowIndex); - // This is expected to occur ~80% of the time in the worst scenarios + bufferIndex = aggBuffers.size() - 1; + ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); + int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() + ? null + : indexAndOffsets.get(indexAndOffsets.size() - 1); + + if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) { + throw new ISE("last row's aggregate's buffer and last buffer index must be same"); + } + + bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0); + if (lastBuffer != null && + lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { + aggBuffer = lastBuffer; + } else { + ResourceHolder bb = bufferPool.take(); + aggBuffers.add(bb); + bufferIndex = aggBuffers.size() - 1; + bufferOffset = 0; + aggBuffer = bb.get(); + } + + for (int i = 0; i < metrics.length; i++) { + getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); + } + + // Last ditch sanity checks + if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { + throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); + } + + final Integer rowIndex = indexIncrement.getAndIncrement(); + final Integer prev = facts.putIfAbsent(key, rowIndex); + if (null == prev) { + numEntries.incrementAndGet(); + indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); + } else { + throw new ISE("WTF! 
we are in sychronized block."); + } } } rowContainer.set(row); - for (Aggregator agg : aggs) { + for (int i = 0; i < metrics.length; i++) { + final BufferAggregator agg = getAggs()[i]; + synchronized (agg) { - agg.aggregate(); + agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); } } - rowContainer.set(null); - - return numEntries.get(); } - protected Aggregator[] concurrentGet(int offset) - { - // All get operations should be fine - return aggregators.get(offset); - } - - protected void concurrentSet(int offset, Aggregator[] value) - { - aggregators.put(offset, value); - } - - protected void concurrentRemove(int offset) - { - aggregators.remove(offset); - } - @Override public boolean canAppendRow() { @@ -228,229 +275,75 @@ public class OnheapIncrementalIndex extends IncrementalIndex } @Override - protected Aggregator[] getAggsForRow(int rowOffset) + protected BufferAggregator[] getAggsForRow(int rowOffset) { - return concurrentGet(rowOffset); + return getAggs(); } @Override - protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition) + protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) { - return agg.get(); + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]); } @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getFloat(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getLong(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override public Object getMetricObjectValue(int rowOffset, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].get(); + BufferAggregator agg = getAggs()[aggOffset]; + int[] indexAndOffset = indexAndOffsets.get(rowOffset); + ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); + return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } /** - * Clear out maps to allow GC * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing */ @Override public void close() { super.close(); - aggregators.clear(); facts.clear(); + indexAndOffsets.clear(); + if (selectors != null) { selectors.clear(); } - } - private static class OnHeapDimDim implements DimDim - { - private final Map valueToId = Maps.newHashMap(); - - private final List idToValue = Lists.newArrayList(); - private final Object lock; - - public OnHeapDimDim(Object lock) - { - this.lock = lock; - } - - public int getId(String value) - { - synchronized (lock) { - final Integer id = valueToId.get(value); - return id == null ? 
-1 : id; - } - } - - public String getValue(int id) - { - synchronized (lock) { - return idToValue.get(id); - } - } - - public boolean contains(String value) - { - synchronized (lock) { - return valueToId.containsKey(value); - } - } - - public int size() - { - synchronized (lock) { - return valueToId.size(); - } - } - - public int add(String value) - { - synchronized (lock) { - Integer prev = valueToId.get(value); - if (prev != null) { - return prev; + RuntimeException ex = null; + for (ResourceHolder buffHolder : aggBuffers) { + try { + buffHolder.close(); + } catch(IOException ioe) { + if (ex == null) { + ex = Throwables.propagate(ioe); + } else { + ex.addSuppressed(ioe); } - final int index = size(); - valueToId.put(value, index); - idToValue.add(value); - return index; } } - - public OnHeapDimLookup sort() - { - synchronized (lock) { - return new OnHeapDimLookup(idToValue, size()); - } + aggBuffers.clear(); + if (ex != null) { + throw ex; } } - - private static class OnHeapDimLookup implements SortedDimLookup - { - private final String[] sortedVals; - private final int[] idToIndex; - private final int[] indexToId; - - public OnHeapDimLookup(List idToValue, int length) - { - Map sortedMap = Maps.newTreeMap(); - for (int id = 0; id < length; id++) { - sortedMap.put(idToValue.get(id), id); - } - this.sortedVals = sortedMap.keySet().toArray(new String[length]); - this.idToIndex = new int[length]; - this.indexToId = new int[length]; - int index = 0; - for (Integer id : sortedMap.values()) { - idToIndex[id] = index; - indexToId[index] = id; - index++; - } - } - - @Override - public int size() - { - return sortedVals.length; - } - - @Override - public int indexToId(int index) - { - return indexToId[index]; - } - - @Override - public String getValue(int index) - { - return sortedVals[index]; - } - - @Override - public int idToIndex(int id) - { - return idToIndex[id]; - } - } - - // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. - // In general the selectorFactory need not to thread-safe. - // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. - private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory - { - private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap objectColumnSelectorMap = Maps.newConcurrentMap(); - private final ColumnSelectorFactory delegate; - - public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) - { - this.delegate = delegate; - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return delegate.makeDimensionSelector(dimensionSpec); - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); - FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? 
prev : newSelector; - } - } - - @Override - public LongColumnSelector makeLongColumnSelector(String columnName) - { - LongColumnSelector existing = longColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); - LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String columnName) - { - ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); - ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - } - } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index be1aeb2d5e4..9676ee67b32 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -272,7 +272,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - private static class OnHeapDimDim implements DimDim + static class OnHeapDimDim implements DimDim { private final Map valueToId = Maps.newHashMap(); @@ -335,7 +335,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - private static class OnHeapDimLookup implements SortedDimLookup + static class OnHeapDimLookup implements SortedDimLookup { private final String[] sortedVals; private final int[] idToIndex; @@ -386,7 +386,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. // In general the selectorFactory need not to thread-safe. // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. 
- private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory + static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory { private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index b769c6ba3fb..0658e02fc74 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -19,6 +19,7 @@ package io.druid.segment.data; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,6 +33,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionsSpec; @@ -52,19 +54,23 @@ import io.druid.query.timeseries.TimeseriesQueryEngine; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.CloserRule; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -92,6 +98,9 @@ public class IncrementalIndexTest private final IndexCreator indexCreator; + @Rule + public final CloserRule closer = new CloserRule(false); + public IncrementalIndexTest( IndexCreator indexCreator ) @@ -120,7 +129,19 @@ public class IncrementalIndexTest @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return IncrementalIndexTest.createIndex(factories); + return new OffheapIncrementalIndex( + 0L, QueryGranularity.NONE, factories, 1000000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); } } } @@ -207,7 +228,8 @@ public class IncrementalIndexTest public void testCaseSensitivity() throws Exception { long timestamp = System.currentTimeMillis(); - IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories)); + populateIndex(timestamp, index); Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames()); Assert.assertEquals(2, index.size()); @@ -222,8 +244,6 @@ public class IncrementalIndexTest Assert.assertEquals(timestamp, row.getTimestampFromEpoch()); 
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1")); Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); - - index.close(); } @Test @@ -247,11 +267,14 @@ public class IncrementalIndexTest ); } - final IncrementalIndex index = indexCreator.createIndex( - ingestAggregatorFactories.toArray( - new AggregatorFactory[ingestAggregatorFactories.size()] + final IncrementalIndex index = closer.closeLater( + indexCreator.createIndex( + ingestAggregatorFactories.toArray( + new AggregatorFactory[ingestAggregatorFactories.size()] + ) ) ); + final long timestamp = System.currentTimeMillis(); final int rows = 50; @@ -320,8 +343,6 @@ public class IncrementalIndexTest result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue() ); } - - index.close(); } @Test(timeout = 60_000L) @@ -363,7 +384,9 @@ public class IncrementalIndexTest } - final IncrementalIndex index = indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount])); + final IncrementalIndex index = closer.closeLater( + indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[dimensionCount])) + ); final int concurrentThreads = 2; final int elementsPerThread = 10_000; final ListeningExecutorService indexExecutor = MoreExecutors.listeningDecorator( @@ -537,14 +560,12 @@ public class IncrementalIndexTest ); } } - - index.close(); } @Test public void testConcurrentAdd() throws Exception { - final IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories); + final IncrementalIndex index = closer.closeLater(indexCreator.createIndex(defaultAggregatorFactories)); final int threadCount = 10; final int elementsPerThread = 200; final int dimensionCount = 5; @@ -584,8 +605,6 @@ public class IncrementalIndexTest curr++; } Assert.assertEquals(elementsPerThread, curr); - - index.close(); } @Test @@ -611,8 +630,8 @@ public class IncrementalIndexTest true, 1000000 ); - Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); + closer.closeLater(incrementalIndex); - incrementalIndex.close(); + Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); } } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 7d7e53dabdb..86777202051 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -19,19 +19,24 @@ package io.druid.segment.incremental; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.segment.CloserRule; import org.joda.time.DateTime; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -46,6 +51,9 @@ public class IncrementalIndexTest IncrementalIndex createIndex(); } + @Rule + public final CloserRule closer = new CloserRule(false); + private final IndexCreator indexCreator; 
public IncrementalIndexTest(IndexCreator IndexCreator) @@ -70,6 +78,31 @@ public class IncrementalIndexTest } } + }, + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex() + { + return new OffheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + 1000000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); + } + } } } @@ -79,7 +112,7 @@ public class IncrementalIndexTest @Test(expected = ISE.class) public void testDuplicateDimensions() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(), @@ -99,7 +132,7 @@ public class IncrementalIndexTest @Test(expected = ISE.class) public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(), @@ -112,7 +145,7 @@ public class IncrementalIndexTest @Test public void controlTest() throws IndexSizeExceededException { - IncrementalIndex index = indexCreator.createIndex(); + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(), From b40c342cd118f3b7a4393c5449b829cbe844c5c8 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sat, 23 Jan 2016 00:12:22 -0600 Subject: [PATCH 3/4] make Global stupid pool cache size configurable --- .../java/io/druid/collections/StupidPool.java | 21 +++++- docs/content/configuration/broker.md | 1 + docs/content/configuration/historical.md | 1 + .../io/druid/query/DruidProcessingConfig.java | 6 ++ .../query/DruidProcessingConfigTest.java | 66 +++++++++++++++++++ .../io/druid/guice/DruidProcessingModule.java | 2 +- .../io/druid/offheap/OffheapBufferPool.java | 5 +- 7 files changed, 97 insertions(+), 5 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java diff --git a/common/src/main/java/io/druid/collections/StupidPool.java b/common/src/main/java/io/druid/collections/StupidPool.java index 6ba14099a26..8c5937ad335 100644 --- a/common/src/main/java/io/druid/collections/StupidPool.java +++ b/common/src/main/java/io/druid/collections/StupidPool.java @@ -38,11 +38,24 @@ public class StupidPool private final Queue objects = new ConcurrentLinkedQueue<>(); + //note that this is just the max entries in the cache, pool can still create as many buffers as needed. 
+ private final int objectsCacheMaxCount; + public StupidPool( Supplier generator ) { this.generator = generator; + this.objectsCacheMaxCount = Integer.MAX_VALUE; + } + + public StupidPool( + Supplier generator, + int objectsCacheMaxCount + ) + { + this.generator = generator; + this.objectsCacheMaxCount = objectsCacheMaxCount; } public ResourceHolder take() @@ -80,8 +93,12 @@ public class StupidPool log.warn(new ISE("Already Closed!"), "Already closed"); return; } - if (!objects.offer(object)) { - log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object); + if (objects.size() < objectsCacheMaxCount) { + if (!objects.offer(object)) { + log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object); + } + } else { + log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount); } } diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 9f74731b7a9..8cc0ff031ea 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -55,6 +55,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally, |Property|Description|Default| |--------|-----------|-------| |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)| +|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE| |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 86c6330ffb7..57264a844ea 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -53,6 +53,7 @@ Druid uses Jetty to serve HTTP requests. |Property|Description|Default| |--------|-----------|-------| |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. 
The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)| +|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE| |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java index 6c99e6ec9c0..73f7b007a37 100644 --- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java @@ -31,6 +31,12 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem return 1024 * 1024 * 1024; } + @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"}) + public int poolCacheMaxCount() + { + return Integer.MAX_VALUE; + } + @Override @Config(value = "${base_path}.numThreads") public int getNumThreads() { diff --git a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java new file mode 100644 index 00000000000..9d3a4edfb0b --- /dev/null +++ b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query; + +import com.google.common.collect.ImmutableMap; +import com.metamx.common.config.Config; +import org.junit.Assert; +import org.junit.Test; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + +/** + */ +public class DruidProcessingConfigTest +{ + + @Test + public void testDeserialization() throws Exception + { + ConfigurationObjectFactory factory = Config.createFactory(new Properties()); + + //with defaults + DruidProcessingConfig config = factory.build(DruidProcessingConfig.class); + + Assert.assertEquals(1024 * 1024 * 1024, config.intermediateComputeSizeBytes()); + Assert.assertEquals(Integer.MAX_VALUE, config.poolCacheMaxCount()); + Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors()); + Assert.assertEquals(0, config.columnCacheSizeBytes()); + Assert.assertFalse(config.isFifo()); + + //with non-defaults + Properties props = new Properties(); + props.setProperty("druid.processing.buffer.sizeBytes", "1"); + props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1"); + props.setProperty("druid.processing.numThreads", "5"); + props.setProperty("druid.processing.columnCache.sizeBytes", "1"); + props.setProperty("druid.processing.fifo", "true"); + + factory = Config.createFactory(props); + config = factory.buildWithReplacements(DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); + + Assert.assertEquals(1, config.intermediateComputeSizeBytes()); + Assert.assertEquals(1, config.poolCacheMaxCount()); + Assert.assertEquals(5, config.getNumThreads()); + Assert.assertEquals(1, config.columnCacheSizeBytes()); + Assert.assertTrue(config.isFifo()); + } +} diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index aafd142a298..15fb776abc9 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -125,7 +125,7 @@ public class DruidProcessingModule implements Module log.info(e.getMessage()); } - return new OffheapBufferPool(config.intermediateComputeSizeBytes()); + return new OffheapBufferPool(config.intermediateComputeSizeBytes(), config.poolCacheMaxCount()); } diff --git a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java index 4d3df2fc90d..aab3ffa7c18 100644 --- a/server/src/main/java/io/druid/offheap/OffheapBufferPool.java +++ b/server/src/main/java/io/druid/offheap/OffheapBufferPool.java @@ -31,7 +31,7 @@ public class OffheapBufferPool extends StupidPool { private static final Logger log = new Logger(OffheapBufferPool.class); - public OffheapBufferPool(final int computationBufferSize) + public OffheapBufferPool(final int computationBufferSize, final int cacheMaxCount) { super( new Supplier() @@ -47,7 +47,8 @@ public class OffheapBufferPool extends StupidPool ); return ByteBuffer.allocateDirect(computationBufferSize); } - } + }, + cacheMaxCount ); } } From 9fe1b28ee5451fb948a339157fed70a872c8e264 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sat, 23 Jan 2016 00:13:09 -0600 Subject: [PATCH 4/4] provide configuration to enable usage of Off heap merging for groupBy query --- .../query/groupby/GroupByQueryHelper.java | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java 
b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index c0ab3f5913d..49caa8013bd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; @@ -45,7 +46,6 @@ public class GroupByQueryHelper final GroupByQuery query, final GroupByQueryConfig config, StupidPool bufferPool - ) { final QueryGranularity gran = query.getGranularity(); @@ -77,15 +77,30 @@ public class GroupByQueryHelper } } ); - final IncrementalIndex index = new OnheapIncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]), - false, - config.getMaxResults() - ); + final IncrementalIndex index; + + if (query.getContextValue("useOffheap", false)) { + index = new OffheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false, + config.getMaxResults(), + bufferPool + ); + } else { + index = new OnheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false, + config.getMaxResults() + ); + } Accumulator accumulator = new Accumulator() {
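A minimal, self-contained usage sketch assembled from the test fixtures added in this series (the wrapper class name OffheapIndexExample, the 256 KB heap buffers, the single count aggregator, and the sample row are illustrative choices, not requirements). The pool must hand out buffers large enough to hold one row's worth of aggregator state, otherwise the OffheapIncrementalIndex constructor rejects it with an IAE; patch 4 routes groupBy merging through this class when the query context sets useOffheap=true.

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OffheapIncrementalIndex;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class OffheapIndexExample
{
  public static void main(String[] args) throws Exception
  {
    // Pool of 256 KB buffers; each buffer must fit at least one row's aggregator state.
    StupidPool<ByteBuffer> bufferPool = new StupidPool<ByteBuffer>(
        new Supplier<ByteBuffer>()
        {
          @Override
          public ByteBuffer get()
          {
            return ByteBuffer.allocate(256 * 1024);
          }
        }
    );

    // Off-heap index: aggregator state lives in pooled ByteBuffers rather than on the Java heap.
    IncrementalIndex index = new OffheapIncrementalIndex(
        0L,                                                          // minTimestamp
        QueryGranularity.NONE,
        new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
        1000000,                                                     // maxRowCount
        bufferPool
    );

    try {
      index.add(
          new MapBasedInputRow(
              System.currentTimeMillis(),
              Arrays.asList("dim1"),
              ImmutableMap.<String, Object>of("dim1", "a")
          )
      );
    }
    finally {
      // Not thread-safe with concurrent add(); also releases the pooled buffers.
      index.close();
    }
  }
}

With patch 3 applied, the pool's retained-buffer count can additionally be capped via the two-argument constructor, new StupidPool<ByteBuffer>(supplier, cacheMaxCount); on Broker and Historical nodes the global processing pool is capped through druid.processing.buffer.poolCacheMaxCount as documented above.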