simplify IncrementalIndex since group-by v1 has been removed (#15448)

This commit is contained in:
Clint Wylie 2023-11-29 14:46:16 -08:00 committed by GitHub
parent 93cd638645
commit 0516d0dae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 234 additions and 364 deletions

View File

@ -392,7 +392,6 @@ public class GroupByTypeInterfaceBenchmark
{ {
return new OnheapIncrementalIndex.Builder() return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment) .setMaxRowCount(rowsPerSegment)
.build(); .build();
} }

View File

@ -135,7 +135,6 @@ public class IncrementalIndexRowTypeBenchmark
{ {
return appendableIndexSpec.builder() return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(aggs) .setSimpleTestingIndexSchema(aggs)
.setDeserializeComplexMetrics(false)
.setMaxRowCount(rowsPerSegment) .setMaxRowCount(rowsPerSegment)
.build(); .build();
} }

View File

@ -608,7 +608,6 @@ public class GroupByBenchmark
.withRollup(withRollup) .withRollup(withRollup)
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment) .setMaxRowCount(rowsPerSegment)
.build(); .build();
} }

View File

@ -88,7 +88,6 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
.withMetrics(new CountAggregatorFactory("cnt")) .withMetrics(new CountAggregatorFactory("cnt"))
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(1000) .setMaxRowCount(1000)
.build(); .build();

View File

