From 6827c093119aa76f5459e4121de74f839b7949ef Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Tue, 6 Sep 2016 14:37:55 -0700
Subject: [PATCH] GroupByBenchmark: Fix queryable index generation, improve
 memory use. (#3431)

With the old code, all on-disk segments were the same. Now they're
different. This will end up altering benchmark results for
queryMultiQueryableIndex, likely making them slower (since values won't
group as well as they used to).

The memory changes will help test with larger/more segments, since we
won't have to hold them all in memory at once.
---
 .../benchmark/query/GroupByBenchmark.java     | 130 ++++++++++++------
 1 file changed, 87 insertions(+), 43 deletions(-)

diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java
index 16efbbba13b..077affbf96b 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
@@ -74,9 +75,11 @@ import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.incremental.OnheapIncrementalIndex;
 import io.druid.segment.serde.ComplexMetrics;
+import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -84,6 +87,7 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;

@@ -128,8 +132,9 @@ public class GroupByBenchmark
   private static final IndexIO INDEX_IO;
   public static final ObjectMapper JSON_MAPPER;

-  private List<IncrementalIndex> incIndexes;
-  private List<QueryableIndex> qIndexes;
+  private File tmpDir;
+  private IncrementalIndex anIncrementalIndex;
+  private List<QueryableIndex> queryableIndexes;

   private QueryRunnerFactory<Row, GroupByQuery> factory;

@@ -231,8 +236,7 @@ public class GroupByBenchmark
     SCHEMA_QUERY_MAP.put("basic", basicQueries);
   }

-
-  @Setup
+  @Setup(Level.Trial)
   public void setup() throws IOException
   {
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
@@ -251,54 +255,65 @@ public class GroupByBenchmark
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
     query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);

-    incIndexes = new ArrayList<>();
+    final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED + 1,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
+
+    tmpDir = Files.createTempDir();
+    log.info("Using temp dir: %s", tmpDir.getAbsolutePath());
+
+    // queryableIndexes   -> numSegments worth of on-disk segments
+    // anIncrementalIndex -> the last incremental index
+    anIncrementalIndex = null;
+    queryableIndexes = new ArrayList<>(numSegments);
+
     for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment " + i);
-      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
-          schemaInfo.getColumnSchemas(),
-          RNG_SEED + 1,
-          schemaInfo.getDataInterval(),
-          rowsPerSegment
+      log.info("Generating rows for segment %d/%d", i + 1, numSegments);
+
+      final IncrementalIndex index = makeIncIndex();
+
+      for (int j = 0; j < rowsPerSegment; j++) {
+        final InputRow row = dataGenerator.nextRow();
+        if (j % 20000 == 0) {
+          log.info("%,d/%,d rows generated.", i * rowsPerSegment + j, rowsPerSegment * numSegments);
+        }
+        index.add(row);
+      }
+
+      log.info(
+          "%,d/%,d rows generated, persisting segment %d/%d.",
+          (i + 1) * rowsPerSegment,
+          rowsPerSegment * numSegments,
+          i + 1,
+          numSegments
       );

-      IncrementalIndex incIndex = makeIncIndex();
-      incIndexes.add(incIndex);
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
-      log.info(rowsPerSegment + " rows generated");
-
-    }
-
-    IncrementalIndex incIndex = incIndexes.get(0);
-
-    File tmpFile = Files.createTempDir();
-    log.info("Using temp dir: " + tmpFile.getAbsolutePath());
-    tmpFile.deleteOnExit();
-
-    qIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndex,
-          tmpFile,
+      final File file = INDEX_MERGER_V9.persist(
+          index,
+          new File(tmpDir, String.valueOf(i)),
           new IndexSpec()
       );
-      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
-      qIndexes.add(qIndex);
+
+      queryableIndexes.add(INDEX_IO.loadIndex(file));
+
+      if (i == numSegments - 1) {
+        anIncrementalIndex = index;
+      } else {
+        index.close();
+      }
     }

     StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
-        new OffheapBufferGenerator("compute", 250000000),
+        new OffheapBufferGenerator("compute", 250_000_000),
         Integer.MAX_VALUE
     );

     // limit of 2 is required since we simulate both historical merge and broker merge in the same process
     BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
-        new OffheapBufferGenerator("merge", 250000000),
+        new OffheapBufferGenerator("merge", 250_000_000),
         2
     );

@@ -314,6 +329,12 @@ public class GroupByBenchmark
       {
         return initialBuckets;
       }
+
+      @Override
+      public long getMaxOnDiskStorage()
+      {
+        return 0L;
+      }
     };
     config.setSingleThreaded(false);
     config.setMaxIntermediateRows(Integer.MAX_VALUE);
@@ -380,6 +401,30 @@ public class GroupByBenchmark
     );
   }

+  @TearDown(Level.Trial)
+  public void tearDown()
+  {
+    try {
+      if (anIncrementalIndex != null) {
+        anIncrementalIndex.close();
+      }
+
+      if (queryableIndexes != null) {
+        for (QueryableIndex index : queryableIndexes) {
+          index.close();
+        }
+      }
+
+      if (tmpDir != null) {
+        FileUtils.deleteDirectory(tmpDir);
+      }
+    }
+    catch (IOException e) {
+      log.warn(e, "Failed to tear down, temp dir was: %s", tmpDir);
+      throw Throwables.propagate(e);
+    }
+  }
+
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query)
   {
     QueryToolChest toolChest = factory.getToolchest();
@@ -392,7 +437,6 @@ public class GroupByBenchmark
     return Sequences.toList(queryResult, Lists.<T>newArrayList());
   }

-
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -401,7 +445,7 @@ public class GroupByBenchmark
     QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         "incIndex",
-        new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
+        new IncrementalIndexSegment(anIncrementalIndex, "incIndex")
     );

     List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -419,7 +463,7 @@ public class GroupByBenchmark
     QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         "qIndex",
-        new QueryableIndexSegment("qIndex", qIndexes.get(0))
+        new QueryableIndexSegment("qIndex", queryableIndexes.get(0))
     );

     List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -442,7 +486,7 @@ public class GroupByBenchmark
       QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           segmentName,
-          new QueryableIndexSegment(segmentName, qIndexes.get(i))
+          new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
       );
       singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
     }
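Note for reviewers: below is a minimal, self-contained JMH sketch of the lifecycle pattern this patch adopts, for anyone unfamiliar with trial-scoped state. One @Setup(Level.Trial) drives a single shared generator across every segment (so no two persisted segments hold identical rows, which is the queryMultiQueryableIndex fix above) and persists each segment before building the next (so at most one segment is held in memory at a time); a matching @TearDown(Level.Trial) releases everything. The class name, the two constants, and the java.util.Random row generation are hypothetical stand-ins for GroupByBenchmark's @Params and BenchmarkDataGenerator; only the JMH annotations and the Guava/commons-io helpers are ones the patch itself uses.

import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

@State(Scope.Benchmark)
public class SegmentLifecycleSketch
{
  private static final int NUM_SEGMENTS = 4;            // stand-in for the numSegments @Param
  private static final int ROWS_PER_SEGMENT = 100_000;  // stand-in for the rowsPerSegment @Param

  private File tmpDir;
  private List<File> segmentDirs;

  @Setup(Level.Trial)  // runs once per trial, like the patched setup()
  public void setup() throws IOException
  {
    tmpDir = Files.createTempDir();
    segmentDirs = new ArrayList<>(NUM_SEGMENTS);

    // One generator shared across all segments: segment i + 1 continues the row
    // stream where segment i stopped, so no two on-disk segments are identical.
    final Random sharedGenerator = new Random(9999);

    for (int i = 0; i < NUM_SEGMENTS; i++) {
      // Build one segment's rows in memory...
      final StringBuilder rows = new StringBuilder();
      for (int j = 0; j < ROWS_PER_SEGMENT; j++) {
        rows.append(sharedGenerator.nextLong()).append('\n');
      }

      // ...persist it to its own numbered subdirectory, then drop the in-memory
      // copy, so at most one segment's worth of rows is resident at any time.
      final File segmentDir = new File(tmpDir, String.valueOf(i));
      FileUtils.writeStringToFile(new File(segmentDir, "rows"), rows.toString(), StandardCharsets.UTF_8);
      segmentDirs.add(segmentDir);
    }
  }

  @TearDown(Level.Trial)  // mirrors the patch's tearDown(): clean up what the trial created
  public void tearDown() throws IOException
  {
    if (tmpDir != null) {
      FileUtils.deleteDirectory(tmpDir);
    }
  }
}

Because Level.Trial runs these methods once per trial rather than once per iteration, the temp dir lives exactly as long as the benchmark methods need it and is reliably removed afterward, which is what makes runs with larger or more segments practical.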