mirror of https://github.com/apache/druid.git
cache metric selectors instead of creating new ones for every metric in each row
clear selectors on close. Add comments about thread safety.
This commit is contained in:
parent
01a0715ee2
commit
4863e2ca4f
|
@ -355,6 +355,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
boolean deserializeComplexMetrics
|
||||
);
|
||||
|
||||
// Note: This method needs to be thread safe.
|
||||
protected abstract Integer addToFacts(
|
||||
AggregatorFactory[] metrics,
|
||||
boolean deserializeComplexMetrics,
|
||||
|
|
|
@ -28,6 +28,12 @@ import io.druid.data.input.InputRow;
|
|||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
@ -45,6 +51,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();
|
||||
private final AtomicInteger indexIncrement = new AtomicInteger(0);
|
||||
protected final int maxRowCount;
|
||||
private volatile Map<String, ColumnSelectorFactory> selectors;
|
||||
|
||||
private String outOfRowsReason = null;
|
||||
|
||||
|
@ -118,6 +125,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
|
||||
)
|
||||
{
|
||||
selectors = Maps.newHashMap();
|
||||
for (AggregatorFactory agg : metrics) {
|
||||
selectors.put(
|
||||
agg.getName(),
|
||||
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
|
||||
);
|
||||
}
|
||||
|
||||
return new Aggregator[metrics.length];
|
||||
}
|
||||
|
||||
|
@ -144,7 +159,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
for (int i = 0; i < metrics.length; i++) {
|
||||
final AggregatorFactory agg = metrics[i];
|
||||
aggs[i] = agg.factorize(
|
||||
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
|
||||
selectors.get(agg.getName())
|
||||
);
|
||||
}
|
||||
final Integer rowIndex = indexIncrement.getAndIncrement();
|
||||
|
@ -253,6 +268,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
super.close();
|
||||
aggregators.clear();
|
||||
facts.clear();
|
||||
if (selectors != null) {
|
||||
selectors.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private static class OnHeapDimDim implements DimDim
|
||||
|
@ -345,4 +363,75 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
return s1 == s2;
|
||||
}
|
||||
}
|
||||
|
||||
// Caches references to selector objetcs for each column instead of creating a new object each time in order to save heap space.
|
||||
// In general the selectorFactory need not to thread-safe.
|
||||
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
|
||||
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
|
||||
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
|
||||
private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
|
||||
private final ColumnSelectorFactory delegate;
|
||||
|
||||
public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return delegate.makeDimensionSelector(dimensionSpec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(String columnName)
|
||||
{
|
||||
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
} else {
|
||||
FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
|
||||
FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
|
||||
columnName,
|
||||
newSelector
|
||||
);
|
||||
return prev != null ? prev : newSelector;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName)
|
||||
{
|
||||
LongColumnSelector existing = longColumnSelectorMap.get(columnName);
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
} else {
|
||||
LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
|
||||
LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
|
||||
columnName,
|
||||
newSelector
|
||||
);
|
||||
return prev != null ? prev : newSelector;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
|
||||
{
|
||||
ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
|
||||
if (existing != null) {
|
||||
return existing;
|
||||
} else {
|
||||
ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
|
||||
ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
|
||||
columnName,
|
||||
newSelector
|
||||
);
|
||||
return prev != null ? prev : newSelector;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue