Optimize FileWriteOutBytes to avoid high system CPU usage (#9722)

* optimize FileWriteOutBytes to avoid high sys cpu

* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException

* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size

* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size"

This reverts commit 965f7421

* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException"

This reverts commit 149e08c0

* optimize FileWriteOutBytes to avoid high sys cpu -- avoid IOException never thrown check

* Fix size counting to handle IOE in FileWriteOutBytes + tests

* remove unused throws IOException in WriteOutBytes.size()

* Remove redundant throws IOException clauses

* Parameterize IndexMergeBenchmark

Co-authored-by: huanghui.bigrey <huanghui.bigrey@bytedance.com>
Co-authored-by: Suneet Saldanha <suneet.saldanha@imply.io>
BIGrey 2020-04-24 11:18:42 +08:00 committed by GitHub
parent 4087a015e8
commit c5bfe36011
11 changed files with 162 additions and 58 deletions

View File: IndexMergeBenchmark.java

@ -35,11 +35,13 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -66,6 +68,7 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class IndexMergeBenchmark
{
@Param({"5"})
private int numSegments;
@ -78,9 +81,13 @@ public class IndexMergeBenchmark
@Param({"true", "false"})
private boolean rollup;
@Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
private SegmentWriteOutType factoryType;
private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@ -91,6 +98,7 @@ public class IndexMergeBenchmark
private List<QueryableIndex> indexesToMerge;
private BenchmarkSchemaInfo schemaInfo;
private File tmpDir;
private IndexMergerV9 indexMergerV9;
static {
JSON_MAPPER = new DefaultObjectMapper();
@ -99,23 +107,16 @@ public class IndexMergeBenchmark
JSON_MAPPER.setInjectableValues(injectableValues);
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
() -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
log.info("SETUP CALLED AT " + System.currentTimeMillis());
indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
indexesToMerge = new ArrayList<>();
@ -143,7 +144,7 @@ public class IndexMergeBenchmark
tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());
File indexFile = INDEX_MERGER_V9.persist(
File indexFile = indexMergerV9.persist(
incIndex,
tmpDir,
new IndexSpec(),
@ -155,26 +156,6 @@ public class IndexMergeBenchmark
}
}
@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}
private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@ -186,7 +167,7 @@ public class IndexMergeBenchmark
try {
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
File mergedFile = indexMergerV9.mergeQueryableIndex(
indexesToMerge,
rollup,
schemaInfo.getAggsArray(),
@ -199,8 +180,46 @@ public class IndexMergeBenchmark
}
finally {
tmpFile.delete();
}
}
@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}
public enum SegmentWriteOutType
{
TMP_FILE,
OFF_HEAP,
ON_HEAP
}
private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
{
switch (type) {
case TMP_FILE:
return TmpFileSegmentWriteOutMediumFactory.instance();
case OFF_HEAP:
return OffHeapMemorySegmentWriteOutMediumFactory.instance();
case ON_HEAP:
return OnHeapMemorySegmentWriteOutMediumFactory.instance();
}
throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
}
private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
}
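
The benchmark is now parameterized over the write-out medium via the new factoryType @Param, so JMH sweeps OFF_HEAP, TMP_FILE, and ON_HEAP alongside the existing parameters. A minimal sketch of pinning that parameter to a single value through JMH's programmatic Runner API (the launcher class here is hypothetical, not part of this commit):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Hypothetical launcher for running the benchmark against one medium only.
public class IndexMergeBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    Options opt = new OptionsBuilder()
        .include(IndexMergeBenchmark.class.getSimpleName())
        // Pin the new @Param instead of sweeping all three media.
        .param("factoryType", "TMP_FILE")
        .forks(1)
        .build();
    new Runner(opt).run();
  }
}

The same selection should also be possible from the JMH command line with -p factoryType=TMP_FILE.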

View File: ByteBufferWriter.java

@ -63,7 +63,7 @@ public class ByteBufferWriter<T> implements Serializer
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return headerOut.size() + valueOut.size();
}

View File: EntireLayoutColumnarDoublesSerializer.java

@ -80,7 +80,7 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}

View File: EntireLayoutColumnarFloatsSerializer.java

@ -81,7 +81,7 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}

View File: GenericIndexedWriter.java

@ -296,7 +296,7 @@ public class GenericIndexedWriter<T> implements Serializer
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
if (requireMultipleFiles) {
// for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
@ -394,7 +394,7 @@ public class GenericIndexedWriter<T> implements Serializer
*
* @throws IOException
*/
private int bagSizePower() throws IOException
private int bagSizePower()
{
long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;
@ -421,7 +421,7 @@ public class GenericIndexedWriter<T> implements Serializer
*
* @throws IOException
*/
private boolean actuallyFits(int powerTwo) throws IOException
private boolean actuallyFits(int powerTwo)
{
long lastValueOffset = 0;
long currentValueOffset = 0;
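
bagSizePower() and actuallyFits() lose their throws clauses for the same reason: valuesOut.size() can no longer fail. The average object size they start from is an integer ceiling division; a quick worked check of the formula with illustrative numbers:

// ceil(totalBytes / numWritten) via the classic (a + b - 1) / b trick
long valuesSize = 10; // valuesOut.size(), illustrative value
long numWritten = 3;  // number of values written so far, illustrative value
long avgObjectSize = (valuesSize + numWritten - 1) / numWritten; // (10 + 3 - 1) / 3 == 4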

View File: ComplexColumnSerializer.java

@ -66,7 +66,7 @@ public class ComplexColumnSerializer<T> implements GenericColumnSerializer<T>
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}

View File: LargeColumnSupportedComplexColumnSerializer.java

@ -106,7 +106,7 @@ public class LargeColumnSupportedComplexColumnSerializer<T> implements GenericCo
}
@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}

View File: MetaSerdeHelper.java

@ -117,7 +117,7 @@ public final class MetaSerdeHelper<T>
public interface FieldWriter<T>
{
void writeTo(ByteBuffer buffer, T x) throws IOException;
void writeTo(ByteBuffer buffer, T x);
int size(T x);
}
@ -125,10 +125,10 @@ public final class MetaSerdeHelper<T>
@FunctionalInterface
public interface IntFieldWriter<T> extends FieldWriter<T>
{
int getField(T x) throws IOException;
int getField(T x);
@Override
default void writeTo(ByteBuffer buffer, T x) throws IOException
default void writeTo(ByteBuffer buffer, T x)
{
buffer.putInt(getField(x));
}
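
Dropping the checked exception from FieldWriter and IntFieldWriter keeps the lambdas that build serde helpers free of try/catch ceremony. A minimal sketch, assuming MetaSerdeHelper's fluent firstWriteByte/writeInt builders and a hypothetical MySerializer type:

import org.apache.druid.segment.serde.MetaSerdeHelper;

class MySerializer
{
  int numWritten; // illustrative field, not from this commit

  private static final MetaSerdeHelper<MySerializer> META_SERDE_HELPER = MetaSerdeHelper
      .firstWriteByte((MySerializer x) -> (byte) 0x1) // version byte
      .writeInt(x -> x.numWritten);                   // IntFieldWriter lambda, no throws clause needed
}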

View File: FileWriteOutBytes.java

@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
private final File file;
private final FileChannel ch;
private long writeOutBytes;
/** Purposely big-endian, for {@link #writeInt(int)} implementation */
private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
this.file = file;
this.ch = ch;
this.writeOutBytes = 0L;
}
private void flushIfNeeded(int bytesNeeded) throws IOException
@ -66,6 +68,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
flushIfNeeded(1);
buffer.put((byte) b);
writeOutBytes++;
}
@Override
@ -73,6 +76,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
flushIfNeeded(Integer.BYTES);
buffer.putInt(v);
writeOutBytes += Integer.BYTES;
}
@Override
@ -85,6 +89,7 @@ final class FileWriteOutBytes extends WriteOutBytes
try {
src.limit(src.position() + buffer.capacity());
buffer.put(src);
writeOutBytes += buffer.capacity();
flush();
}
finally {
@ -92,7 +97,9 @@ final class FileWriteOutBytes extends WriteOutBytes
src.limit(srcLimit);
}
}
int remaining = src.remaining();
buffer.put(src);
writeOutBytes += remaining;
return len;
}
@ -103,10 +110,9 @@ final class FileWriteOutBytes extends WriteOutBytes
}
@Override
public long size() throws IOException
public long size()
{
flush();
return ch.size();
return writeOutBytes;
}
@Override
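
This file is the heart of the fix. size() used to flush the 4K buffer and then ask the kernel for the file size, so every call paid for a write plus a metadata syscall (typically an fstat under FileChannel.size()); serializers call size() heavily while computing serialized sizes, which is where the high system CPU came from. The new writeOutBytes counter is maintained on every write path, turning size() into a plain field read. Side by side, as taken from the diff above:

// Before: flush the buffer and issue a syscall on every call
public long size() throws IOException
{
  flush();
  return ch.size();
}

// After: return a counter that write(), writeInt(), and write(ByteBuffer) keep current
public long size()
{
  return writeOutBytes;
}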

View File: WriteOutBytes.java

@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
/**
* Returns the number of bytes written to this WriteOutBytes so far.
*/
public abstract long size() throws IOException;
public abstract long size();
/**
* Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.

View File: FileWriteOutBytesTest.java

@ -20,6 +20,7 @@
package org.apache.druid.segment.writeout;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -36,28 +37,106 @@ public class FileWriteOutBytesTest
@Before
public void setUp()
{
this.mockFileChannel = EasyMock.mock(FileChannel.class);
this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
mockFileChannel = EasyMock.mock(FileChannel.class);
fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
}
@Test
public void testWrite4KBInts() throws IOException
public void write4KBIntsShouldNotFlush() throws IOException
{
// Write 4KB of ints and expect the file channel's write operation to be triggered only once.
EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andAnswer(() -> {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
int remaining = buffer.remaining();
buffer.position(remaining);
return remaining;
}).times(1);
EasyMock.replay(this.mockFileChannel);
EasyMock.replay(mockFileChannel);
final int writeBytes = 4096;
final int numOfInt = writeBytes / Integer.BYTES;
for (int i = 0; i < numOfInt; i++) {
this.fileWriteOutBytes.writeInt(i);
fileWriteOutBytes.writeInt(i);
}
this.fileWriteOutBytes.flush();
EasyMock.verify(this.mockFileChannel);
// no need to flush up to 4KB
// the first byte after 4KB will cause a flush
fileWriteOutBytes.write(1);
EasyMock.verify(mockFileChannel);
}
@Test
public void writeShouldIncrementSize() throws IOException
{
fileWriteOutBytes.write(1);
Assert.assertEquals(1, fileWriteOutBytes.size());
}
@Test
public void writeIntShouldIncrementSize() throws IOException
{
fileWriteOutBytes.writeInt(1);
Assert.assertEquals(4, fileWriteOutBytes.size());
}
@Test
public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException
{
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andAnswer(() -> {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
int remaining = buffer.remaining();
buffer.position(remaining);
return remaining;
}).times(1);
EasyMock.replay(mockFileChannel);
ByteBuffer src = ByteBuffer.allocate(4096 + 1);
fileWriteOutBytes.write(src);
Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
EasyMock.verify(mockFileChannel);
}
@Test
public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException
{
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andAnswer(() -> {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
int remaining = buffer.remaining();
buffer.position(remaining);
return remaining;
}).once();
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andThrow(new IOException())
.once();
EasyMock.replay(mockFileChannel);
ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1);
try {
fileWriteOutBytes.write(src);
Assert.fail("IOException should have been thrown.");
}
catch (IOException e) {
// The second flush to the channel fails, so size() should count only the bytes already put successfully
Assert.assertEquals(4096 * 2, fileWriteOutBytes.size());
}
}
@Test
public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException
{
ByteBuffer src = ByteBuffer.allocate(4096);
fileWriteOutBytes.write(src);
Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
}
@Test
public void sizeDoesNotFlush() throws IOException
{
EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
.andThrow(new AssertionError("file channel should not have been written to."));
EasyMock.replay(mockFileChannel);
long size = fileWriteOutBytes.size();
Assert.assertEquals(0, size);
fileWriteOutBytes.writeInt(10);
size = fileWriteOutBytes.size();
Assert.assertEquals(4, size);
}
}