HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)

Author: chenxu14, 2019-09-09 17:38:33 +08:00 (committed by openinx)
parent 7648855e59
commit fb7230c3f1
10 changed files with 241 additions and 78 deletions
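In short: FileIOEngine#read used to allocate a temporary on-heap ByteBuffer for every block fetched from a file-backed bucket cache. This change teaches ByteBuff positional FileChannel read/write, so the engine reads directly into a buffer obtained from the BucketEntry's ByteBuffAllocator (typically pooled), and releases that buffer back to the pool if the read fails. A minimal sketch of the resulting read path, using only the HBase APIs visible in this diff (the class and method names around them are hypothetical):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class PooledReadSketch {
  // Read 'length' bytes at 'offset' into an allocator-owned ByteBuff,
  // releasing it back to the pool on failure (mirrors the FileIOEngine#read change).
  static ByteBuff readBlock(ByteBuffAllocator allocator, FileChannel channel,
      long offset, int length) throws IOException {
    ByteBuff dst = allocator.allocate(length);
    try {
      dst.read(channel, offset); // positional read introduced by this commit
      dst.rewind();
      return dst;
    } catch (IOException ioe) {
      dst.release(); // don't leak the pooled buffer
      throw ioe;
    }
  }
}
```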

hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.util.List;
@@ -450,10 +451,37 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
    */
   public abstract int read(ReadableByteChannel channel) throws IOException;
 
-  // static helper methods
-  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
-    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
-      return channel.read(buf);
+  /**
+   * Reads bytes from FileChannel into this ByteBuff
+   */
+  public abstract int read(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * Write this ByteBuff's data into target file
+   */
+  public abstract int write(FileChannel channel, long offset) throws IOException;
+
+  /**
+   * function interface for Channel read
+   */
+  @FunctionalInterface
+  interface ChannelReader {
+    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
+  }
+
+  static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
+    return channel.read(buf);
+  };
+
+  static final ChannelReader FILE_READER = (channel, buf, offset) -> {
+    return ((FileChannel) channel).read(buf, offset);
+  };
+
+  // static helper methods
+  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+      ChannelReader reader) throws IOException {
+    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+      return reader.read(channel, buf, offset);
     }
     int originalLimit = buf.limit();
     int initialRemaining = buf.remaining();
@@ -463,7 +491,8 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
     try {
       int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
       buf.limit(buf.position() + ioSize);
-      ret = channel.read(buf);
+      offset += ret;
+      ret = reader.read(channel, buf, offset);
       if (ret < ioSize) {
         break;
       }
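The two ChannelReader lambdas above let one chunked-read helper serve both paths: CHANNEL_READER does a streaming read that advances the channel's own position, while FILE_READER does a positional pread that leaves it untouched, so concurrent cache reads never contend on a shared position. A self-contained, JDK-only demo of that difference:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PositionalReadDemo {
  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("pread", ".bin");
    Files.write(tmp, new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 });
    try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
      ByteBuffer buf = ByteBuffer.allocate(4);
      // Positional read: fills buf from file offset 4, channel position stays 0.
      ch.read(buf, 4);
      System.out.println("position after pread: " + ch.position()); // 0
      buf.clear();
      // Streaming read: reads from the current position, then advances it.
      ch.read(buf);
      System.out.println("position after read: " + ch.position()); // 4
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```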

hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java

@@ -24,7 +24,10 @@ import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
   private int markedItemIndex = -1;
   private final int[] itemBeginPos;
 
+  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
+    @Override
+    public boolean hasNext() {
+      return curItemIndex < limitedItemIndex ||
+          (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
+    }
+
+    @Override
+    public ByteBuffer next() {
+      if (curItemIndex >= items.length) {
+        throw new NoSuchElementException("items overflow");
+      }
+      curItem = items[curItemIndex++];
+      return curItem;
+    }
+  };
+
   public MultiByteBuff(ByteBuffer... items) {
     this(NONE, items);
   }
@@ -1064,23 +1084,44 @@ public class MultiByteBuff extends ByteBuff {
     return output;
   }
 
-  @Override
-  public int read(ReadableByteChannel channel) throws IOException {
-    checkRefCount();
-    int total = 0;
-    while (true) {
-      // Read max possible into the current BB
-      int len = channelRead(channel, this.curItem);
-      if (len > 0)
-        total += len;
-      if (this.curItem.hasRemaining()) {
-        // We were not able to read enough to fill the current BB itself. Means there is no point in
-        // doing more reads from Channel. Only this much there for now.
-        break;
-      } else {
-        if (this.curItemIndex >= this.limitedItemIndex) break;
-        this.curItemIndex++;
-        this.curItem = this.items[this.curItemIndex];
+  private int internalRead(ReadableByteChannel channel, long offset,
+      ChannelReader reader) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      int len = read(channel, buffer, offset, reader);
+      if (len > 0) {
+        total += len;
+        offset += len;
+      }
+      if (buffer.hasRemaining()) {
+        break;
+      }
+    }
+    return total;
+  }
+
+  @Override
+  public int read(ReadableByteChannel channel) throws IOException {
+    return internalRead(channel, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    return internalRead(channel, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buffsIterator.hasNext()) {
+      ByteBuffer buffer = buffsIterator.next();
+      while (buffer.hasRemaining()) {
+        int len = channel.write(buffer, offset);
+        total += len;
+        offset += len;
       }
     }
     return total;
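A usage sketch (not part of the patch): reading into a ByteBuff backed by two segments with the new positional API. internalRead above bumps offset by the bytes already consumed, so the second segment continues exactly where the first segment's fill ended; the file path below is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.apache.hadoop.hbase.nio.MultiByteBuff;

public class MultiByteBuffReadSketch {
  public static void main(String[] args) throws Exception {
    // Two 4 KB segments backing one logical 8 KB buffer.
    MultiByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(4096), ByteBuffer.allocate(4096));
    try (FileChannel ch = FileChannel.open(Paths.get("/tmp/bucket.cache"), // hypothetical path
        StandardOpenOption.READ)) {
      // Fills segment one from offset 8192, then segment two from offset 12288.
      int read = mbb.read(ch, 8192L);
      System.out.println("bytes read: " + read);
    }
  }
}
```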

hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public int read(ReadableByteChannel channel) throws IOException {
     checkRefCount();
-    return channelRead(channel, buf);
+    return read(channel, buf, 0, CHANNEL_READER);
+  }
+
+  @Override
+  public int read(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    return read(channel, buf, offset, FILE_READER);
+  }
+
+  @Override
+  public int write(FileChannel channel, long offset) throws IOException {
+    checkRefCount();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int len = channel.write(buf, offset);
+      total += len;
+      offset += len;
+    }
+    return total;
   }
 
   @Override
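Both write(FileChannel, long) implementations loop because FileChannel.write(ByteBuffer, long) may legitimately write fewer bytes than remain in the buffer. The pattern in isolation:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class WriteFully {
  // Write every remaining byte of src at 'offset', tolerating short writes.
  static int writeFully(FileChannel channel, ByteBuffer src, long offset) throws IOException {
    int total = 0;
    while (src.hasRemaining()) {
      int len = channel.write(src, offset); // may be a short write
      total += len;
      offset += len;
    }
    return total;
  }
}
```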

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

@@ -502,8 +502,11 @@ public class BucketCache implements BlockCache, HeapSize {
       // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
       // the same BucketEntry, then all of the three will share the same refCnt.
       Cacheable cachedBlock = ioEngine.read(bucketEntry);
-      // RPC start to reference, so retain here.
-      cachedBlock.retain();
+      if (ioEngine.usesSharedMemory()) {
+        // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
+        // same RefCnt, do retain here, in order to count the number of RPC references
+        cachedBlock.retain();
+      }
       // Update the cache statistics.
       if (updateCacheMetrics) {
         cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
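The retain is now conditional because only shared-memory engines (offheap, pmem) return a block that wraps the cache's own memory and shares the BucketEntry's RefCnt; file and mmap engines return an exclusive copy, where an extra retain would pin the buffer forever. A sketch of the contract, assuming only the retain/release/usesSharedMemory APIs seen in this diff (the `serve` method itself is hypothetical):

```java
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.bucket.IOEngine;

public class RpcRefCountSketch {
  static void serve(IOEngine engine, Cacheable block) {
    if (engine.usesSharedMemory()) {
      // Shared engines: the block shares the BucketEntry's RefCnt, so the
      // RPC path must add (and later drop) its own reference.
      block.retain();
    }
    try {
      // ... ship the block to the client ...
    } finally {
      block.release(); // RPC done; for shared engines the cache still holds its ref
    }
  }
}
```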

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java

@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
-  private final ByteBuffAllocator allocator;
+  final ByteBuffAllocator allocator;
 
   /**
    * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,10 @@ class BucketEntry implements HBaseReferenceCounted {
   }
 
   Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
-    ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
+  }
+
+  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
     return this.deserializerReference().deserialize(buf, allocator);
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java

@@ -17,7 +17,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
   @Override
   public Cacheable read(BucketEntry be) throws IOException {
-    ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+    ByteBuff dst = be.allocator.allocate(be.getLength());
     bufferArray.read(be.offset(), dst);
     dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers());
+    return be.wrapAsCacheable(dst);
   }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java

@@ -129,20 +129,25 @@ public class FileIOEngine implements IOEngine {
     long offset = be.offset();
     int length = be.getLength();
     Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
-    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    ByteBuff dstBuff = be.allocator.allocate(length);
     if (length != 0) {
-      accessFile(readAccessor, dstBuffer, offset);
-      // The buffer created out of the fileChannel is formed by copying the data from the file
-      // Hence in this case there is no shared memory that we point to. Even if the BucketCache
-      // evicts this buffer from the file the data is already copied and there is no need to
-      // ensure that the results are not corrupted before consuming them.
-      if (dstBuffer.limit() != length) {
-        throw new IllegalArgumentIOException(
-            "Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
+      try {
+        accessFile(readAccessor, dstBuff, offset);
+        // The buffer created out of the fileChannel is formed by copying the data from the file
+        // Hence in this case there is no shared memory that we point to. Even if the BucketCache
+        // evicts this buffer from the file the data is already copied and there is no need to
+        // ensure that the results are not corrupted before consuming them.
+        if (dstBuff.limit() != length) {
+          throw new IllegalArgumentIOException(
+              "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
+        }
+      } catch (IOException ioe) {
+        dstBuff.release();
+        throw ioe;
       }
     }
-    dstBuffer.rewind();
-    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
+    dstBuff.rewind();
+    return be.wrapAsCacheable(dstBuff);
   }
 
   @VisibleForTesting
@@ -164,10 +169,7 @@
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    if (!srcBuffer.hasRemaining()) {
-      return;
-    }
-    accessFile(writeAccessor, srcBuffer, offset);
+    write(ByteBuff.wrap(srcBuffer), offset);
   }
 
   /**
@@ -208,28 +210,30 @@
   }
 
   @Override
-  public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
-    write(dup, offset);
+  public void write(ByteBuff srcBuff, long offset) throws IOException {
+    if (!srcBuff.hasRemaining()) {
+      return;
+    }
+    accessFile(writeAccessor, srcBuff, offset);
   }
 
-  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+  private void accessFile(FileAccessor accessor, ByteBuff buff,
       long globalOffset) throws IOException {
     int startFileNum = getFileNum(globalOffset);
-    int remainingAccessDataLen = buffer.remaining();
+    int remainingAccessDataLen = buff.remaining();
     int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
     int accessFileNum = startFileNum;
     long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
-    int bufLimit = buffer.limit();
+    int bufLimit = buff.limit();
     while (true) {
       FileChannel fileChannel = fileChannels[accessFileNum];
       int accessLen = 0;
       if (endFileNum > accessFileNum) {
         // short the limit;
-        buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
+        buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
       }
       try {
-        accessLen = accessor.access(fileChannel, buffer, accessOffset);
+        accessLen = accessor.access(fileChannel, buff, accessOffset);
       } catch (ClosedByInterruptException e) {
         throw e;
       } catch (ClosedChannelException e) {
@@ -237,7 +241,7 @@
         continue;
       }
       // recover the limit
-      buffer.limit(bufLimit);
+      buff.limit(bufLimit);
       if (accessLen < remainingAccessDataLen) {
         remainingAccessDataLen -= accessLen;
         accessFileNum++;
@@ -246,7 +250,7 @@
         break;
       }
       if (accessFileNum >= fileChannels.length) {
-        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+        throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
             + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
             + globalOffset);
       }
@@ -304,23 +308,23 @@
   }
 
   private interface FileAccessor {
-    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+    int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
         throws IOException;
   }
 
   private static class FileReadAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
         long accessOffset) throws IOException {
-      return fileChannel.read(byteBuffer, accessOffset);
+      return buff.read(fileChannel, accessOffset);
     }
   }
 
   private static class FileWriteAccessor implements FileAccessor {
     @Override
-    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+    public int access(FileChannel fileChannel, ByteBuff buff,
         long accessOffset) throws IOException {
-      return fileChannel.write(byteBuffer, accessOffset);
+      return buff.write(fileChannel, accessOffset);
     }
   }
 }
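accessFile stripes one logical read or write across the engine's backing files, shortening the buffer limit at each file boundary and restoring it afterwards. The helpers getFileNum and getAbsoluteOffsetInFile are not shown in this diff; a plausible reconstruction of their arithmetic, assuming equally sized backing files, is:

```java
public class StripedOffsetSketch {
  // Hypothetical reconstruction (not shown in this diff) of FileIOEngine's
  // offset helpers, assuming the cache is striped over equally sized files.
  private final long sizePerFile;

  StripedOffsetSketch(long sizePerFile) {
    this.sizePerFile = sizePerFile;
  }

  int getFileNum(long globalOffset) {
    return (int) (globalOffset / sizePerFile); // which backing file holds this byte
  }

  long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
    return globalOffset - fileNum * sizePerFile; // position within that file
  }
}
```

Under that assumption, a request crossing a file boundary is served in pieces: the limit is shortened so the first access stops at the end of file k, then the loop advances to file k+1 with the limit restored.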

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java

@@ -29,6 +29,8 @@ import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -58,9 +60,14 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 @Category({ IOTests.class, SmallTests.class })
 public class TestHFileScannerImplReferenceCount {
@@ -71,6 +78,15 @@ public class TestHFileScannerImplReferenceCount {
   @Rule
   public TestName CASE = new TestName();
 
+  @Parameters(name = "{index}: ioengine={0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
+        new Object[] { "mmap" }, new Object[] { "pmem" });
+  }
+
+  @Parameter
+  public String ioengine;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -113,12 +129,16 @@ public class TestHFileScannerImplReferenceCount {
   @Before
   public void setUp() throws IOException {
+    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    this.workDir = UTIL.getDataTestDir(caseName);
+    if (!"offheap".equals(ioengine)) {
+      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
+    }
+    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
     this.firstCell = null;
     this.secondCell = null;
     this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
     this.conf = new Configuration(UTIL.getConfiguration());
-    String caseName = CASE.getMethodName();
-    this.workDir = UTIL.getDataTestDir(caseName);
     this.fs = this.workDir.getFileSystem(conf);
     this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
     LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
@@ -202,34 +222,34 @@
     scanner.seekTo(firstCell);
     curBlock = scanner.curBlock;
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(curBlock, 2);
 
     // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
     // refCnt should be unchanged.
     scanner.seekTo(firstCell);
     Assert.assertTrue(curBlock == scanner.curBlock);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(curBlock, 2);
     prevBlock = curBlock;
 
     scanner.seekTo(secondCell);
     curBlock = scanner.curBlock;
-    Assert.assertEquals(prevBlock.refCnt(), 2);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 2);
+    this.assertRefCnt(curBlock, 2);
 
     // After shipped, the prevBlock will be release, but curBlock is still referenced by the
     // curBlock.
     scanner.shipped();
-    Assert.assertEquals(prevBlock.refCnt(), 1);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 1);
+    this.assertRefCnt(curBlock, 2);
 
     // Try to ship again, though with nothing to client.
     scanner.shipped();
-    Assert.assertEquals(prevBlock.refCnt(), 1);
-    Assert.assertEquals(curBlock.refCnt(), 2);
+    this.assertRefCnt(prevBlock, 1);
+    this.assertRefCnt(curBlock, 2);
 
     // The curBlock will also be released.
     scanner.close();
-    Assert.assertEquals(curBlock.refCnt(), 1);
+    this.assertRefCnt(curBlock, 1);
 
     // Finish the block & block2 RPC path
     Assert.assertTrue(block1.release());
@@ -287,7 +307,7 @@
     curBlock = scanner.curBlock;
     Assert.assertFalse(curBlock == block2);
     Assert.assertEquals(1, block2.refCnt());
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
     prevBlock = scanner.curBlock;
 
     // Release the block1, no other reference.
@@ -305,22 +325,22 @@
     // the curBlock is read from IOEngine, so a different block.
     Assert.assertFalse(curBlock == block1);
     // Two reference for curBlock: 1. scanner; 2. blockCache.
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
     // Reference count of prevBlock must be unchanged because we haven't shipped.
-    Assert.assertEquals(2, prevBlock.refCnt());
+    this.assertRefCnt(prevBlock, 2);
     // Do the shipped
     scanner.shipped();
     Assert.assertEquals(scanner.prevBlocks.size(), 0);
     Assert.assertNotNull(scanner.curBlock);
-    Assert.assertEquals(2, curBlock.refCnt());
-    Assert.assertEquals(1, prevBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
+    this.assertRefCnt(prevBlock, 1);
     // Do the close
     scanner.close();
     Assert.assertNull(scanner.curBlock);
-    Assert.assertEquals(1, curBlock.refCnt());
-    Assert.assertEquals(1, prevBlock.refCnt());
+    this.assertRefCnt(curBlock, 1);
+    this.assertRefCnt(prevBlock, 1);
 
     Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
     Assert.assertEquals(0, curBlock.refCnt());
@@ -340,18 +360,26 @@
     Assert.assertTrue(scanner.seekTo());
     curBlock = scanner.curBlock;
     Assert.assertFalse(curBlock == block1);
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
 
     // Return false because firstCell <= c[0]
     Assert.assertFalse(scanner.seekBefore(firstCell));
     // The block1 shouldn't be released because we still don't do the shipped or close.
-    Assert.assertEquals(2, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 2);
 
     scanner.close();
-    Assert.assertEquals(1, curBlock.refCnt());
+    this.assertRefCnt(curBlock, 1);
     Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
     Assert.assertEquals(0, curBlock.refCnt());
   }
 
+  private void assertRefCnt(HFileBlock block, int value) {
+    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
+      Assert.assertEquals(value, block.refCnt());
+    } else {
+      Assert.assertEquals(value - 1, block.refCnt());
+    }
+  }
+
   @Test
   public void testDefault() throws Exception {
     testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Assert;
@@ -48,8 +49,8 @@ public class TestByteBufferIOEngine {
   private static class MockBucketEntry extends BucketEntry {
     private long off;
 
-    MockBucketEntry(long offset, int length) {
-      super(offset & 0xFF00, length, 0, false);
+    MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
+      super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
       this.off = offset;
     }
@@ -66,7 +67,11 @@ public class TestByteBufferIOEngine {
   }
 
   static BucketEntry createBucketEntry(long offset, int len) {
-    BucketEntry be = new MockBucketEntry(offset, len);
+    return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
+  }
+
+  static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
+    BucketEntry be = new MockBucketEntry(offset, len, allocator);
     be.setDeserializerReference(DESERIALIZER);
     return be;
   }

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,8 +31,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.After;
@@ -40,6 +44,9 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Basic test for {@link FileIOEngine}
@@ -130,6 +137,31 @@ public class TestFileIOEngine {
     assertArrayEquals(data1, data2.array());
   }
 
+  @Test
+  public void testReadFailedShouldReleaseByteBuff() {
+    ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
+    final RefCnt refCnt = RefCnt.create();
+    Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
+      @Override
+      public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
+        int len = invocation.getArgument(0);
+        return ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocate(len + 1) }, refCnt);
+      }
+    });
+    int len = 10;
+    byte[] data1 = new byte[len];
+    assertEquals(1, refCnt.refCnt());
+    try {
+      fileIOEngine.write(ByteBuffer.wrap(data1), 0);
+      BucketEntry be = createBucketEntry(0, len, alloc);
+      fileIOEngine.read(be);
+      fail();
+    } catch (IOException ioe) {
+      // expected exception.
+    }
+    assertEquals(0, refCnt.refCnt());
+  }
+
   @Test
   public void testClosedChannelException() throws IOException {
     fileIOEngine.closeFileChannels();
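How this test exercises the new failure path: the mocked allocator deliberately hands back a buffer one byte larger than requested, so after the engine fills it, dstBuff.limit() != length holds and FileIOEngine#read throws IllegalArgumentIOException; the final assertion that refCnt dropped to zero proves the catch block released the pooled buffer instead of leaking it.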