HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)

This commit is contained in:
chenxu14 2019-09-09 17:38:33 +08:00 committed by huzheng
parent a8cdee6122
commit 4282d70212
10 changed files with 241 additions and 78 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
@ -450,10 +451,37 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
*/
public abstract int read(ReadableByteChannel channel) throws IOException;
/**
* Reads bytes from FileChannel into this ByteBuff
*/
public abstract int read(FileChannel channel, long offset) throws IOException;
/**
* Write this ByteBuff's data into target file
*/
public abstract int write(FileChannel channel, long offset) throws IOException;
/**
* function interface for Channel read
*/
@FunctionalInterface
interface ChannelReader {
int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
}
static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
return channel.read(buf);
};
static final ChannelReader FILE_READER = (channel, buf, offset) -> {
return ((FileChannel)channel).read(buf, offset);
};
// static helper methods
public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
ChannelReader reader) throws IOException {
if (buf.remaining() <= NIO_BUFFER_LIMIT) {
return channel.read(buf);
return reader.read(channel, buf, offset);
}
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
@ -463,7 +491,8 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = channel.read(buf);
offset += ret;
ret = reader.read(channel, buf, offset);
if (ret < ioSize) {
break;
}

View File

@ -24,7 +24,10 @@ import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
private int markedItemIndex = -1;
private final int[] itemBeginPos;
private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
@Override
public boolean hasNext() {
return curItemIndex < limitedItemIndex ||
(curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
}
@Override
public ByteBuffer next() {
if (curItemIndex >= items.length) {
throw new NoSuchElementException("items overflow");
}
curItem = items[curItemIndex++];
return curItem;
}
};
public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
@ -1064,23 +1084,44 @@ public class MultiByteBuff extends ByteBuff {
return output;
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
private int internalRead(ReadableByteChannel channel, long offset,
ChannelReader reader) throws IOException {
checkRefCount();
int total = 0;
while (true) {
// Read max possible into the current BB
int len = channelRead(channel, this.curItem);
if (len > 0)
while (buffsIterator.hasNext()) {
ByteBuffer buffer = buffsIterator.next();
int len = read(channel, buffer, offset, reader);
if (len > 0) {
total += len;
if (this.curItem.hasRemaining()) {
// We were not able to read enough to fill the current BB itself. Means there is no point in
// doing more reads from Channel. Only this much there for now.
offset += len;
}
if (buffer.hasRemaining()) {
break;
} else {
if (this.curItemIndex >= this.limitedItemIndex) break;
this.curItemIndex++;
this.curItem = this.items[this.curItemIndex];
}
}
return total;
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
return internalRead(channel, 0, CHANNEL_READER);
}
@Override
public int read(FileChannel channel, long offset) throws IOException {
return internalRead(channel, offset, FILE_READER);
}
@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
while (buffsIterator.hasNext()) {
ByteBuffer buffer = buffsIterator.next();
while (buffer.hasRemaining()) {
int len = channel.write(buffer, offset);
total += len;
offset += len;
}
}
return total;

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@ -371,7 +372,25 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
return channelRead(channel, buf);
return read(channel, buf, 0, CHANNEL_READER);
}
@Override
public int read(FileChannel channel, long offset) throws IOException {
checkRefCount();
return read(channel, buf, offset, FILE_READER);
}
@Override
public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
while(buf.hasRemaining()) {
int len = channel.write(buf, offset);
total += len;
offset += len;
}
return total;
}
@Override

View File

@ -502,8 +502,11 @@ public class BucketCache implements BlockCache, HeapSize {
// block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
// the same BucketEntry, then all of the three will share the same refCnt.
Cacheable cachedBlock = ioEngine.read(bucketEntry);
// RPC start to reference, so retain here.
cachedBlock.retain();
if (ioEngine.usesSharedMemory()) {
// If IOEngine use shared memory, cachedBlock and BucketEntry will share the
// same RefCnt, do retain here, in order to count the number of RPC references
cachedBlock.retain();
}
// Update the cache statistics.
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());

View File

@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
private final ByteBuffAllocator allocator;
final ByteBuffAllocator allocator;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
@ -194,7 +194,10 @@ class BucketEntry implements HBaseReferenceCounted {
}
Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
}
Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
return this.deserializerReference().deserialize(buf, allocator);
}

View File

@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
@ -35,9 +34,9 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@Override
public Cacheable read(BucketEntry be) throws IOException {
ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
ByteBuff dst = be.allocator.allocate(be.getLength());
bufferArray.read(be.offset(), dst);
dst.position(0).limit(be.getLength());
return be.wrapAsCacheable(dst.nioByteBuffers());
return be.wrapAsCacheable(dst);
}
}

View File

