From 0516d0dae432058848806e49e71ac73333bf751c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 29 Nov 2023 14:46:16 -0800 Subject: [PATCH] simplify IncrementalIndex since group-by v1 has been removed (#15448) --- .../GroupByTypeInterfaceBenchmark.java | 1 - .../IncrementalIndexRowTypeBenchmark.java | 1 - .../benchmark/query/GroupByBenchmark.java | 1 - .../DistinctCountGroupByQueryTest.java | 1 - .../apache/druid/indexer/InputRowSerde.java | 2 +- .../incremental/AppendableIndexBuilder.java | 15 - .../segment/incremental/IncrementalIndex.java | 284 +----------------- .../incremental/IncrementalIndexRow.java | 2 +- .../incremental/OnheapIncrementalIndex.java | 249 ++++++++++++--- .../aggregation/AggregationTestHelper.java | 8 - .../StringColumnAggregationTest.java | 1 - .../aggregation/mean/SimpleTestIndex.java | 1 - ...ByLimitPushDownInsufficientBufferTest.java | 1 - ...roupByLimitPushDownMultiNodeMergeTest.java | 1 - .../groupby/GroupByMultiSegmentTest.java | 1 - .../GroupByQueryRunnerFactoryTest.java | 1 - .../groupby/NestedQueryPushDownTest.java | 1 - .../incremental/IncrementalIndexCreator.java | 7 +- .../incremental/IncrementalIndexTest.java | 9 +- .../OnheapIncrementalIndexBenchmark.java | 11 +- 20 files changed, 234 insertions(+), 364 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 2d902c12163..0fe549834f0 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -392,7 +392,6 @@ public class GroupByTypeInterfaceBenchmark { return new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) - .setConcurrentEventAdd(true) .setMaxRowCount(rowsPerSegment) .build(); } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java index 9687a888d91..395f9d6d7c7 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java @@ -135,7 +135,6 @@ public class IncrementalIndexRowTypeBenchmark { return appendableIndexSpec.builder() .setSimpleTestingIndexSchema(aggs) - .setDeserializeComplexMetrics(false) .setMaxRowCount(rowsPerSegment) .build(); } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java index d355dd2d005..efb6bed6cb6 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java @@ -608,7 +608,6 @@ public class GroupByBenchmark .withRollup(withRollup) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(rowsPerSegment) .build(); } diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java index c7946255b07..eaef749c324 100644 --- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java @@ -88,7 +88,6 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest .withMetrics(new CountAggregatorFactory("cnt")) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(1000) .build(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java index 7be4da386d8..22b140222c7 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java @@ -330,7 +330,7 @@ public class InputRowSerde writeString(k, out); try (Aggregator agg = aggFactory.factorize( - IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, aggFactory, supplier, true) + IncrementalIndex.makeColumnSelectorFactory(VirtualColumns.EMPTY, aggFactory, supplier) )) { try { agg.aggregate(); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java index aa739b3f744..777c4cc1fd7 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java @@ -29,9 +29,6 @@ public abstract class AppendableIndexBuilder { @Nullable protected IncrementalIndexSchema incrementalIndexSchema = null; - protected boolean deserializeComplexMetrics = true; - protected boolean concurrentEventAdd = false; - protected boolean sortFacts = true; protected int maxRowCount = 0; protected long maxBytesInMemory = 0; // When set to true, for any row that already has metric (with the same name defined in metricSpec), @@ -88,18 +85,6 @@ public abstract class AppendableIndexBuilder return this; } - public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics) - { - this.deserializeComplexMetrics = deserializeComplexMetrics; - return this; - } - - public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd) - { - this.concurrentEventAdd = concurrentEventAdd; - return this; - } - public AppendableIndexBuilder setMaxRowCount(final int maxRowCount) { this.maxRowCount = maxRowCount; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index db72ed37171..eca175267a7 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -25,7 +25,6 @@ import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -84,23 +83,15 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; -import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; public abstract class IncrementalIndex implements Iterable, Closeable, ColumnInspector { @@ -109,22 +100,19 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu * * @param agg the aggregator * @param in ingestion-time input row supplier - * @param deserializeComplexMetrics whether complex objects should be deserialized by a {@link ComplexMetricExtractor} - * * @return column selector factory */ public static ColumnSelectorFactory makeColumnSelectorFactory( final VirtualColumns virtualColumns, final AggregatorFactory agg, - final Supplier in, - final boolean deserializeComplexMetrics + final Supplier in ) { // we use RowSignature.empty() because ColumnInspector here should be the InputRow schema, not the // IncrementalIndex schema, because we are reading values from the InputRow final RowBasedColumnSelectorFactory baseSelectorFactory = RowBasedColumnSelectorFactory.create( RowAdapters.standardRow(), - in::get, + in, RowSignature.empty(), true, true @@ -135,11 +123,9 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu @Override public ColumnValueSelector makeColumnValueSelector(final String column) { - final boolean isComplexMetric = agg.getIntermediateType().is(ValueType.COMPLEX); - final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column); - if (!isComplexMetric || !deserializeComplexMetrics) { + if (!agg.getIntermediateType().is(ValueType.COMPLEX)) { return selector; } else { // Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects. @@ -226,7 +212,6 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu private final List> rowTransformers; private final VirtualColumns virtualColumns; private final AggregatorFactory[] metrics; - private final boolean deserializeComplexMetrics; private final Metadata metadata; protected final boolean preserveExistingMetrics; @@ -252,16 +237,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu /** - * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that - * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. - *

- * Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy - * where the multiple threads can add concurrently to the IncrementalIndex). - * * @param incrementalIndexSchema the schema to use for incremental index - * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input - * value for aggregators that return metrics other than float. - * @param concurrentEventAdd flag whether ot not adding of input rows should be thread-safe * @param preserveExistingMetrics When set to true, for any row that already has metric * (with the same name defined in metricSpec), the metric aggregator in metricSpec * is skipped and the existing metric is unchanged. If the row does not already have @@ -273,8 +249,6 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu */ protected IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd, final boolean preserveExistingMetrics, final boolean useMaxMemoryEstimates ) @@ -285,7 +259,6 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu this.virtualColumns = incrementalIndexSchema.getVirtualColumns(); this.metrics = incrementalIndexSchema.getMetrics(); this.rowTransformers = new CopyOnWriteArrayList<>(); - this.deserializeComplexMetrics = deserializeComplexMetrics; this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() @@ -303,7 +276,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu this.rollup ); - initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); + initAggs(metrics, rowSupplier); for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); @@ -359,9 +332,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu protected abstract void initAggs( AggregatorFactory[] metrics, - Supplier rowSupplier, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd + Supplier rowSupplier ); // Note: This method needs to be thread safe. @@ -740,11 +711,6 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu return numEntries.get(); } - boolean getDeserializeComplexMetrics() - { - return deserializeComplexMetrics; - } - AtomicInteger getNumEntries() { return numEntries; @@ -1054,11 +1020,10 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu protected ColumnSelectorFactory makeColumnSelectorFactory( final AggregatorFactory agg, - final Supplier in, - final boolean deserializeComplexMetrics + final Supplier in ) { - return makeColumnSelectorFactory(virtualColumns, agg, in, deserializeComplexMetrics); + return makeColumnSelectorFactory(virtualColumns, agg, in); } protected final Comparator dimsComparator() @@ -1127,7 +1092,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu return true; } - interface FactsHolder + public interface FactsHolder { /** * @return the previous rowIndex associated with the specified key, or @@ -1161,232 +1126,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu void clear(); } - static class RollupFactsHolder implements FactsHolder - { - private final boolean sortFacts; - // Can't use Set because we need to be able to get from collection - private final ConcurrentMap facts; - private final List dimensionDescsList; - - RollupFactsHolder( - boolean sortFacts, - Comparator incrementalIndexRowComparator, - List dimensionDescsList - ) - { - this.sortFacts = sortFacts; - if (sortFacts) { - this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator); - } else { - this.facts = new ConcurrentHashMap<>(); - } - this.dimensionDescsList = dimensionDescsList; - } - - @Override - public int getPriorIndex(IncrementalIndexRow key) - { - IncrementalIndexRow row = facts.get(key); - return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : row.getRowIndex(); - } - - @Override - public long getMinTimeMillis() - { - if (sortFacts) { - return ((ConcurrentNavigableMap) facts).firstKey().getTimestamp(); - } else { - throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); - } - } - - @Override - public long getMaxTimeMillis() - { - if (sortFacts) { - return ((ConcurrentNavigableMap) facts).lastKey().getTimestamp(); - } else { - throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); - } - } - - @Override - public Iterator iterator(boolean descending) - { - if (descending && sortFacts) { - return ((ConcurrentNavigableMap) facts).descendingMap() - .keySet() - .iterator(); - } - return keySet().iterator(); - } - - @Override - public Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd) - { - if (!sortFacts) { - throw new UnsupportedOperationException("can't get timeRange from unsorted facts data."); - } - IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList); - IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList); - ConcurrentNavigableMap subMap = - ((ConcurrentNavigableMap) facts).subMap(start, end); - ConcurrentMap rangeMap = descending ? subMap.descendingMap() : subMap; - return rangeMap.keySet(); - } - - @Override - public Iterable keySet() - { - return facts.keySet(); - } - - @Override - public Iterable persistIterable() - { - // with rollup, facts are already pre-sorted so just return keyset - return keySet(); - } - - @Override - public int putIfAbsent(IncrementalIndexRow key, int rowIndex) - { - // setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers. - key.setRowIndex(rowIndex); - IncrementalIndexRow prev = facts.putIfAbsent(key, key); - return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : prev.getRowIndex(); - } - - @Override - public void clear() - { - facts.clear(); - } - } - - static class PlainFactsHolder implements FactsHolder - { - private final boolean sortFacts; - private final ConcurrentMap> facts; - - private final Comparator incrementalIndexRowComparator; - - public PlainFactsHolder(boolean sortFacts, Comparator incrementalIndexRowComparator) - { - this.sortFacts = sortFacts; - if (sortFacts) { - this.facts = new ConcurrentSkipListMap<>(); - } else { - this.facts = new ConcurrentHashMap<>(); - } - this.incrementalIndexRowComparator = incrementalIndexRowComparator; - } - - @Override - public int getPriorIndex(IncrementalIndexRow key) - { - // always return EMPTY_ROW_INDEX to indicate that no prior key cause we always add new row - return IncrementalIndexRow.EMPTY_ROW_INDEX; - } - - @Override - public long getMinTimeMillis() - { - if (sortFacts) { - return ((ConcurrentNavigableMap>) facts).firstKey(); - } else { - throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); - } - } - - @Override - public long getMaxTimeMillis() - { - if (sortFacts) { - return ((ConcurrentNavigableMap>) facts).lastKey(); - } else { - throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); - } - } - - @Override - public Iterator iterator(boolean descending) - { - if (descending && sortFacts) { - return timeOrderedConcat(((ConcurrentNavigableMap>) facts) - .descendingMap().values(), true).iterator(); - } - return timeOrderedConcat(facts.values(), false).iterator(); - } - - @Override - public Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd) - { - ConcurrentNavigableMap> subMap = - ((ConcurrentNavigableMap>) facts).subMap(timeStart, timeEnd); - final ConcurrentMap> rangeMap = descending ? subMap.descendingMap() : subMap; - return timeOrderedConcat(rangeMap.values(), descending); - } - - private Iterable timeOrderedConcat( - final Iterable> iterable, - final boolean descending - ) - { - return () -> Iterators.concat( - Iterators.transform( - iterable.iterator(), - input -> descending ? input.descendingIterator() : input.iterator() - ) - ); - } - - private Stream timeAndDimsOrderedConcat( - final Collection> rowGroups - ) - { - return rowGroups.stream() - .flatMap(Collection::stream) - .sorted(incrementalIndexRowComparator); - } - - @Override - public Iterable keySet() - { - return timeOrderedConcat(facts.values(), false); - } - - @Override - public Iterable persistIterable() - { - return () -> timeAndDimsOrderedConcat(facts.values()).iterator(); - } - - @Override - public int putIfAbsent(IncrementalIndexRow key, int rowIndex) - { - Long time = key.getTimestamp(); - Deque rows = facts.get(time); - if (rows == null) { - facts.putIfAbsent(time, new ConcurrentLinkedDeque<>()); - // in race condition, rows may be put by other thread, so always get latest status from facts - rows = facts.get(time); - } - // setRowIndex() must be called before rows.add() for visibility of rowIndex from concurrent readers. - key.setRowIndex(rowIndex); - rows.add(key); - // always return EMPTY_ROW_INDEX to indicate that we always add new row - return IncrementalIndexRow.EMPTY_ROW_INDEX; - } - - @Override - public void clear() - { - facts.clear(); - } - } - - private class LongMetricColumnSelector implements LongColumnSelector + private final class LongMetricColumnSelector implements LongColumnSelector { private final IncrementalIndexRowHolder currEntry; private final int metricIndex; @@ -1417,7 +1157,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu } } - private class ObjectMetricColumnSelector extends ObjectColumnSelector + private final class ObjectMetricColumnSelector extends ObjectColumnSelector { private final IncrementalIndexRowHolder currEntry; private final int metricIndex; @@ -1454,7 +1194,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu } } - private class FloatMetricColumnSelector implements FloatColumnSelector + private final class FloatMetricColumnSelector implements FloatColumnSelector { private final IncrementalIndexRowHolder currEntry; private final int metricIndex; @@ -1485,7 +1225,7 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu } } - private class DoubleMetricColumnSelector implements DoubleColumnSelector + private final class DoubleMetricColumnSelector implements DoubleColumnSelector { private final IncrementalIndexRowHolder currEntry; private final int metricIndex; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index 987ee5f8bf7..70a589f3950 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -42,7 +42,7 @@ public final class IncrementalIndexRow * rowIndex is not checked in {@link #equals} and {@link #hashCode} on purpose. IncrementalIndexRow acts as a Map key * and "entry" object (rowIndex is the "value") at the same time. This is done to reduce object indirection and * improve locality, and avoid boxing of rowIndex as Integer, when stored in JDK collection: - * {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil. + * {@link OnheapIncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil. */ private int rowIndex; private long dimsKeySize; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 8e253f8e903..3449226e4ce 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -47,14 +47,23 @@ import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Stream; /** * @@ -72,7 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100; /** - * overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object + * overhead per {@link ConcurrentSkipListMap.Node} object in facts table */ private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); @@ -118,28 +127,19 @@ public class OnheapIncrementalIndex extends IncrementalIndex OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd, - boolean sortFacts, int maxRowCount, long maxBytesInMemory, - // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics - // This is currently only use by auto compaction and should not be use for anything else. + // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can + // have existing metrics. This is currently only use by auto compaction and should not be use for anything else. boolean preserveExistingMetrics, boolean useMaxMemoryEstimates ) { - super( - incrementalIndexSchema, - deserializeComplexMetrics, - concurrentEventAdd, - preserveExistingMetrics, - useMaxMemoryEstimates - ); + super(incrementalIndexSchema, preserveExistingMetrics, useMaxMemoryEstimates); this.maxRowCount = maxRowCount; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; - this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) - : new PlainFactsHolder(sortFacts, dimsComparator()); + this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(dimsComparator(), getDimensions()) + : new PlainFactsHolder(dimsComparator()); maxBytesPerRowForAggregators = useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0; this.useMaxMemoryEstimates = useMaxMemoryEstimates; @@ -190,9 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override protected void initAggs( final AggregatorFactory[] metrics, - final Supplier rowSupplier, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd + final Supplier rowSupplier ) { selectors = new HashMap<>(); @@ -200,18 +198,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex for (AggregatorFactory agg : metrics) { selectors.put( agg.getName(), - new CachingColumnSelectorFactory( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics), - concurrentEventAdd - ) + new CachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier)) ); if (preserveExistingMetrics) { AggregatorFactory combiningAgg = agg.getCombiningFactory(); combiningAggSelectors.put( combiningAgg.getName(), new CachingColumnSelectorFactory( - makeColumnSelectorFactory(combiningAgg, rowSupplier, deserializeComplexMetrics), - concurrentEventAdd + makeColumnSelectorFactory(combiningAgg, rowSupplier) ) ); } @@ -550,6 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex * If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed */ + @Nullable private Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function getMetricTypeFunction) { if (preserveExistingMetrics) { @@ -605,18 +600,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex */ static class CachingColumnSelectorFactory implements ColumnSelectorFactory { - private final Map> columnSelectorMap; + private final HashMap> columnSelectorMap; private final ColumnSelectorFactory delegate; - public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) + public CachingColumnSelectorFactory(ColumnSelectorFactory delegate) { this.delegate = delegate; - - if (concurrentEventAdd) { - columnSelectorMap = new ConcurrentHashMap<>(); - } else { - columnSelectorMap = new HashMap<>(); - } + this.columnSelectorMap = new HashMap<>(); } @Override @@ -628,7 +618,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override public ColumnValueSelector makeColumnValueSelector(String columnName) { - ColumnValueSelector existing = columnSelectorMap.get(columnName); + ColumnValueSelector existing = columnSelectorMap.get(columnName); if (existing != null) { return existing; } @@ -656,9 +646,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex { return new OnheapIncrementalIndex( Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"), - deserializeComplexMetrics, - concurrentEventAdd, - sortFacts, maxRowCount, maxBytesInMemory, preserveExistingMetrics, @@ -734,4 +721,194 @@ public class OnheapIncrementalIndex extends IncrementalIndex return Objects.hash(preserveExistingMetrics); } } + + static final class RollupFactsHolder implements FactsHolder + { + // Can't use Set because we need to be able to get from collection + private final ConcurrentNavigableMap facts; + private final List dimensionDescsList; + + RollupFactsHolder( + Comparator incrementalIndexRowComparator, + List dimensionDescsList + ) + { + this.facts = new ConcurrentSkipListMap<>(incrementalIndexRowComparator); + this.dimensionDescsList = dimensionDescsList; + } + + @Override + public int getPriorIndex(IncrementalIndexRow key) + { + IncrementalIndexRow row = facts.get(key); + return row == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : row.getRowIndex(); + } + + @Override + public long getMinTimeMillis() + { + return facts.firstKey().getTimestamp(); + } + + @Override + public long getMaxTimeMillis() + { + return facts.lastKey().getTimestamp(); + } + + @Override + public Iterator iterator(boolean descending) + { + if (descending) { + return facts.descendingMap() + .keySet() + .iterator(); + } + return keySet().iterator(); + } + + @Override + public Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd) + { + IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList); + IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList); + ConcurrentNavigableMap subMap = facts.subMap(start, end); + ConcurrentMap rangeMap = descending ? subMap.descendingMap() : subMap; + return rangeMap.keySet(); + } + + @Override + public Iterable keySet() + { + return facts.keySet(); + } + + @Override + public Iterable persistIterable() + { + // with rollup, facts are already pre-sorted so just return keyset + return keySet(); + } + + @Override + public int putIfAbsent(IncrementalIndexRow key, int rowIndex) + { + // setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers. + key.setRowIndex(rowIndex); + IncrementalIndexRow prev = facts.putIfAbsent(key, key); + return prev == null ? IncrementalIndexRow.EMPTY_ROW_INDEX : prev.getRowIndex(); + } + + @Override + public void clear() + { + facts.clear(); + } + } + + static final class PlainFactsHolder implements FactsHolder + { + private final ConcurrentNavigableMap> facts; + + private final Comparator incrementalIndexRowComparator; + + public PlainFactsHolder(Comparator incrementalIndexRowComparator) + { + this.facts = new ConcurrentSkipListMap<>(); + this.incrementalIndexRowComparator = incrementalIndexRowComparator; + } + + @Override + public int getPriorIndex(IncrementalIndexRow key) + { + // always return EMPTY_ROW_INDEX to indicate that no prior key cause we always add new row + return IncrementalIndexRow.EMPTY_ROW_INDEX; + } + + @Override + public long getMinTimeMillis() + { + return facts.firstKey(); + } + + @Override + public long getMaxTimeMillis() + { + return facts.lastKey(); + } + + @Override + public Iterator iterator(boolean descending) + { + if (descending) { + return timeOrderedConcat(facts.descendingMap().values(), true).iterator(); + } + return timeOrderedConcat(facts.values(), false).iterator(); + } + + @Override + public Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd) + { + ConcurrentNavigableMap> subMap = facts.subMap(timeStart, timeEnd); + final ConcurrentMap> rangeMap = descending ? subMap.descendingMap() : subMap; + return timeOrderedConcat(rangeMap.values(), descending); + } + + private Iterable timeOrderedConcat( + final Iterable> iterable, + final boolean descending + ) + { + return () -> Iterators.concat( + Iterators.transform( + iterable.iterator(), + input -> descending ? input.descendingIterator() : input.iterator() + ) + ); + } + + private Stream timeAndDimsOrderedConcat( + final Collection> rowGroups + ) + { + return rowGroups.stream() + .flatMap(Collection::stream) + .sorted(incrementalIndexRowComparator); + } + + @Override + public Iterable keySet() + { + return timeOrderedConcat(facts.values(), false); + } + + @Override + public Iterable persistIterable() + { + return () -> timeAndDimsOrderedConcat(facts.values()).iterator(); + } + + @Override + public int putIfAbsent(IncrementalIndexRow key, int rowIndex) + { + Long time = key.getTimestamp(); + Deque rows = facts.get(time); + if (rows == null) { + facts.putIfAbsent(time, new ConcurrentLinkedDeque<>()); + // in race condition, rows may be put by other thread, so always get latest status from facts + rows = facts.get(time); + } + // setRowIndex() must be called before rows.add() for visibility of rowIndex from concurrent readers. + key.setRowIndex(rowIndex); + rows.add(key); + // always return EMPTY_ROW_INDEX to indicate that we always add new row + return IncrementalIndexRow.EMPTY_ROW_INDEX; + } + + @Override + public void clear() + { + facts.clear(); + } + } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index c60520fbfa4..fab9357b051 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -480,7 +480,6 @@ public class AggregationTestHelper implements Closeable outDir, minTimestamp, gran, - true, maxRowCount, rollup ); @@ -498,7 +497,6 @@ public class AggregationTestHelper implements Closeable File outDir, long minTimestamp, Granularity gran, - boolean deserializeComplexMetrics, int maxRowCount, boolean rollup ) throws Exception @@ -517,7 +515,6 @@ public class AggregationTestHelper implements Closeable .withRollup(rollup) .build() ) - .setDeserializeComplexMetrics(deserializeComplexMetrics) .setMaxRowCount(maxRowCount) .build(); @@ -538,7 +535,6 @@ public class AggregationTestHelper implements Closeable .withRollup(rollup) .build() ) - .setDeserializeComplexMetrics(deserializeComplexMetrics) .setMaxRowCount(maxRowCount) .build(); } @@ -594,7 +590,6 @@ public class AggregationTestHelper implements Closeable final AggregatorFactory[] metrics, long minTimestamp, Granularity gran, - boolean deserializeComplexMetrics, int maxRowCount, boolean rollup ) throws Exception @@ -609,7 +604,6 @@ public class AggregationTestHelper implements Closeable .withRollup(rollup) .build() ) - .setDeserializeComplexMetrics(deserializeComplexMetrics) .setMaxRowCount(maxRowCount) .build(); @@ -636,7 +630,6 @@ public class AggregationTestHelper implements Closeable final AggregatorFactory[] metrics, long minTimestamp, Granularity gran, - boolean deserializeComplexMetrics, int maxRowCount, boolean rollup ) throws Exception @@ -648,7 +641,6 @@ public class AggregationTestHelper implements Closeable metrics, minTimestamp, gran, - deserializeComplexMetrics, maxRowCount, rollup ); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java index 2e516cebf63..b54a5e5db3b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/StringColumnAggregationTest.java @@ -107,7 +107,6 @@ public class StringColumnAggregationTest new AggregatorFactory[]{new CountAggregatorFactory("count")}, 0, Granularities.NONE, - false, 100, false ); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java index 786a6fc176c..1e9bca48216 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/mean/SimpleTestIndex.java @@ -101,7 +101,6 @@ public class SimpleTestIndex }, 0, Granularities.NONE, - false, 100, false ); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 391a490e2da..2dfe2b7dfbf 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -138,7 +138,6 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH .withRollup(withRollup) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(1000) .build(); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index 8cd451eb009..28ff970efdf 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -154,7 +154,6 @@ public class GroupByLimitPushDownMultiNodeMergeTest .withRollup(withRollup) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(1000) .build(); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 05aa06db0a5..0284edd4127 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -130,7 +130,6 @@ public class GroupByMultiSegmentTest .withRollup(withRollup) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(1000) .build(); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index 3892a2018a0..266691ee294 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -138,7 +138,6 @@ public class GroupByQueryRunnerFactoryTest { IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) - .setConcurrentEventAdd(true) .setMaxRowCount(5000) .build(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java index 8bf2785e2b7..06802b55694 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -132,7 +132,6 @@ public class NestedQueryPushDownTest extends InitializedNullHandlingTest )) .build() ) - .setConcurrentEventAdd(true) .setMaxRowCount(1000) .build(); } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java index 4f7eb9b8317..d7b51ab0f8b 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java @@ -180,7 +180,7 @@ public class IncrementalIndexCreator implements Closeable * * For example, for a parameterized test with the following constrctor: * {@code - * public IncrementalIndexTest(String indexType, String mode, boolean deserializeComplexMetrics) + * public IncrementalIndexTest(String indexType, String mode) * { * ... * } @@ -188,12 +188,11 @@ public class IncrementalIndexCreator implements Closeable * * we can test all the input combinations as follows: * {@code - * @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}") + * @Parameterized.Parameters(name = "{index}: {0}, {1}") * public static Collection constructorFeeder() * { * return IncrementalIndexCreator.indexTypeCartesianProduct( - * ImmutableList.of("rollup", "plain"), - * ImmutableList.of(true, false) + * ImmutableList.of("rollup", "plain") * ); * } * } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java index 0b45026e829..c83c4f0da4c 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java @@ -67,8 +67,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest public IncrementalIndexTest( String indexType, - String mode, - boolean deserializeComplexMetrics + String mode ) throws JsonProcessingException { this.mode = mode; @@ -101,18 +100,16 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest .build(); indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder .setIndexSchema(schema) - .setDeserializeComplexMetrics(deserializeComplexMetrics) .setMaxRowCount(1_000_000) .build()) ); } - @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}") + @Parameterized.Parameters(name = "{index}: {0}, {1}") public static Collection constructorFeeder() { return IncrementalIndexCreator.indexTypeCartesianProduct( - ImmutableList.of("rollup", "plain"), - ImmutableList.of(true, false) + ImmutableList.of("rollup", "plain") ); } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 02d43e27506..3d0674d2845 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -110,18 +110,12 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark public MapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd, - boolean sortFacts, int maxRowCount, long maxBytesInMemory ) { super( incrementalIndexSchema, - deserializeComplexMetrics, - concurrentEventAdd, - sortFacts, maxRowCount, maxBytesInMemory, false, @@ -143,9 +137,6 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark .withQueryGranularity(gran) .withMetrics(metrics) .build(), - true, - false, - true, maxRowCount, maxBytesInMemory, false, @@ -190,7 +181,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, rowSupplier, getDeserializeComplexMetrics()) + makeColumnSelectorFactory(agg, rowSupplier) ); } Integer rowIndex;