@ -330,7 +330,7 @@ public class InputRowSerde
writeString(k, out); writeString(k, out);
try (Aggregator agg = aggFactory.factorize( try (Aggregator agg = aggFactory.factorize(
IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, aggFactory, supplier, true) IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, aggFactory, supplier)
)) { )) {
try { try {
agg.aggregate(); agg.aggregate();

View File

@ -29,9 +29,6 @@ public abstract class AppendableIndexBuilder
{ {
@Nullable @Nullable
protected IncrementalIndexSchema incrementalIndexSchema = null; protected IncrementalIndexSchema incrementalIndexSchema = null;
protected boolean deserializeComplexMetrics = true;
protected boolean concurrentEventAdd = false;
protected boolean sortFacts = true;
protected int maxRowCount = 0; protected int maxRowCount = 0;
protected long maxBytesInMemory = 0; protected long maxBytesInMemory = 0;
// When set to true, for any row that already has metric (with the same name defined in metricSpec), // When set to true, for any row that already has metric (with the same name defined in metricSpec),
@ -88,18 +85,6 @@ public abstract class AppendableIndexBuilder
return this; return this;
} }
public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
{
this.deserializeComplexMetrics = deserializeComplexMetrics;
return this;
}
public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
this.concurrentEventAdd = concurrentEventAdd;
return this;
}
public AppendableIndexBuilder setMaxRowCount(final int maxRowCount) public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
{ {
this.maxRowCount = maxRowCount; this.maxRowCount = maxRowCount;

View File

@ -25,7 +25,6 @@ import com.google.common.base.Strings;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
@ -84,23 +83,15 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
public abstract class IncrementalIndex implements Iterable<Row>, Closeable, ColumnInspector public abstract class IncrementalIndex implements Iterable<Row>, Closeable, ColumnInspector
{ {
@ -109,22 +100,19 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
* *
* @param agg the aggregator * @param agg the aggregator
* @param in ingestion-time input row supplier * @param in ingestion-time input row supplier
* @param deserializeComplexMetrics whether complex objects should be deserialized by a {@link ComplexMetricExtractor}
*
* @return column selector factory * @return column selector factory
*/ */
public static ColumnSelectorFactory makeColumnSelectorFactory( public static ColumnSelectorFactory makeColumnSelectorFactory(
final VirtualColumns virtualColumns, final VirtualColumns virtualColumns,
final AggregatorFactory agg, final AggregatorFactory agg,
final Supplier<InputRow> in, final Supplier<InputRow> in
final boolean deserializeComplexMetrics
) )
{ {
// we use RowSignature.empty() because ColumnInspector here should be the InputRow schema, not the // we use RowSignature.empty() because ColumnInspector here should be the InputRow schema, not the
// IncrementalIndex schema, because we are reading values from the InputRow // IncrementalIndex schema, because we are reading values from the InputRow
final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = RowBasedColumnSelectorFactory.create( final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(), RowAdapters.standardRow(),
in::get, in,
RowSignature.empty(), RowSignature.empty(),
true, true,
true true
@ -135,11 +123,9 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
@Override @Override
public ColumnValueSelector<?> makeColumnValueSelector(final String column) public ColumnValueSelector<?> makeColumnValueSelector(final String column)
{ {
final boolean isComplexMetric = agg.getIntermediateType().is(ValueType.COMPLEX);
final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column); final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column);
if (!isComplexMetric || !deserializeComplexMetrics) { if (!agg.getIntermediateType().is(ValueType.COMPLEX)) {
return selector; return selector;
} else { } else {
// Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects. // Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects.
@ -226,7 +212,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
private final List<Function<InputRow, InputRow>> rowTransformers; private final List<Function<InputRow, InputRow>> rowTransformers;
private final VirtualColumns virtualColumns; private final VirtualColumns virtualColumns;
private final AggregatorFactory[] metrics; private final AggregatorFactory[] metrics;
private final boolean deserializeComplexMetrics;
private final Metadata metadata; private final Metadata metadata;
protected final boolean preserveExistingMetrics; protected final boolean preserveExistingMetrics;
@ -252,16 +237,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
/** /**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
* <p>
* Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy
* where the multiple threads can add concurrently to the IncrementalIndex).
*
* @param incrementalIndexSchema the schema to use for incremental index * @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
* @param concurrentEventAdd flag whether ot not adding of input rows should be thread-safe
* @param preserveExistingMetrics When set to true, for any row that already has metric * @param preserveExistingMetrics When set to true, for any row that already has metric
* (with the same name defined in metricSpec), the metric aggregator in metricSpec * (with the same name defined in metricSpec), the metric aggregator in metricSpec
* is skipped and the existing metric is unchanged. If the row does not already have * is skipped and the existing metric is unchanged. If the row does not already have
@ -273,8 +249,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
*/ */
protected IncrementalIndex( protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema, final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd,
final boolean preserveExistingMetrics, final boolean preserveExistingMetrics,
final boolean useMaxMemoryEstimates final boolean useMaxMemoryEstimates
) )
@ -285,7 +259,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
this.virtualColumns = incrementalIndexSchema.getVirtualColumns(); this.virtualColumns = incrementalIndexSchema.getVirtualColumns();
this.metrics = incrementalIndexSchema.getMetrics(); this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>(); this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.preserveExistingMetrics = preserveExistingMetrics; this.preserveExistingMetrics = preserveExistingMetrics;
this.useMaxMemoryEstimates = useMaxMemoryEstimates; this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec()
@ -303,7 +276,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
this.rollup this.rollup
); );
initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); initAggs(metrics, rowSupplier);
for (AggregatorFactory metric : metrics) { for (AggregatorFactory metric : metrics) {
MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
@ -359,9 +332,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
protected abstract void initAggs( protected abstract void initAggs(
AggregatorFactory[] metrics, AggregatorFactory[] metrics,
Supplier<InputRow> rowSupplier, Supplier<InputRow> rowSupplier
boolean deserializeComplexMetrics,
boolean concurrentEventAdd
); );
// Note: This method needs to be thread safe. // Note: This method needs to be thread safe.
@ -740,11 +711,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return numEntries.get(); return numEntries.get();
} }
boolean getDeserializeComplexMetrics()
{
return deserializeComplexMetrics;
}
AtomicInteger getNumEntries() AtomicInteger getNumEntries()
{ {
return numEntries; return numEntries;
@ -1054,11 +1020,10 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
protected ColumnSelectorFactory makeColumnSelectorFactory( protected ColumnSelectorFactory makeColumnSelectorFactory(
final AggregatorFactory agg, final AggregatorFactory agg,
final Supplier<InputRow> in, final Supplier<InputRow> in
final boolean deserializeComplexMetrics
) )
{ {
return makeColumnSelectorFactory(virtualColumns, agg, in, deserializeComplexMetrics); return makeColumnSelectorFactory(virtualColumns, agg, in);
} }
protected final Comparator<IncrementalIndexRow> dimsComparator() protected final Comparator<IncrementalIndexRow> dimsComparator()
@ -1127,7 +1092,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return true; return true;
} }
interface FactsHolder public interface FactsHolder
{ {
/** /**
* @return the previous rowIndex associated with the specified key, or * @return the previous rowIndex associated with the specified key, or
@ -1161,232 +1126,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
void clear(); void clear();
} }
static class RollupFactsHolder implements FactsHolder private final class LongMetricColumnSelector implements LongColumnSelector
{
private final boolean sortFacts;
// Can't use Set because we need to be able to get from collection
private final ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> facts;
private final List<DimensionDesc> dimensionDescsList;
RollupFactsHolder(
boolean sortFacts,
Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
List<DimensionDesc> dimensionDescsList
)
{
this.sortFacts = sortFacts;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator);
} else {
this.facts = new ConcurrentHashMap<>();
}
this.dimensionDescsList = dimensionDescsList;
}
@Override
public int getPriorIndex(IncrementalIndexRow key)
{
IncrementalIndexRow row = facts.get(key);
return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : row.getRowIndex();
}
@Override
public long getMinTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}
@Override
public long getMaxTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}
@Override
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap()
.keySet()
.iterator();
}
return keySet().iterator();
}
@Override
public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{
if (!sortFacts) {
throw new UnsupportedOperationException("can't get timeRange from unsorted facts data.");
}
IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList);
IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList);
ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap =
((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).subMap(start, end);
ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = descending ? subMap.descendingMap() : subMap;
return rangeMap.keySet();
}
@Override
public Iterable<IncrementalIndexRow> keySet()
{
return facts.keySet();
}
@Override
public Iterable<IncrementalIndexRow> persistIterable()
{
// with rollup, facts are already pre-sorted so just return keyset
return keySet();
}
@Override
public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
{
// setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers.
key.setRowIndex(rowIndex);
IncrementalIndexRow prev = facts.putIfAbsent(key, key);
return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : prev.getRowIndex();
}
@Override
public void clear()
{
facts.clear();
}
}
static class PlainFactsHolder implements FactsHolder
{
private final boolean sortFacts;
private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
{
this.sortFacts = sortFacts;
if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>();
} else {
this.facts = new ConcurrentHashMap<>();
}
this.incrementalIndexRowComparator = incrementalIndexRowComparator;
}
@Override
public int getPriorIndex(IncrementalIndexRow key)
{
// always return EMPTY_ROW_INDEX to indicate that no prior key cause we always add new row
return IncrementalIndexRow.EMPTY_ROW_INDEX;
}
@Override
public long getMinTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).firstKey();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}
@Override
public long getMaxTimeMillis()
{
if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).lastKey();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}
@Override
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
.descendingMap().values(), true).iterator();
}
return timeOrderedConcat(facts.values(), false).iterator();
}
@Override
public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{
ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
return timeOrderedConcat(rangeMap.values(), descending);
}
private Iterable<IncrementalIndexRow> timeOrderedConcat(
final Iterable<Deque<IncrementalIndexRow>> iterable,
final boolean descending
)
{
return () -> Iterators.concat(
Iterators.transform(
iterable.iterator(),
input -> descending ? input.descendingIterator() : input.iterator()
)
);
}
private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
final Collection<Deque<IncrementalIndexRow>> rowGroups
)
{
return rowGroups.stream()
.flatMap(Collection::stream)
.sorted(incrementalIndexRowComparator);
}
@Override
public Iterable<IncrementalIndexRow> keySet()
{
return timeOrderedConcat(facts.values(), false);
}
@Override
public Iterable<IncrementalIndexRow> persistIterable()
{
return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
}
@Override
public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
{
Long time = key.getTimestamp();
Deque<IncrementalIndexRow> rows = facts.get(time);
if (rows == null) {
facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
// in race condition, rows may be put by other thread, so always get latest status from facts
rows = facts.get(time);
}
// setRowIndex() must be called before rows.add() for visibility of rowIndex from concurrent readers.
key.setRowIndex(rowIndex);
rows.add(key);
// always return EMPTY_ROW_INDEX to indicate that we always add new row
return IncrementalIndexRow.EMPTY_ROW_INDEX;
}
@Override
public void clear()
{
facts.clear();
}
}
private class LongMetricColumnSelector implements LongColumnSelector
{ {
private final IncrementalIndexRowHolder currEntry; private final IncrementalIndexRowHolder currEntry;
private final int metricIndex; private final int metricIndex;
@ -1417,7 +1157,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
} }
} }
private class ObjectMetricColumnSelector extends ObjectColumnSelector private final class ObjectMetricColumnSelector extends ObjectColumnSelector
{ {
private final IncrementalIndexRowHolder currEntry; private final IncrementalIndexRowHolder currEntry;
private final int metricIndex; private final int metricIndex;
@ -1454,7 +1194,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
} }
} }
private class FloatMetricColumnSelector implements FloatColumnSelector private final class FloatMetricColumnSelector implements FloatColumnSelector
{ {
private final IncrementalIndexRowHolder currEntry; private final IncrementalIndexRowHolder currEntry;
private final int metricIndex; private final int metricIndex;
@ -1485,7 +1225,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
} }
} }
private class DoubleMetricColumnSelector implements DoubleColumnSelector private final class DoubleMetricColumnSelector implements DoubleColumnSelector
{ {
private final IncrementalIndexRowHolder currEntry; private final IncrementalIndexRowHolder currEntry;
private final int metricIndex; private final int metricIndex;

View File

@ -42,7 +42,7 @@ public final class IncrementalIndexRow
* rowIndex is not checked in {@link #equals} and {@link #hashCode} on purpose. IncrementalIndexRow acts as a Map key * rowIndex is not checked in {@link #equals} and {@link #hashCode} on purpose. IncrementalIndexRow acts as a Map key
* and "entry" object (rowIndex is the "value") at the same time. This is done to reduce object indirection and * and "entry" object (rowIndex is the "value") at the same time. This is done to reduce object indirection and
* improve locality, and avoid boxing of rowIndex as Integer, when stored in JDK collection: * improve locality, and avoid boxing of rowIndex as Integer, when stored in JDK collection:
* {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil. * {@link OnheapIncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil.
*/ */
private int rowIndex; private int rowIndex;
private long dimsKeySize; private long dimsKeySize;

View File

@ -47,14 +47,23 @@ import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream;
/** /**
* *
@ -72,7 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100; private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100;
/** /**
* overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object * overhead per {@link ConcurrentSkipListMap.Node} object in facts table
*/ */
private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES;
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
@ -118,28 +127,19 @@ public class OnheapIncrementalIndex extends IncrementalIndex
OnheapIncrementalIndex( OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema, IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount, int maxRowCount,
long maxBytesInMemory, long maxBytesInMemory,
// preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can
// This is currently only used by auto compaction and should not be used for anything else. // have existing metrics. This is currently only used by auto compaction and should not be used for anything else.
boolean preserveExistingMetrics, boolean preserveExistingMetrics,
boolean useMaxMemoryEstimates boolean useMaxMemoryEstimates
) )
{ {
super( super(incrementalIndexSchema, preserveExistingMetrics, useMaxMemoryEstimates);
incrementalIndexSchema,
deserializeComplexMetrics,
concurrentEventAdd,
preserveExistingMetrics,
useMaxMemoryEstimates
);
this.maxRowCount = maxRowCount; this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts, dimsComparator()); : new PlainFactsHolder(dimsComparator());
maxBytesPerRowForAggregators = maxBytesPerRowForAggregators =
useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0; useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
this.useMaxMemoryEstimates = useMaxMemoryEstimates; this.useMaxMemoryEstimates = useMaxMemoryEstimates;
@ -190,9 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override @Override
protected void initAggs( protected void initAggs(
final AggregatorFactory[] metrics, final AggregatorFactory[] metrics,
final Supplier<InputRow> rowSupplier, final Supplier<InputRow> rowSupplier
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd
) )
{ {
selectors = new HashMap<>(); selectors = new HashMap<>();
@ -200,18 +198,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex
for (AggregatorFactory agg : metrics) { for (AggregatorFactory agg : metrics) {
selectors.put( selectors.put(
agg.getName(), agg.getName(),
new CachingColumnSelectorFactory( new CachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier))
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics),
concurrentEventAdd
)
); );
if (preserveExistingMetrics) { if (preserveExistingMetrics) {
AggregatorFactory combiningAgg = agg.getCombiningFactory(); AggregatorFactory combiningAgg = agg.getCombiningFactory();
combiningAggSelectors.put( combiningAggSelectors.put(
combiningAgg.getName(), combiningAgg.getName(),
new CachingColumnSelectorFactory( new CachingColumnSelectorFactory(
makeColumnSelectorFactory(combiningAgg, rowSupplier, deserializeComplexMetrics), makeColumnSelectorFactory(combiningAgg, rowSupplier)
concurrentEventAdd
) )
); );
} }
@ -550,6 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
* If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator * If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator
* for aggregating from input into output field and the aggregator for combining already aggregated field, as needed * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed
*/ */
@Nullable
private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction) private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
{ {
if (preserveExistingMetrics) { if (preserveExistingMetrics) {
@ -605,18 +600,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
*/ */
static class CachingColumnSelectorFactory implements ColumnSelectorFactory static class CachingColumnSelectorFactory implements ColumnSelectorFactory
{ {
private final Map<String, ColumnValueSelector<?>> columnSelectorMap; private final HashMap<String, ColumnValueSelector<?>> columnSelectorMap;
private final ColumnSelectorFactory delegate; private final ColumnSelectorFactory delegate;
public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) public CachingColumnSelectorFactory(ColumnSelectorFactory delegate)
{ {
this.delegate = delegate; this.delegate = delegate;
this.columnSelectorMap = new HashMap<>();
if (concurrentEventAdd) {
columnSelectorMap = new ConcurrentHashMap<>();
} else {
columnSelectorMap = new HashMap<>();
}
} }
@Override @Override
@ -628,7 +618,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override @Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName) public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
{ {
ColumnValueSelector existing = columnSelectorMap.get(columnName); ColumnValueSelector<?> existing = columnSelectorMap.get(columnName);
if (existing != null) { if (existing != null) {
return existing; return existing;
} }
@ -656,9 +646,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
{ {
return new OnheapIncrementalIndex( return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"), Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount, maxRowCount,
maxBytesInMemory, maxBytesInMemory,
preserveExistingMetrics, preserveExistingMetrics,
@ -734,4 +721,194 @@ public class OnheapIncrementalIndex extends IncrementalIndex
return Objects.hash(preserveExistingMetrics); return Objects.hash(preserveExistingMetrics);
} }
} }
/**
 * {@link FactsHolder} used when rollup is enabled: every distinct {@link IncrementalIndexRow} (as judged by the
 * supplied comparator) is stored exactly once, in comparator order, inside a {@link ConcurrentSkipListMap}.
 */
static final class RollupFactsHolder implements FactsHolder
{
  private final List<DimensionDesc> dimensionDescsList;
  // Each row is mapped to itself instead of being kept in a Set, because lookups must be able to hand back the
  // instance that is actually stored (it carries the row index).
  private final ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> facts;

  /**
   * @param incrementalIndexRowComparator ordering (and equality) used for the facts map
   * @param dimensionDescsList            dimension descriptors handed to the boundary rows built by
   *                                      {@link #timeRangeIterable}
   */
  RollupFactsHolder(
      Comparator<IncrementalIndexRow> incrementalIndexRowComparator,
      List<DimensionDesc> dimensionDescsList
  )
  {
    this.dimensionDescsList = dimensionDescsList;
    this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator);
  }

  /**
   * @return the row index of the stored row equal to {@code key}, or
   * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} when no such row is stored
   */
  @Override
  public int getPriorIndex(IncrementalIndexRow key)
  {
    final IncrementalIndexRow stored = facts.get(key);
    if (stored == null) {
      return IncrementalIndexRow.EMPTY_ROW_INDEX;
    }
    return stored.getRowIndex();
  }

  /**
   * @return timestamp of the first row in comparator order; {@code firstKey()} throws
   * {@link java.util.NoSuchElementException} if no rows are stored
   */
  @Override
  public long getMinTimeMillis()
  {
    final IncrementalIndexRow first = facts.firstKey();
    return first.getTimestamp();
  }

  /**
   * @return timestamp of the last row in comparator order; {@code lastKey()} throws
   * {@link java.util.NoSuchElementException} if no rows are stored
   */
  @Override
  public long getMaxTimeMillis()
  {
    final IncrementalIndexRow last = facts.lastKey();
    return last.getTimestamp();
  }

  /**
   * Iterates over all stored rows, in comparator order or in the reverse of it.
   */
  @Override
  public Iterator<IncrementalIndexRow> iterator(boolean descending)
  {
    final ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> ordered =
        descending ? facts.descendingMap() : facts;
    return ordered.keySet().iterator();
  }

  /**
   * Returns the rows that fall between two boundary rows built from {@code timeStart} and {@code timeEnd} with an
   * empty dimension array. The two-argument {@code subMap} includes the lower bound and excludes the upper bound,
   * both evaluated with the facts comparator.
   */
  @Override
  public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
  {
    final IncrementalIndexRow lowerBound = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList);
    final IncrementalIndexRow upperBound = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList);
    final ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> window =
        facts.subMap(lowerBound, upperBound);
    if (descending) {
      return window.descendingMap().keySet();
    }
    return window.keySet();
  }

  /**
   * @return a view of every stored row, in comparator order
   */
  @Override
  public Iterable<IncrementalIndexRow> keySet()
  {
    return facts.keySet();
  }

  /**
   * @return the rows to persist; identical to {@link #keySet()}
   */
  @Override
  public Iterable<IncrementalIndexRow> persistIterable()
  {
    // With rollup the facts are already kept sorted, so the plain key view can be persisted as-is.
    return keySet();
  }

  /**
   * Stores {@code key} under {@code rowIndex} unless an equal row is already present.
   *
   * @return the row index of the row that was already stored, or {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if
   * {@code key} was inserted
   */
  @Override
  public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
  {
    // The row index has to be assigned before the row is published into the map, otherwise concurrent readers
    // could observe the row without its index.
    key.setRowIndex(rowIndex);
    final IncrementalIndexRow previous = facts.putIfAbsent(key, key);
    if (previous != null) {
      return previous.getRowIndex();
    }
    return IncrementalIndexRow.EMPTY_ROW_INDEX;
  }

  /**
   * Removes every stored row.
   */
  @Override
  public void clear()
  {
    facts.clear();
  }
}
/**
 * {@link FactsHolder} used when rollup is disabled: every added row is kept, grouped into a deque
 * per timestamp inside a map sorted by timestamp.
 */
static final class PlainFactsHolder implements FactsHolder
{
  // timestamp -> rows added with that timestamp, in insertion order
  private final ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> facts;
  // only used by persistIterable() to fully sort the rows
  private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;

  /**
   * @param incrementalIndexRowComparator ordering applied to all rows when building the persist iterable
   */
  public PlainFactsHolder(Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
  {
    this.facts = new ConcurrentSkipListMap<>();
    this.incrementalIndexRowComparator = incrementalIndexRowComparator;
  }

  @Override
  public int getPriorIndex(IncrementalIndexRow key)
  {
    // always return EMPTY_ROW_INDEX to indicate that there is no prior key, because we always add a new row
    return IncrementalIndexRow.EMPTY_ROW_INDEX;
  }

  /**
   * Smallest timestamp present. {@code firstKey()} throws {@link java.util.NoSuchElementException}
   * when no rows are stored.
   */
  @Override
  public long getMinTimeMillis()
  {
    return facts.firstKey();
  }

  /**
   * Largest timestamp present. {@code lastKey()} throws {@link java.util.NoSuchElementException}
   * when no rows are stored.
   */
  @Override
  public long getMaxTimeMillis()
  {
    return facts.lastKey();
  }

  /**
   * Iterates over all rows by ascending timestamp (insertion order within a timestamp), or by
   * descending timestamp (reverse insertion order within a timestamp) when {@code descending} is true.
   */
  @Override
  public Iterator<IncrementalIndexRow> iterator(boolean descending)
  {
    if (descending) {
      return timeOrderedConcat(facts.descendingMap().values(), true).iterator();
    }
    return timeOrderedConcat(facts.values(), false).iterator();
  }

  /**
   * Rows whose timestamp lies in {@code [timeStart, timeEnd)} (the bounds of {@code subMap}),
   * ordered as described for {@link #iterator(boolean)}.
   */
  @Override
  public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
  {
    ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap = facts.subMap(timeStart, timeEnd);
    final ConcurrentMap<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
    return timeOrderedConcat(rangeMap.values(), descending);
  }

  /**
   * Lazily chains the per-timestamp deques in the order {@code iterable} yields them; each deque is
   * walked tail-to-head when {@code descending} is true and head-to-tail otherwise. A fresh chain is
   * built on every {@code iterator()} call of the returned iterable.
   */
  private Iterable<IncrementalIndexRow> timeOrderedConcat(
      final Iterable<Deque<IncrementalIndexRow>> iterable,
      final boolean descending
  )
  {
    return () -> Iterators.concat(
        Iterators.transform(
            iterable.iterator(),
            input -> descending ? input.descendingIterator() : input.iterator()
        )
    );
  }

  /**
   * Flattens all row groups into one stream sorted by the row comparator.
   */
  private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
      final Collection<Deque<IncrementalIndexRow>> rowGroups
  )
  {
    return rowGroups.stream()
                    .flatMap(Collection::stream)
                    .sorted(incrementalIndexRowComparator);
  }

  /**
   * All rows by ascending timestamp, insertion order within a timestamp.
   */
  @Override
  public Iterable<IncrementalIndexRow> keySet()
  {
    return timeOrderedConcat(facts.values(), false);
  }

  /**
   * Rows in the order used for persisting: fully sorted by the row comparator. The sort is redone
   * each time an iterator is requested from the returned iterable.
   */
  @Override
  public Iterable<IncrementalIndexRow> persistIterable()
  {
    return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
  }

  /**
   * Appends {@code key} to the rows stored for its timestamp, after assigning it {@code rowIndex}.
   *
   * @return always {@link IncrementalIndexRow#EMPTY_ROW_INDEX}, because a new row is always added
   */
  @Override
  public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
  {
    Long time = key.getTimestamp();
    // computeIfAbsent returns the deque that is current for this timestamp (existing or newly
    // created), so "rows" is never null. The previous get / putIfAbsent / get sequence could read
    // back null, and then fail with a NullPointerException, if clear() ran between the last two calls.
    final Deque<IncrementalIndexRow> rows = facts.computeIfAbsent(time, ignored -> new ConcurrentLinkedDeque<>());
    // setRowIndex() must be called before rows.add() for visibility of rowIndex from concurrent readers.
    key.setRowIndex(rowIndex);
    rows.add(key);
    // always return EMPTY_ROW_INDEX to indicate that we always add new row
    return IncrementalIndexRow.EMPTY_ROW_INDEX;
  }

  /**
   * Removes every stored row.
   */
  @Override
  public void clear()
  {
    facts.clear();
  }
}
} }

View File

@ -480,7 +480,6 @@ public class AggregationTestHelper implements Closeable
outDir, outDir,
minTimestamp, minTimestamp,
gran, gran,
true,
maxRowCount, maxRowCount,
rollup rollup
); );
@ -498,7 +497,6 @@ public class AggregationTestHelper implements Closeable
File outDir, File outDir,
long minTimestamp, long minTimestamp,
Granularity gran, Granularity gran,
boolean deserializeComplexMetrics,
int maxRowCount, int maxRowCount,
boolean rollup boolean rollup
) throws Exception ) throws Exception
@ -517,7 +515,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup) .withRollup(rollup)
.build() .build()
) )
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount) .setMaxRowCount(maxRowCount)
.build(); .build();
@ -538,7 +535,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup) .withRollup(rollup)
.build() .build()
) )
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount) .setMaxRowCount(maxRowCount)
.build(); .build();
} }
@ -594,7 +590,6 @@ public class AggregationTestHelper implements Closeable
final AggregatorFactory[] metrics, final AggregatorFactory[] metrics,
long minTimestamp, long minTimestamp,
Granularity gran, Granularity gran,
boolean deserializeComplexMetrics,
int maxRowCount, int maxRowCount,
boolean rollup boolean rollup
) throws Exception ) throws Exception
@ -609,7 +604,6 @@ public class AggregationTestHelper implements Closeable
.withRollup(rollup) .withRollup(rollup)
.build() .build()
) )
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount) .setMaxRowCount(maxRowCount)
.build(); .build();
@ -636,7 +630,6 @@ public class AggregationTestHelper implements Closeable
final AggregatorFactory[] metrics, final AggregatorFactory[] metrics,
long minTimestamp, long minTimestamp,
Granularity gran, Granularity gran,
boolean deserializeComplexMetrics,
int maxRowCount, int maxRowCount,
boolean rollup boolean rollup
) throws Exception ) throws Exception
@ -648,7 +641,6 @@ public class AggregationTestHelper implements Closeable
metrics, metrics,
minTimestamp, minTimestamp,
gran, gran,
deserializeComplexMetrics,
maxRowCount, maxRowCount,
rollup rollup
); );