@ -129,20 +129,25 @@ public class FileIOEngine implements IOEngine {
long offset = be.offset();
int length = be.getLength();
Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
ByteBuff dstBuff = be.allocator.allocate(length);
if (length != 0) {
accessFile(readAccessor, dstBuffer, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
// Hence in this case there is no shared memory that we point to. Even if the BucketCache
// evicts this buffer from the file the data is already copied and there is no need to
// ensure that the results are not corrupted before consuming them.
if (dstBuffer.limit() != length) {
throw new IllegalArgumentIOException(
"Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
try {
accessFile(readAccessor, dstBuff, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
// Hence in this case there is no shared memory that we point to. Even if the BucketCache
// evicts this buffer from the file the data is already copied and there is no need to
// ensure that the results are not corrupted before consuming them.
if (dstBuff.limit() != length) {
throw new IllegalArgumentIOException(
"Only " + dstBuff.limit() + " bytes read, " + length + " expected");
}
} catch (IOException ioe) {
dstBuff.release();
throw ioe;
}
}
dstBuffer.rewind();
return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
dstBuff.rewind();
return be.wrapAsCacheable(dstBuff);
}
@VisibleForTesting
@ -164,10 +169,7 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
if (!srcBuffer.hasRemaining()) {
return;
}
accessFile(writeAccessor, srcBuffer, offset);
write(ByteBuff.wrap(srcBuffer), offset);
}
/**
@ -208,28 +210,30 @@ public class FileIOEngine implements IOEngine {
}
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
write(dup, offset);
public void write(ByteBuff srcBuff, long offset) throws IOException {
if (!srcBuff.hasRemaining()) {
return;
}
accessFile(writeAccessor, srcBuff, offset);
}
private void accessFile(FileAccessor accessor, ByteBuffer buffer,
private void accessFile(FileAccessor accessor, ByteBuff buff,
long globalOffset) throws IOException {
int startFileNum = getFileNum(globalOffset);
int remainingAccessDataLen = buffer.remaining();
int remainingAccessDataLen = buff.remaining();
int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
int accessFileNum = startFileNum;
long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
int bufLimit = buffer.limit();
int bufLimit = buff.limit();
while (true) {
FileChannel fileChannel = fileChannels[accessFileNum];
int accessLen = 0;
if (endFileNum > accessFileNum) {
// short the limit;
buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
}
try {
accessLen = accessor.access(fileChannel, buffer, accessOffset);
accessLen = accessor.access(fileChannel, buff, accessOffset);
} catch (ClosedByInterruptException e) {
throw e;
} catch (ClosedChannelException e) {
@ -237,7 +241,7 @@ public class FileIOEngine implements IOEngine {
continue;
}
// recover the limit
buffer.limit(bufLimit);
buff.limit(bufLimit);
if (accessLen < remainingAccessDataLen) {
remainingAccessDataLen -= accessLen;
accessFileNum++;
@ -246,7 +250,7 @@ public class FileIOEngine implements IOEngine {
break;
}
if (accessFileNum >= fileChannels.length) {
throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
+ " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
+ globalOffset);
}
@ -304,23 +308,23 @@ public class FileIOEngine implements IOEngine {
}
private interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
throws IOException;
}
private static class FileReadAccessor implements FileAccessor {
@Override
public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
public int access(FileChannel fileChannel, ByteBuff buff,
long accessOffset) throws IOException {
return fileChannel.read(byteBuffer, accessOffset);
return buff.read(fileChannel, accessOffset);
}
}
private static class FileWriteAccessor implements FileAccessor {
@Override
public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
public int access(FileChannel fileChannel, ByteBuff buff,
long accessOffset) throws IOException {
return fileChannel.write(byteBuffer, accessOffset);
return buff.write(fileChannel, accessOffset);
}
}
}

View File

@ -29,6 +29,8 @@ import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENT
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@ -58,9 +60,14 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@Category({ IOTests.class, SmallTests.class })
public class TestHFileScannerImplReferenceCount {
@ -71,6 +78,15 @@ public class TestHFileScannerImplReferenceCount {
@Rule
public TestName CASE = new TestName();
@Parameters(name = "{index}: ioengine={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
new Object[] { "mmap" }, new Object[] { "pmem" });
}
@Parameter
public String ioengine;
private static final Logger LOG =
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -113,12 +129,16 @@ public class TestHFileScannerImplReferenceCount {
@Before
public void setUp() throws IOException {
String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
this.workDir = UTIL.getDataTestDir(caseName);
if (!"offheap".equals(ioengine)) {
ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
}
UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
this.firstCell = null;
this.secondCell = null;
this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
this.conf = new Configuration(UTIL.getConfiguration());
String caseName = CASE.getMethodName();
this.workDir = UTIL.getDataTestDir(caseName);
this.fs = this.workDir.getFileSystem(conf);
this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
@ -202,34 +222,34 @@ public class TestHFileScannerImplReferenceCount {
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
Assert.assertEquals(curBlock.refCnt(), 2);
this.assertRefCnt(curBlock, 2);
// Seek to the block again, the curBlock won't change and won't read from BlockCache. so
// refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertTrue(curBlock == scanner.curBlock);
Assert.assertEquals(curBlock.refCnt(), 2);
this.assertRefCnt(curBlock, 2);
prevBlock = curBlock;
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertEquals(prevBlock.refCnt(), 2);
Assert.assertEquals(curBlock.refCnt(), 2);
this.assertRefCnt(prevBlock, 2);
this.assertRefCnt(curBlock, 2);
// After shipped, the prevBlock will be release, but curBlock is still referenced by the
// curBlock.
scanner.shipped();
Assert.assertEquals(prevBlock.refCnt(), 1);
Assert.assertEquals(curBlock.refCnt(), 2);
this.assertRefCnt(prevBlock, 1);
this.assertRefCnt(curBlock, 2);
// Try to ship again, though with nothing to client.
scanner.shipped();
Assert.assertEquals(prevBlock.refCnt(), 1);
Assert.assertEquals(curBlock.refCnt(), 2);
this.assertRefCnt(prevBlock, 1);
this.assertRefCnt(curBlock, 2);
// The curBlock will also be released.
scanner.close();
Assert.assertEquals(curBlock.refCnt(), 1);
this.assertRefCnt(curBlock, 1);
// Finish the block & block2 RPC path
Assert.assertTrue(block1.release());
@ -287,7 +307,7 @@ public class TestHFileScannerImplReferenceCount {
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block2);
Assert.assertEquals(1, block2.refCnt());
Assert.assertEquals(2, curBlock.refCnt());
this.assertRefCnt(curBlock, 2);
prevBlock = scanner.curBlock;
// Release the block1, no other reference.
@ -305,22 +325,22 @@ public class TestHFileScannerImplReferenceCount {
// the curBlock is read from IOEngine, so a different block.
Assert.assertFalse(curBlock == block1);
// Two reference for curBlock: 1. scanner; 2. blockCache.
Assert.assertEquals(2, curBlock.refCnt());
this.assertRefCnt(curBlock, 2);
// Reference count of prevBlock must be unchanged because we haven't shipped.
Assert.assertEquals(2, prevBlock.refCnt());
this.assertRefCnt(prevBlock, 2);
// Do the shipped
scanner.shipped();
Assert.assertEquals(scanner.prevBlocks.size(), 0);
Assert.assertNotNull(scanner.curBlock);
Assert.assertEquals(2, curBlock.refCnt());
Assert.assertEquals(1, prevBlock.refCnt());
this.assertRefCnt(curBlock, 2);
this.assertRefCnt(prevBlock, 1);
// Do the close
scanner.close();
Assert.assertNull(scanner.curBlock);
Assert.assertEquals(1, curBlock.refCnt());
Assert.assertEquals(1, prevBlock.refCnt());
this.assertRefCnt(curBlock, 1);
this.assertRefCnt(prevBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(0, curBlock.refCnt());
@ -340,18 +360,26 @@ public class TestHFileScannerImplReferenceCount {
Assert.assertTrue(scanner.seekTo());
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block1);
Assert.assertEquals(2, curBlock.refCnt());
this.assertRefCnt(curBlock, 2);
// Return false because firstCell <= c[0]
Assert.assertFalse(scanner.seekBefore(firstCell));
// The block1 shouldn't be released because we still don't do the shipped or close.
Assert.assertEquals(2, curBlock.refCnt());
this.assertRefCnt(curBlock, 2);
scanner.close();
Assert.assertEquals(1, curBlock.refCnt());
this.assertRefCnt(curBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
Assert.assertEquals(0, curBlock.refCnt());
}
private void assertRefCnt(HFileBlock block, int value) {
if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
Assert.assertEquals(value, block.refCnt());
} else {
Assert.assertEquals(value - 1, block.refCnt());
}
}
@Test
public void testDefault() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
@ -48,8 +49,8 @@ public class TestByteBufferIOEngine {
private static class MockBucketEntry extends BucketEntry {
private long off;
MockBucketEntry(long offset, int length) {
super(offset & 0xFF00, length, 0, false);
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
this.off = offset;
}
@ -66,7 +67,11 @@ public class TestByteBufferIOEngine {
}
static BucketEntry createBucketEntry(long offset, int len) {
BucketEntry be = new MockBucketEntry(offset, len);
return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
}
static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
BucketEntry be = new MockBucketEntry(offset, len, allocator);
be.setDeserializerReference(DESERIALIZER);
return be;
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -30,8 +31,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
@ -40,6 +44,9 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Basic test for {@link FileIOEngine}
@ -130,6 +137,31 @@ public class TestFileIOEngine {
assertArrayEquals(data1, data2.array());
}
@Test
public void testReadFailedShouldReleaseByteBuff() {
ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
final RefCnt refCnt = RefCnt.create();
Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
@Override
public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
int len = invocation.getArgument(0);
return ByteBuff.wrap(new ByteBuffer[]{ByteBuffer.allocate(len + 1)}, refCnt);
}
});
int len = 10;
byte[] data1 = new byte[len];
assertEquals(1, refCnt.refCnt());
try {
fileIOEngine.write(ByteBuffer.wrap(data1), 0);
BucketEntry be = createBucketEntry(0, len, alloc);
fileIOEngine.read(be);
fail();
} catch (IOException ioe) {
// expected exception.
}
assertEquals(0, refCnt.refCnt());
}
@Test
public void testClosedChannelException() throws IOException {
fileIOEngine.closeFileChannels();