From 62677212cc72e3024cb9f1e72455155e5b746d38 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Mon, 6 Aug 2018 14:17:48 -0700
Subject: [PATCH] Order rows during incremental index persist when rollup is disabled. (#6107)

* order using IncrementalIndexRowComparator at persist time when rollup is disabled, allowing increased effectiveness of dimension compression, resolves #6066

* fix stuff from review
---
 .../benchmark/datagen/BenchmarkSchemas.java   | 50 ++++++++++++
 .../indexing/IndexPersistBenchmark.java       | 67 ++++++++--------
 .../segment/incremental/IncrementalIndex.java | 66 +++++++++++++---
 .../incremental/IncrementalIndexAdapter.java  |  2 +-
 .../IncrementalIndexRowIterator.java          |  2 +-
 .../incremental/OffheapIncrementalIndex.java  |  2 +-
 .../incremental/OnheapIncrementalIndex.java   |  2 +-
 .../segment/data/IncrementalIndexTest.java    |  2 +-
 .../IncrementalIndexAdapterTest.java          | 79 +++++++++++++++++++
 9 files changed, 226 insertions(+), 46 deletions(-)

diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
index d50c1a04307..775e94b353d 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -157,4 +157,54 @@ public class BenchmarkSchemas
     );
     SCHEMA_MAP.put("simpleFloat", basicSchema);
   }
+
+  static { // schema with high opportunity for rollup
+    List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
+        // dims
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated2",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
+
+        // metrics
+        BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
+        BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
+        BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
+    );
+
+    List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
+    rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
+    rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
+    rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+    rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
+    rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+    rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
+    Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
+
+    BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
+        rolloColumns,
+        rolloSchemaIngestAggs,
+        basicSchemaDataInterval,
+        true
+    );
+    SCHEMA_MAP.put("rollo", rolloSchema);
+  }
 }
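The enumerated dims above are what give this schema its "high opportunity for rollup": each has only a handful of distinct values drawn with fixed probabilities, so many generated rows repeat the same dimension tuple. A minimal, self-contained sketch of that weighted sampling follows; the EnumeratedSamplerSketch class and its sample helper are illustrative stand-ins, not the actual BenchmarkColumnSchema/BenchmarkDataGenerator code.

import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for the enumerated-value column described by
// BenchmarkColumnSchema.makeEnumerated above: pick one value per row using the
// supplied probabilities (which sum to 1.0).
public class EnumeratedSamplerSketch
{
  public static void main(String[] args)
  {
    List<String> values = Arrays.asList("Hello", "World", "Foo", "Bar", "Baz");
    double[] probabilities = {0.2, 0.25, 0.15, 0.10, 0.3};
    Random rng = new Random(9999); // same seed style as the benchmark's RNG_SEED

    for (int i = 0; i < 5; i++) {
      System.out.println(sample(values, probabilities, rng));
    }
  }

  static String sample(List<String> values, double[] probabilities, Random rng)
  {
    double roll = rng.nextDouble();
    double cumulative = 0.0;
    for (int i = 0; i < probabilities.length; i++) {
      cumulative += probabilities[i];
      if (roll < cumulative) {
        return values.get(i);
      }
    }
    return values.get(values.size() - 1); // guard against floating-point drift
  }
}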
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
index 6d5e172fd51..c299b09eae7 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -32,7 +32,6 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.serde.ComplexMetrics;
@@ -64,44 +63,38 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class IndexPersistBenchmark
 {
-  @Param({"75000"})
-  private int rowsPerSegment;
-
-  @Param({"basic"})
-  private String schema;
-
-  @Param({"true", "false"})
-  private boolean rollup;
-
+  public static final ObjectMapper JSON_MAPPER;
   private static final Logger log = new Logger(IndexPersistBenchmark.class);
   private static final int RNG_SEED = 9999;
-
-  private IncrementalIndex incIndex;
-  private ArrayList<InputRow> rows;
-  private BenchmarkSchemaInfo schemaInfo;
-
-
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
-  public static final ObjectMapper JSON_MAPPER;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
+        () -> 0
     );
     INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }

+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"rollo"})
+  private String schema;
+
+  @Param({"true", "false"})
+  private boolean rollup;
+
+  @Param({"none", "moderate", "high"})
+  private String rollupOpportunity;
+
+  private IncrementalIndex incIndex;
+  private ArrayList<InputRow> rows;
+  private BenchmarkSchemaInfo schemaInfo;
+
   @Setup
   public void setup()
   {
@@ -114,11 +107,23 @@ public class IndexPersistBenchmark
     rows = new ArrayList();
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
 
+    int valuesPerTimestamp = 1;
+    switch (rollupOpportunity) {
+      case "moderate":
+        valuesPerTimestamp = 1000;
+        break;
+      case "high":
+        valuesPerTimestamp = 10000;
+        break;
+
+    }
+
     BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
-        schemaInfo.getDataInterval(),
-        rowsPerSegment
+        schemaInfo.getDataInterval().getStartMillis(),
+        valuesPerTimestamp,
+        1000.0
     );
 
     for (int i = 0; i < rowsPerSegment; i++) {
@@ -128,8 +133,6 @@ public class IndexPersistBenchmark
       }
       rows.add(row);
     }
-
-
   }

   @Setup(Level.Iteration)
@@ -154,9 +157,9 @@ public class IndexPersistBenchmark
     return new IncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
-              .withMetrics(schemaInfo.getAggsArray())
-              .withRollup(rollup)
-              .build()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
         )
         .setReportParseExceptions(false)
        .setMaxRowCount(rowsPerSegment)
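How much duplication the generator produces is controlled by the new rollupOpportunity parameter above. A rough illustration of that mapping and its consequence follows; it is illustrative arithmetic only, assuming (as the BenchmarkDataGenerator call above is configured) that one timestamp is reused for valuesPerTimestamp consecutive rows.

public class RollupOpportunitySketch
{
  // Illustrative only; mirrors the switch statement in setup() above.
  static int valuesPerTimestamp(String rollupOpportunity)
  {
    switch (rollupOpportunity) {
      case "moderate":
        return 1000;
      case "high":
        return 10000;
      default:
        return 1; // "none"
    }
  }

  public static void main(String[] args)
  {
    // With rowsPerSegment = 75000 and rollupOpportunity = "high", roughly
    // 75000 / 10000 = 7.5 (about 8) distinct timestamps are generated, so a
    // non-rollup index holds many rows per timestamp and the persist-time
    // sort introduced by this patch has real work to do.
    System.out.println(75000 / valuesPerTimestamp("high")); // 7 (integer division)
  }
}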
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index cdf8957b493..0faeaef578d 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -80,6 +80,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
@@ -94,6 +95,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 /**
  */
@@ -366,9 +368,24 @@ public abstract class IncrementalIndex extends AbstractIndex imp
     @VisibleForTesting
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
-          .withMetrics(metrics)
-          .build();
+      return setSimpleTestingIndexSchema(null, metrics);
+    }
+
+
+    /**
+     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+     * would use it in production settings.
+     *
+     * @param metrics variable array of {@link AggregatorFactory} metrics
+     *
+     * @return this
+     */
+    @VisibleForTesting
+    public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+    {
+      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
       return this;
     }
 
@@ -1202,6 +1219,12 @@ public abstract class IncrementalIndex extends AbstractIndex imp
 
     Iterable<IncrementalIndexRow> keySet();
 
+    /**
+     * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator}
+     * @return
+     */
+    Iterable<IncrementalIndexRow> persistIterable();
+
     /**
      * @return the previous rowIndex associated with the specified key, or
     * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
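The new two-argument setSimpleTestingIndexSchema overload above is exercised later in this patch by IncrementalIndexTest. A typical call looks like the sketch below; CountAggregatorFactory is used as an example metric, and passing false disables rollup so the plain facts holder (and the new persist-time sort) is used.

// Fragment for illustration; mirrors the usage in IncrementalIndexTest below.
IncrementalIndex index = new IncrementalIndex.Builder()
    .setSimpleTestingIndexSchema(false, new CountAggregatorFactory("count")) // rollup disabled
    .setMaxRowCount(1000000)
    .buildOnheap();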
@@ -1289,6 +1312,13 @@ public abstract class IncrementalIndex extends AbstractIndex imp
       return facts.keySet();
     }
 
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      // with rollup, facts are already pre-sorted so just return keyset
+      return keySet();
+    }
+
     @Override
     public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
     {
@@ -1310,7 +1340,9 @@ public abstract class IncrementalIndex extends AbstractIndex imp
     private final boolean sortFacts;
     private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
 
-    public PlainFactsHolder(boolean sortFacts)
+    private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
+
+    public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
     {
       this.sortFacts = sortFacts;
       if (sortFacts) {
@@ -1318,6 +1350,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp
       } else {
         this.facts = new ConcurrentHashMap<>();
       }
+      this.incrementalIndexRowComparator = incrementalIndexRowComparator;
     }
 
     @Override
@@ -1351,10 +1384,10 @@ public abstract class IncrementalIndex extends AbstractIndex imp
     public Iterator<IncrementalIndexRow> iterator(boolean descending)
     {
       if (descending && sortFacts) {
-        return concat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
+        return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
             .descendingMap().values(), true).iterator();
       }
-      return concat(facts.values(), false).iterator();
+      return timeOrderedConcat(facts.values(), false).iterator();
     }
 
     @Override
@@ -1363,10 +1396,10 @@ public abstract class IncrementalIndex extends AbstractIndex imp
       ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
           ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
       final Map<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
-      return concat(rangeMap.values(), descending);
+      return timeOrderedConcat(rangeMap.values(), descending);
     }
 
-    private Iterable<IncrementalIndexRow> concat(
+    private Iterable<IncrementalIndexRow> timeOrderedConcat(
         final Iterable<Deque<IncrementalIndexRow>> iterable,
         final boolean descending
     )
@@ -1379,10 +1412,25 @@ public abstract class IncrementalIndex extends AbstractIndex imp
       );
     }
 
+    private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+        final Collection<Deque<IncrementalIndexRow>> rowGroups
+    )
+    {
+      return rowGroups.stream()
+                      .flatMap(Collection::stream)
+                      .sorted(incrementalIndexRowComparator);
+    }
+
     @Override
     public Iterable<IncrementalIndexRow> keySet()
     {
-      return concat(facts.values(), false);
+      return timeOrderedConcat(facts.values(), false);
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
     }
 
     @Override
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
index 40ad4ecbe7b..59856fab3ba 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -93,7 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
   )
   {
     int rowNum = 0;
-    for (IncrementalIndexRow row : index.getFacts().keySet()) {
+    for (IncrementalIndexRow row : index.getFacts().persistIterable()) {
       final Object[] dims = row.getDims();
 
      for (IncrementalIndex.DimensionDesc dimension : dimensions) {
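With rollup disabled, the adapter above now iterates persistIterable() instead of keySet(), which (per the PlainFactsHolder changes) flattens the per-timestamp row groups and sorts them with the IncrementalIndexRowComparator. The self-contained analogue below shows the effect using plain int arrays in place of IncrementalIndexRow; the row type and the comparator are stand-ins, and only the flatten-then-sort shape matches timeAndDimsOrderedConcat in the patch.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

public class PersistOrderSketch
{
  public static void main(String[] args)
  {
    // Two per-timestamp groups of rows, each row modeled as {timestamp, dim1, dim2}.
    // This mimics PlainFactsHolder's map of timestamp -> Deque of rows when rollup is off.
    Deque<int[]> time0 = new ArrayDeque<>(Arrays.asList(new int[]{0, 1, 2}, new int[]{0, 3, 4}, new int[]{0, 1, 2}));
    Deque<int[]> time1 = new ArrayDeque<>(Arrays.asList(new int[]{1, 3, 4}, new int[]{1, 1, 2}, new int[]{1, 3, 4}));
    Collection<Deque<int[]>> rowGroups = Arrays.asList(time0, time1);

    // Stand-in for the IncrementalIndexRowComparator: timestamp first, then dimension values.
    Comparator<int[]> rowComparator = Comparator.<int[]>comparingInt(r -> r[0])
                                                .thenComparingInt(r -> r[1])
                                                .thenComparingInt(r -> r[2]);

    // Same shape as timeAndDimsOrderedConcat above: flatten the groups, then sort.
    List<int[]> persistOrder = rowGroups.stream()
                                        .flatMap(Collection::stream)
                                        .sorted(rowComparator)
                                        .collect(Collectors.toList());

    // Within each timestamp, identical dimension tuples are now adjacent:
    // [0, 1, 2], [0, 1, 2], [0, 3, 4], [1, 1, 2], [1, 3, 4], [1, 3, 4]
    persistOrder.forEach(row -> System.out.println(Arrays.toString(row)));
  }
}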
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
index 803460cb8ad..f3634c2882f 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
@@ -50,7 +50,7 @@ class IncrementalIndexRowIterator implements TransformableRowIterator
 
   IncrementalIndexRowIterator(IncrementalIndex incrementalIndex)
   {
-    this.timeAndDimsIterator = incrementalIndex.getFacts().keySet().iterator();
+    this.timeAndDimsIterator = incrementalIndex.getFacts().persistIterable().iterator();
     this.currentRowPointer = makeRowPointer(incrementalIndex, currentRowHolder, currentRowNumCounter);
     // markedRowPointer doesn't actually need to be a RowPointer (just a TimeAndDimsPointer), but we create a RowPointer
     // in order to reuse the makeRowPointer() method. Passing a dummy RowNumCounter.
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
index 472adfdc673..198137bb8ba 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -84,7 +84,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex
     this.bufferPool = bufferPool;
 
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
 
     //check that stupid pool gives buffers that can hold at least one row's aggregators
     ResourceHolder<ByteBuffer> bb = bufferPool.take();
diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
index 0b86dd05ce7..c08f57999b8 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -79,7 +79,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     this.maxRowCount = maxRowCount;
    this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
 
     maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
   }
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 0d683d45ec2..0ee0d071cc8 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -240,7 +240,7 @@ public class IncrementalIndexTest
     }
 
     return new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(aggregatorFactories)
+        .setSimpleTestingIndexSchema(false, aggregatorFactories)
         .setMaxRowCount(1000000)
         .buildOnheap();
   }
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 55f05460b3e..7c24bcef029 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.incremental;
 
+import io.druid.java.util.common.StringUtils;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.RowIterator;
@@ -32,6 +33,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public class IncrementalIndexAdapterTest
 {
@@ -83,4 +85,81 @@ public class IncrementalIndexAdapterTest
     Assert.assertEquals(0, (long) rowNums.get(0));
     Assert.assertEquals(1, (long) rowNums.get(1));
   }
+
+  @Test
+  public void testGetRowsIterableNoRollup() throws Exception
+  {
+    final long timestamp = System.currentTimeMillis();
+    IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+
+
+    ArrayList<Integer> dim1Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim1Vals.add(((int[]) row.getDims()[0])[0]);
+    }
+    ArrayList<Integer> dim2Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim2Vals.add(((int[]) row.getDims()[1])[0]);
+    }
+
+    final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
+        toPersist1.getInterval(),
+        toPersist1,
+        INDEX_SPEC.getBitmapSerdeFactory()
+                  .getBitmapFactory()
+    );
+
+    RowIterator rows = incrementalAdapter.getRows();
+    List<String> rowStrings = new ArrayList<>();
+    while (rows.moveToNext()) {
+      rowStrings.add(rows.getPointer().toString());
+    }
+
+    Function<Integer, String> getExpected = (rowNumber) -> {
+      if (rowNumber < 3) {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=1, dim2=2}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      } else {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=3, dim2=4}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      }
+    };
+
+
+    // without sorting, output would be
+    // RowPointer{indexNum=0, rowNumber=0, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=1, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=2, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=3, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=4, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=5, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // but with sorting, output should be
+    // RowPointer{indexNum=0, rowNumber=0, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=1, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=2, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=3, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=4, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // RowPointer{indexNum=0, rowNumber=5, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+
+    Assert.assertEquals(6, rowStrings.size());
+    for (int i = 0; i < 6; i++) {
+      if (i % 2 == 0) {
+        Assert.assertEquals(0, (long) dim1Vals.get(i));
+        Assert.assertEquals(0, (long) dim2Vals.get(i));
+      } else {
+        Assert.assertEquals(1, (long) dim1Vals.get(i));
+        Assert.assertEquals(1, (long) dim2Vals.get(i));
+      }
+      Assert.assertEquals(getExpected.apply(i), rowStrings.get(i));
+    }
+  }
 }
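The ordering verified by the test above is what makes dimension columns compress better at persist time: dictionary ids that alternate row by row produce one run per row, while the sorted persist order collapses them into a few long runs. The sketch below only counts runs of equal adjacent ids to show that effect with the dim1 values from the test comments; it is not Druid's actual column encoder, and real dimension compression also involves dictionary encoding and block compression.

import java.util.Arrays;
import java.util.List;

public class RunLengthSketch
{
  public static void main(String[] args)
  {
    // dim1 values as iterated without sorting vs. via persistIterable(), per the test above.
    List<Integer> unsorted = Arrays.asList(1, 3, 1, 3, 1, 3);
    List<Integer> sorted = Arrays.asList(1, 1, 1, 3, 3, 3);

    System.out.println("unsorted runs: " + countRuns(unsorted)); // 6
    System.out.println("sorted runs:   " + countRuns(sorted));   // 2
  }

  // A run is a maximal stretch of equal adjacent values; fewer runs means
  // run-length-style encodings of the dictionary id column have less to store.
  static int countRuns(List<Integer> values)
  {
    int runs = values.isEmpty() ? 0 : 1;
    for (int i = 1; i < values.size(); i++) {
      if (!values.get(i).equals(values.get(i - 1))) {
        runs++;
      }
    }
    return runs;
  }
}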