View File

@ -107,7 +107,6 @@ public class StringColumnAggregationTest
new AggregatorFactory[]{new CountAggregatorFactory("count")}, new AggregatorFactory[]{new CountAggregatorFactory("count")},
0, 0,
Granularities.NONE, Granularities.NONE,
false,
100, 100,
false false
); );

View File

@ -101,7 +101,6 @@ public class SimpleTestIndex
}, },
0, 0,
Granularities.NONE, Granularities.NONE,
false,
100, 100,
false false
); );

View File

@ -138,7 +138,6 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
.withRollup(withRollup) .withRollup(withRollup)
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(1000) .setMaxRowCount(1000)
.build(); .build();
} }

View File

@ -154,7 +154,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest
.withRollup(withRollup) .withRollup(withRollup)
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(1000) .setMaxRowCount(1000)
.build(); .build();
} }

View File

@ -130,7 +130,6 @@ public class GroupByMultiSegmentTest
.withRollup(withRollup) .withRollup(withRollup)
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(1000) .setMaxRowCount(1000)
.build(); .build();
} }

View File

@ -138,7 +138,6 @@ public class GroupByQueryRunnerFactoryTest
{ {
IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder() IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setConcurrentEventAdd(true)
.setMaxRowCount(5000) .setMaxRowCount(5000)
.build(); .build();

View File

@ -132,7 +132,6 @@ public class NestedQueryPushDownTest extends InitializedNullHandlingTest
)) ))
.build() .build()
) )
.setConcurrentEventAdd(true)
.setMaxRowCount(1000) .setMaxRowCount(1000)
.build(); .build();
} }

