From 02dfd5cd809a59dca062573b09e01cb314b8e974 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 1 Mar 2016 10:10:50 -0600 Subject: [PATCH] update IncrementalIndex to support unsorted facts map that can be used in groupBy merging to improve performance --- .../IncrementalIndexAddRowsBenchmark.java | 1 + .../query/groupby/GroupByQueryHelper.java | 2 ++ .../segment/incremental/IncrementalIndex.java | 35 +++++++++++++++---- .../incremental/OffheapIncrementalIndex.java | 20 ++++++++--- .../incremental/OnheapIncrementalIndex.java | 21 +++++++---- .../druid/query/MultiValuedDimensionTest.java | 1 + .../aggregation/AggregationTestHelper.java | 5 ++- .../firehose/IngestSegmentFirehoseTest.java | 2 +- 8 files changed, 65 insertions(+), 22 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java index 5580a46668b..c9d10d25571 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java @@ -126,6 +126,7 @@ public class IncrementalIndexAddRowsBenchmark aggs, false, false, + true, maxRows ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 4a94f8f5c04..ae6d52ce3c2 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -90,6 +90,7 @@ public class GroupByQueryHelper aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, + true, Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()), bufferPool ); @@ -102,6 +103,7 @@ public class GroupByQueryHelper aggs.toArray(new AggregatorFactory[aggs.size()]), false, true, + true, Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()) ); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 97888fd0ff1..b617f11082b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -71,6 +71,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -340,6 +341,7 @@ public abstract class IncrementalIndex implements Iterable, private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; + private final boolean sortFacts; private final Metadata metadata; private final Map metricDescs; @@ -374,7 +376,8 @@ public abstract class IncrementalIndex implements Iterable, public IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean reportParseExceptions + final boolean reportParseExceptions, + final boolean sortFacts ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -383,6 +386,7 @@ public abstract class IncrementalIndex implements Iterable, this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; + this.sortFacts = sortFacts; this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); @@ -441,7 +445,7 @@ public abstract class IncrementalIndex implements Iterable, // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation protected abstract DimDim makeDimDim(String dimension, Object lock); - public abstract ConcurrentNavigableMap getFacts(); + public abstract ConcurrentMap getFacts(); public abstract boolean canAppendRow(); @@ -673,12 +677,20 @@ public abstract class IncrementalIndex implements Iterable, private long getMinTimeMillis() { - return getFacts().firstKey().getTimestamp(); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).firstKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); + } } private long getMaxTimeMillis() { - return getFacts().lastKey().getTimestamp(); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).lastKey().getTimestamp(); + } else { + throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); + } } private int[] getDimVals(final DimDim dimLookup, final List dimValues) @@ -831,7 +843,11 @@ public abstract class IncrementalIndex implements Iterable, public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) { - return getFacts().subMap(start, end); + if (sortFacts) { + return ((ConcurrentNavigableMap) getFacts()).subMap(start, end); + } else { + throw new UnsupportedOperationException("can't get subMap from unsorted facts data."); + } } public Metadata getMetadata() @@ -862,7 +878,14 @@ public abstract class IncrementalIndex implements Iterable, public Iterator iterator() { final List dimensions = getDimensions(); - final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); + + Map facts = null; + if (descending && sortFacts) { + facts = ((ConcurrentNavigableMap) getFacts()).descendingMap(); + } else { + facts = getFacts(); + } + return Iterators.transform( facts.entrySet().iterator(), new Function, Row>() diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 47f4c5ca4f1..7aabc4ac06b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -38,7 +38,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -51,7 +52,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex private final List> aggBuffers = new ArrayList<>(); private final List indexAndOffsets = new ArrayList<>(); - private final ConcurrentNavigableMap facts; + private final ConcurrentMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); @@ -71,14 +72,20 @@ public class OffheapIncrementalIndex extends IncrementalIndex IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount, StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts); this.maxRowCount = maxRowCount; this.bufferPool = bufferPool; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + } else { + this.facts = new ConcurrentHashMap<>(); + } //check that stupid pool gives buffers that can hold at least one row's aggregators ResourceHolder bb = bufferPool.take(); @@ -100,6 +107,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount, StupidPool bufferPool ) @@ -111,6 +119,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex .build(), deserializeComplexMetrics, reportParseExceptions, + sortFacts, maxRowCount, bufferPool ); @@ -131,13 +140,14 @@ public class OffheapIncrementalIndex extends IncrementalIndex .build(), true, true, + true, maxRowCount, bufferPool ); } @Override - public ConcurrentNavigableMap getFacts() + public ConcurrentMap getFacts() { return facts; } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 5f4f58e0e18..a19cb30a532 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -33,13 +33,11 @@ import io.druid.segment.DimensionSelector; import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.ValueType; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -48,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class OnheapIncrementalIndex extends IncrementalIndex { private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); - private final ConcurrentNavigableMap facts; + private final ConcurrentMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); protected final int maxRowCount; private volatile Map selectors; @@ -59,12 +57,18 @@ public class OnheapIncrementalIndex extends IncrementalIndex IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts); this.maxRowCount = maxRowCount; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + + if (sortFacts) { + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + } else { + this.facts = new ConcurrentHashMap<>(); + } } public OnheapIncrementalIndex( @@ -73,6 +77,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean sortFacts, int maxRowCount ) { @@ -83,6 +88,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex .build(), deserializeComplexMetrics, reportParseExceptions, + sortFacts, maxRowCount ); } @@ -101,6 +107,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex .build(), true, true, + true, maxRowCount ); } @@ -111,11 +118,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex int maxRowCount ) { - this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount); + this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount); } @Override - public ConcurrentNavigableMap getFacts() + public ConcurrentMap getFacts() { return facts; } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index c56b4521bfc..9c68fade306 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -99,6 +99,7 @@ public class MultiValuedDimensionTest }, true, true, + true, 5000 ); diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 163a4a3b886..9b5a20df901 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -71,7 +71,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; @@ -311,7 +310,7 @@ public class AggregationTestHelper List toMerge = new ArrayList<>(); try { - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); while (rows.hasNext()) { Object row = rows.next(); if (!index.canAppendRow()) { @@ -319,7 +318,7 @@ public class AggregationTestHelper toMerge.add(tmp); indexMerger.persist(index, tmp, new IndexSpec()); index.close(); - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); } if (row instanceof String && parser instanceof StringInputRowParser) { //Note: this is required because StringInputRowParser is InputRowParser as opposed to diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index fb219618077..7d2676560b3 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -120,7 +120,7 @@ public class IngestSegmentFirehoseTest IncrementalIndex index = null; try { - index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000); + index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, true, 5000); for (String line : rows) { index.add(parser.parse(line)); }