diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 5f330ee7cf1..6b3b902c3ad 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -35,11 +35,13 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -66,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class IndexMergeBenchmark
 {
+
   @Param({"5"})
   private int numSegments;
 
@@ -78,9 +81,13 @@ public class IndexMergeBenchmark
   @Param({"true", "false"})
   private boolean rollup;
 
+  @Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
+  private SegmentWriteOutType factoryType;
+
+
   private static final Logger log = new Logger(IndexMergeBenchmark.class);
   private static final int RNG_SEED = 9999;
-  private static final IndexMergerV9 INDEX_MERGER_V9;
+
   private static final IndexIO INDEX_IO;
   public static final ObjectMapper JSON_MAPPER;
 
@@ -91,6 +98,7 @@ public class IndexMergeBenchmark
   private List<QueryableIndex> indexesToMerge;
   private BenchmarkSchemaInfo schemaInfo;
   private File tmpDir;
+  private IndexMergerV9 indexMergerV9;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -99,23 +107,16 @@ public class IndexMergeBenchmark
     JSON_MAPPER.setInjectableValues(injectableValues);
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
+        () -> 0
     );
-    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }
 
   @Setup
   public void setup() throws IOException
   {
-    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+    indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
     indexesToMerge = new ArrayList<>();
@@ -143,7 +144,7 @@ public class IndexMergeBenchmark
       tmpDir = FileUtils.createTempDir();
       log.info("Using temp dir: " + tmpDir.getAbsolutePath());
 
-      File indexFile = INDEX_MERGER_V9.persist(
+      File indexFile = indexMergerV9.persist(
           incIndex,
           tmpDir,
           new IndexSpec(),
@@ -155,26 +156,6 @@ public class IndexMergeBenchmark
     }
   }
 
-  @TearDown
-  public void tearDown() throws IOException
-  {
-    FileUtils.deleteDirectory(tmpDir);
-  }
-
-  private IncrementalIndex makeIncIndex()
-  {
-    return new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withMetrics(schemaInfo.getAggsArray())
-                .withRollup(rollup)
-                .build()
-        )
-        .setReportParseExceptions(false)
-        .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
-  }
-
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -186,7 +167,7 @@ public class IndexMergeBenchmark
     try {
       log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
 
-      File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
+      File mergedFile = indexMergerV9.mergeQueryableIndex(
          indexesToMerge,
          rollup,
          schemaInfo.getAggsArray(),
@@ -199,8 +180,46 @@ public class IndexMergeBenchmark
     }
     finally {
       tmpFile.delete();
-    }
+    }
+  }
 
+  @TearDown
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  public enum SegmentWriteOutType
+  {
+    TMP_FILE,
+    OFF_HEAP,
+    ON_HEAP
+  }
+
+  private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
+  {
+    switch (type) {
+      case TMP_FILE:
+        return TmpFileSegmentWriteOutMediumFactory.instance();
+      case OFF_HEAP:
+        return OffHeapMemorySegmentWriteOutMediumFactory.instance();
+      case ON_HEAP:
+        return OnHeapMemorySegmentWriteOutMediumFactory.instance();
+    }
+    throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
+        )
+        .setReportParseExceptions(false)
+        .setMaxRowCount(rowsPerSegment)
+        .buildOnheap();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
index ee1e7813dca..bda81150a68 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
@@ -63,7 +63,7 @@ public class ByteBufferWriter<T> implements Serializer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return headerOut.size() + valueOut.size();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
index b33031966cb..6265535a734 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
@@ -80,7 +80,7 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return META_SERDE_HELPER.size(this) + valuesOut.size();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
index 75b7290c67f..a933265d9e1 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
@@ -81,7 +81,7 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return META_SERDE_HELPER.size(this) + valuesOut.size();
   }
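Note on the recurring signature change above: getSerializedSize() in ByteBufferWriter and the entire-layout serializers only sums byte counts that WriteOutBytes now keeps in memory (see the FileWriteOutBytes change below), so the throws clause can be dropped all the way up the call chain. A minimal sketch of the pattern, using hypothetical types rather than the real Druid classes:

```java
import java.io.ByteArrayOutputStream;

// Hypothetical analogue of the serializers above: the serialized size is
// the sum of two in-memory counters, so no I/O can fail and no checked
// exception is needed. Actually writing the bytes out remains real I/O
// elsewhere and keeps its IOException.
final class TwoPartWriter
{
  private final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
  private final ByteArrayOutputStream valueOut = new ByteArrayOutputStream();

  void writeValue(byte[] value)
  {
    headerOut.write(value.length & 0xFF); // toy one-byte length prefix
    valueOut.write(value, 0, value.length);
  }

  long getSerializedSize() // mirrors headerOut.size() + valueOut.size() above
  {
    return (long) headerOut.size() + valueOut.size();
  }
}
```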
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 95bc141d2a5..d4254ddd98d 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -296,7 +296,7 @@ public class GenericIndexedWriter<T> implements Serializer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     if (requireMultipleFiles) {
       // for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
@@ -394,7 +394,7 @@ public class GenericIndexedWriter<T> implements Serializer
    *
    * @throws IOException
    */
-  private int bagSizePower() throws IOException
+  private int bagSizePower()
   {
     long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;
 
@@ -421,7 +421,7 @@ public class GenericIndexedWriter<T> implements Serializer
    *
    * @throws IOException
    */
-  private boolean actuallyFits(int powerTwo) throws IOException
+  private boolean actuallyFits(int powerTwo)
   {
     long lastValueOffset = 0;
     long currentValueOffset = 0;
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
index 1441c782f18..c7a2d592309 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
@@ -66,7 +66,7 @@ public class ComplexColumnSerializer implements GenericColumnSerializer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return writer.getSerializedSize();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
index bd68aef5f96..c23c97cb3de 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
@@ -106,7 +106,7 @@ public class LargeColumnSupportedComplexColumnSerializer implements GenericCo
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return writer.getSerializedSize();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
index ddda31f41cf..113821cee15 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
@@ -117,7 +117,7 @@ public final class MetaSerdeHelper<T>
 
   public interface FieldWriter<T>
   {
-    void writeTo(ByteBuffer buffer, T x) throws IOException;
+    void writeTo(ByteBuffer buffer, T x);
 
     int size(T x);
   }
@@ -125,10 +125,10 @@ public final class MetaSerdeHelper<T>
   @FunctionalInterface
   public interface IntFieldWriter<T> extends FieldWriter<T>
   {
-    int getField(T x) throws IOException;
+    int getField(T x);
 
     @Override
-    default void writeTo(ByteBuffer buffer, T x) throws IOException
+    default void writeTo(ByteBuffer buffer, T x)
     {
       buffer.putInt(getField(x));
     }
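The MetaSerdeHelper change keeps IntFieldWriter usable as a bare lambda: with the checked exception gone from getField, the default writeTo is exception-free as well. A self-contained sketch mirroring the interface shape above (hypothetical demo types, not Druid code):

```java
import java.nio.ByteBuffer;

public class FieldWriterDemo
{
  // Mirrors MetaSerdeHelper.IntFieldWriter after the change: a single
  // abstract method without a throws clause, plus a default writer.
  @FunctionalInterface
  interface IntFieldWriter<T>
  {
    int getField(T x);

    default void writeTo(ByteBuffer buffer, T x)
    {
      buffer.putInt(getField(x));
    }
  }

  public static void main(String[] args)
  {
    IntFieldWriter<int[]> lengthWriter = arr -> arr.length; // lambda, no try/catch needed
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
    lengthWriter.writeTo(buf, new int[]{1, 2, 3});
    System.out.println(buf.getInt(0)); // prints 3
  }
}
```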
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
index 9ab579da13d..b12b15e518b 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
@@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
 {
   private final File file;
   private final FileChannel ch;
+  private long writeOutBytes;
 
   /** Purposely big-endian, for {@link #writeInt(int)} implementation */
   private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
@@ -44,6 +45,7 @@
   {
     this.file = file;
     this.ch = ch;
+    this.writeOutBytes = 0L;
   }
 
   private void flushIfNeeded(int bytesNeeded) throws IOException
@@ -66,6 +68,7 @@
   {
     flushIfNeeded(1);
     buffer.put((byte) b);
+    writeOutBytes++;
   }
 
   @Override
@@ -73,6 +76,7 @@
   {
     flushIfNeeded(Integer.BYTES);
     buffer.putInt(v);
+    writeOutBytes += Integer.BYTES;
   }
 
   @Override
@@ -85,6 +89,7 @@
       try {
         src.limit(src.position() + buffer.capacity());
         buffer.put(src);
+        writeOutBytes += buffer.capacity();
         flush();
       }
       finally {
@@ -92,7 +97,9 @@
         src.limit(srcLimit);
       }
     }
+    int remaining = src.remaining();
     buffer.put(src);
+    writeOutBytes += remaining;
     return len;
   }
 
@@ -103,10 +110,9 @@
   }
 
   @Override
-  public long size() throws IOException
+  public long size()
   {
-    flush();
-    return ch.size();
+    return writeOutBytes;
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
index f6a5e71ce93..e1c2972b148 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
@@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
   /**
    * Returns the number of bytes written to this WriteOutBytes so far.
    */
-  public abstract long size() throws IOException;
+  public abstract long size();
 
   /**
    * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.
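The core of the patch is above: FileWriteOutBytes now pays for bookkeeping on every write path so that size() becomes a constant-time field read instead of a flush plus a FileChannel.size() call. The same counting idea in a generic, hypothetical form (a sketch, not Druid code):

```java
import java.io.IOException;
import java.io.OutputStream;

// Count bytes at write time; size() then needs no flush and no IOException.
final class CountingOutputStream extends OutputStream
{
  private final OutputStream delegate;
  private long count;

  CountingOutputStream(OutputStream delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public void write(int b) throws IOException
  {
    delegate.write(b);
    count++; // bookkeeping happens on the write path
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException
  {
    delegate.write(b, off, len);
    count += len;
  }

  long size() // constant time, exception-free, independent of buffering
  {
    return count;
  }
}
```

One consequence, which the tests below pin down: size() no longer forces buffered bytes to disk, so any caller that relied on the old flush-on-size side effect now has to call flush() explicitly.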
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
index cfaa4181cdb..8501fa61eea 100644
--- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.writeout;
 
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,28 +37,106 @@ public class FileWriteOutBytesTest
   @Before
   public void setUp()
   {
-    this.mockFileChannel = EasyMock.mock(FileChannel.class);
-    this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
+    mockFileChannel = EasyMock.mock(FileChannel.class);
+    fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
   }
 
   @Test
-  public void testWrite4KBInts() throws IOException
+  public void write4KBIntsShouldNotFlush() throws IOException
   {
     // Write 4KB of ints and expect the write operation of the file channel will be triggered only once.
-    EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
             .andAnswer(() -> {
               ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
               int remaining = buffer.remaining();
               buffer.position(remaining);
               return remaining;
             }).times(1);
-    EasyMock.replay(this.mockFileChannel);
+    EasyMock.replay(mockFileChannel);
     final int writeBytes = 4096;
     final int numOfInt = writeBytes / Integer.BYTES;
     for (int i = 0; i < numOfInt; i++) {
-      this.fileWriteOutBytes.writeInt(i);
+      fileWriteOutBytes.writeInt(i);
     }
-    this.fileWriteOutBytes.flush();
-    EasyMock.verify(this.mockFileChannel);
+    // no need to flush up to 4KB
+    // the first byte after 4KB will cause a flush
+    fileWriteOutBytes.write(1);
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.write(1);
+    Assert.assertEquals(1, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeIntShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.writeInt(1);
+    Assert.assertEquals(4, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).times(1);
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 + 1);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).once();
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new IOException())
+            .once();
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1);
+    try {
+      fileWriteOutBytes.write(src);
+      Assert.fail("IOException should have been thrown.");
+    }
+    catch (IOException e) {
+      // The second invocation to flush bytes fails. So the size should count what has already been put successfully
+      Assert.assertEquals(4096 * 2, fileWriteOutBytes.size());
+    }
+  }
+
+  @Test
+  public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    ByteBuffer src = ByteBuffer.allocate(4096);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+  }
+  @Test
+  public void sizeDoesNotFlush() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new AssertionError("file channel should not have been written to."));
+    EasyMock.replay(mockFileChannel);
+    long size = fileWriteOutBytes.size();
+    Assert.assertEquals(0, size);
+    fileWriteOutBytes.writeInt(10);
+    size = fileWriteOutBytes.size();
+    Assert.assertEquals(4, size);
+  }
 }
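For completeness, here is one way to sweep the new factoryType parameter from the JMH Runner API. The OptionsBuilder calls are standard JMH, but the include pattern and the placement of this main class are assumptions about the benchmark module's layout; the parameter can equally be set from the command line (e.g. -p factoryType=ON_HEAP on the benchmark jar):

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class IndexMergeBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    Options opts = new OptionsBuilder()
        // assumes the class name is a unique enough include regex
        .include("IndexMergeBenchmark")
        // sweep all three SegmentWriteOutType values added by this patch
        .param("factoryType", "OFF_HEAP", "TMP_FILE", "ON_HEAP")
        .forks(1)
        .build();
    new Runner(opts).run();
  }
}
```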