View File

@ -180,7 +180,7 @@ public class IncrementalIndexCreator implements Closeable
* *
* For example, for a parameterized test with the following constructor: * For example, for a parameterized test with the following constructor:
* {@code * {@code
* public IncrementalIndexTest(String indexType, String mode, boolean deserializeComplexMetrics) * public IncrementalIndexTest(String indexType, String mode)
* { * {
* ... * ...
* } * }
@ -188,12 +188,11 @@ public class IncrementalIndexCreator implements Closeable
* *
* we can test all the input combinations as follows: * we can test all the input combinations as follows:
* {@code * {@code
* @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}") * @Parameterized.Parameters(name = "{index}: {0}, {1}")
* public static Collection<?> constructorFeeder() * public static Collection<?> constructorFeeder()
* { * {
* return IncrementalIndexCreator.indexTypeCartesianProduct( * return IncrementalIndexCreator.indexTypeCartesianProduct(
* ImmutableList.of("rollup", "plain"), * ImmutableList.of("rollup", "plain")
* ImmutableList.of(true, false)
* ); * );
* } * }
* } * }

View File

@ -67,8 +67,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
public IncrementalIndexTest( public IncrementalIndexTest(
String indexType, String indexType,
String mode, String mode
boolean deserializeComplexMetrics
) throws JsonProcessingException ) throws JsonProcessingException
{ {
this.mode = mode; this.mode = mode;
@ -101,18 +100,16 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build(); .build();
indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
.setIndexSchema(schema) .setIndexSchema(schema)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(1_000_000) .setMaxRowCount(1_000_000)
.build()) .build())
); );
} }
@Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}") @Parameterized.Parameters(name = "{index}: {0}, {1}")
public static Collection<?> constructorFeeder() public static Collection<?> constructorFeeder()
{ {
return IncrementalIndexCreator.indexTypeCartesianProduct( return IncrementalIndexCreator.indexTypeCartesianProduct(
ImmutableList.of("rollup", "plain"), ImmutableList.of("rollup", "plain")
ImmutableList.of(true, false)
); );
} }

View File

@ -110,18 +110,12 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
public MapIncrementalIndex( public MapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema, IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount, int maxRowCount,
long maxBytesInMemory long maxBytesInMemory
) )
{ {
super( super(
incrementalIndexSchema, incrementalIndexSchema,
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount, maxRowCount,
maxBytesInMemory, maxBytesInMemory,
false, false,
@ -143,9 +137,6 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
.withQueryGranularity(gran) .withQueryGranularity(gran)
.withMetrics(metrics) .withMetrics(metrics)
.build(), .build(),
true,
false,
true,
maxRowCount, maxRowCount,
maxBytesInMemory, maxBytesInMemory,
false, false,
@ -190,7 +181,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
for (int i = 0; i < metrics.length; i++) { for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i]; final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize( aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, rowSupplier, getDeserializeComplexMetrics()) makeColumnSelectorFactory(agg, rowSupplier)
); );
} }
Integer rowIndex; Integer rowIndex;