Merge pull request #2228 from metamx/incremental-index-mem2

Improve heap usage for IncrementalIndex
This commit is contained in:
Fangjin Yang 2016-01-13 14:48:03 -08:00
commit 4c014c1574
2 changed files with 91 additions and 1 deletions

View File

@ -355,6 +355,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
boolean deserializeComplexMetrics boolean deserializeComplexMetrics
); );
// Note: This method needs to be thread safe.
protected abstract Integer addToFacts( protected abstract Integer addToFacts(
AggregatorFactory[] metrics, AggregatorFactory[] metrics,
boolean deserializeComplexMetrics, boolean deserializeComplexMetrics,

View File

@ -28,6 +28,12 @@ import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@ -45,6 +51,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>(); private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();
private final AtomicInteger indexIncrement = new AtomicInteger(0); private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount; protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
private String outOfRowsReason = null; private String outOfRowsReason = null;
@ -118,6 +125,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
) )
{ {
selectors = Maps.newHashMap();
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
);
}
return new Aggregator[metrics.length]; return new Aggregator[metrics.length];
} }
@ -144,7 +159,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
for (int i = 0; i < metrics.length; i++) { for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i]; final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize( aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) selectors.get(agg.getName())
); );
} }
final Integer rowIndex = indexIncrement.getAndIncrement(); final Integer rowIndex = indexIncrement.getAndIncrement();
@ -253,6 +268,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
super.close(); super.close();
aggregators.clear(); aggregators.clear();
facts.clear(); facts.clear();
if (selectors != null) {
selectors.clear();
}
} }
private static class OnHeapDimDim implements DimDim private static class OnHeapDimDim implements DimDim
@ -345,4 +363,75 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
return s1 == s2; return s1 == s2;
} }
} }
// Caches references to selector objetcs for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
private final ColumnSelectorFactory delegate;
public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
{
this.delegate = delegate;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return delegate.makeDimensionSelector(dimensionSpec);
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
LongColumnSelector existing = longColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}
}
} }