GroupByBenchmark: Fix queryable index generation, improve memory use. (#3431)

With the old code, all on-disk segments were the same. Now they're different.
This will end up altering benchmark results for queryMultiQueryableIndex,
likely making them slower (since values won't group as well as they used to).

The memory changes will help test with larger/more segments, since we won't
have to hold them all in memory at once.
This commit is contained in:
Gian Merlino 2016-09-06 14:37:55 -07:00 committed by GitHub
parent 5b1ae21bd1
commit 6827c09311
1 changed files with 87 additions and 43 deletions

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
@ -74,9 +75,11 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.commons.io.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
@ -84,6 +87,7 @@ import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
@ -128,8 +132,9 @@ public class GroupByBenchmark
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private List<IncrementalIndex> incIndexes;
private List<QueryableIndex> qIndexes;
private File tmpDir;
private IncrementalIndex anIncrementalIndex;
private List<QueryableIndex> queryableIndexes;
private QueryRunnerFactory factory;
@ -231,8 +236,7 @@ public class GroupByBenchmark
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
@Setup
@Setup(Level.Trial)
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
@ -251,54 +255,65 @@ public class GroupByBenchmark
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
incIndexes = new ArrayList<>();
final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
rowsPerSegment
);
tmpDir = Files.createTempDir();
log.info("Using temp dir: %s", tmpDir.getAbsolutePath());
// queryableIndexes -> numSegments worth of on-disk segments
// anIncrementalIndex -> the last incremental index
anIncrementalIndex = null;
queryableIndexes = new ArrayList<>(numSegments);
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
rowsPerSegment
log.info("Generating rows for segment %d/%d", i + 1, numSegments);
final IncrementalIndex index = makeIncIndex();
for (int j = 0; j < rowsPerSegment; j++) {
final InputRow row = dataGenerator.nextRow();
if (j % 20000 == 0) {
log.info("%,d/%,d rows generated.", i * rowsPerSegment + j, rowsPerSegment * numSegments);
}
index.add(row);
}
log.info(
"%,d/%,d rows generated, persisting segment %d/%d.",
(i + 1) * rowsPerSegment,
rowsPerSegment * numSegments,
i + 1,
numSegments
);
IncrementalIndex incIndex = makeIncIndex();
incIndexes.add(incIndex);
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
}
log.info(rowsPerSegment + " rows generated");
}
IncrementalIndex incIndex = incIndexes.get(0);
File tmpFile = Files.createTempDir();
log.info("Using temp dir: " + tmpFile.getAbsolutePath());
tmpFile.deleteOnExit();
qIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
File indexFile = INDEX_MERGER_V9.persist(
incIndex,
tmpFile,
final File file = INDEX_MERGER_V9.persist(
index,
new File(tmpDir, String.valueOf(i)),
new IndexSpec()
);
QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
qIndexes.add(qIndex);
queryableIndexes.add(INDEX_IO.loadIndex(file));
if (i == numSegments - 1) {
anIncrementalIndex = index;
} else {
index.close();
}
}
StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
new OffheapBufferGenerator("compute", 250000000),
new OffheapBufferGenerator("compute", 250_000_000),
Integer.MAX_VALUE
);
// limit of 2 is required since we simulate both historical merge and broker merge in the same process
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
new OffheapBufferGenerator("merge", 250000000),
new OffheapBufferGenerator("merge", 250_000_000),
2
);
final GroupByQueryConfig config = new GroupByQueryConfig()
@ -314,6 +329,12 @@ public class GroupByBenchmark
{
return initialBuckets;
}
@Override
public long getMaxOnDiskStorage()
{
return 0L;
}
};
config.setSingleThreaded(false);
config.setMaxIntermediateRows(Integer.MAX_VALUE);
@ -380,6 +401,30 @@ public class GroupByBenchmark
);
}
@TearDown(Level.Trial)
public void tearDown()
{
try {
if (anIncrementalIndex != null) {
anIncrementalIndex.close();
}
if (queryableIndexes != null) {
for (QueryableIndex index : queryableIndexes) {
index.close();
}
}
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}
catch (IOException e) {
log.warn(e, "Failed to tear down, temp dir was: %s", tmpDir);
throw Throwables.propagate(e);
}
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
@ -392,7 +437,6 @@ public class GroupByBenchmark
return Sequences.toList(queryResult, Lists.<T>newArrayList());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@ -401,7 +445,7 @@ public class GroupByBenchmark
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"incIndex",
new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
new IncrementalIndexSegment(anIncrementalIndex, "incIndex")
);
List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
@ -419,7 +463,7 @@ public class GroupByBenchmark
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
"qIndex",
new QueryableIndexSegment("qIndex", qIndexes.get(0))
new QueryableIndexSegment("qIndex", queryableIndexes.get(0))
);
List<Row> results = GroupByBenchmark.runQuery(factory, runner, query);
@ -442,7 +486,7 @@ public class GroupByBenchmark
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, qIndexes.get(i))
new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}