diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index c04c3f55aae..3e0a830d357 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.nio; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.util.List; @@ -450,10 +451,37 @@ public abstract class ByteBuff implements HBaseReferenceCounted { */ public abstract int read(ReadableByteChannel channel) throws IOException; + /** + * Reads bytes from FileChannel into this ByteBuff + */ + public abstract int read(FileChannel channel, long offset) throws IOException; + + /** + * Write this ByteBuff's data into target file + */ + public abstract int write(FileChannel channel, long offset) throws IOException; + + /** + * function interface for Channel read + */ + @FunctionalInterface + interface ChannelReader { + int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; + } + + static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { + return channel.read(buf); + }; + + static final ChannelReader FILE_READER = (channel, buf, offset) -> { + return ((FileChannel)channel).read(buf, offset); + }; + // static helper methods - public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { + public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, + ChannelReader reader) throws IOException { if (buf.remaining() <= NIO_BUFFER_LIMIT) { - return channel.read(buf); + return reader.read(channel, buf, offset); } int originalLimit = buf.limit(); int initialRemaining = buf.remaining(); @@ -463,7 +491,8 @@ public abstract class ByteBuff implements HBaseReferenceCounted { try { int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); buf.limit(buf.position() + ioSize); - ret = channel.read(buf); + offset += ret; + ret = reader.read(channel, buf, offset); if (ret < ioSize) { break; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 3ce17090397..df0ae8eaadc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -24,7 +24,10 @@ import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.InvalidMarkException; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; +import java.util.Iterator; +import java.util.NoSuchElementException; import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff { private int markedItemIndex = -1; private final int[] itemBeginPos; + private Iterator buffsIterator = new Iterator() { + @Override + public boolean hasNext() { + return curItemIndex < limitedItemIndex || + (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining()); + } + + @Override + public ByteBuffer next() { + if (curItemIndex >= items.length) { + throw new NoSuchElementException("items overflow"); + } + curItem = items[curItemIndex++]; + return curItem; + } + }; + public MultiByteBuff(ByteBuffer... items) { this(NONE, items); } @@ -1064,23 +1084,44 @@ public class MultiByteBuff extends ByteBuff { return output; } - @Override - public int read(ReadableByteChannel channel) throws IOException { + private int internalRead(ReadableByteChannel channel, long offset, + ChannelReader reader) throws IOException { checkRefCount(); int total = 0; - while (true) { - // Read max possible into the current BB - int len = channelRead(channel, this.curItem); - if (len > 0) + while (buffsIterator.hasNext()) { + ByteBuffer buffer = buffsIterator.next(); + int len = read(channel, buffer, offset, reader); + if (len > 0) { total += len; - if (this.curItem.hasRemaining()) { - // We were not able to read enough to fill the current BB itself. Means there is no point in - // doing more reads from Channel. Only this much there for now. + offset += len; + } + if (buffer.hasRemaining()) { break; - } else { - if (this.curItemIndex >= this.limitedItemIndex) break; - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; + } + } + return total; + } + + @Override + public int read(ReadableByteChannel channel) throws IOException { + return internalRead(channel, 0, CHANNEL_READER); + } + + @Override + public int read(FileChannel channel, long offset) throws IOException { + return internalRead(channel, offset, FILE_READER); + } + + @Override + public int write(FileChannel channel, long offset) throws IOException { + checkRefCount(); + int total = 0; + while (buffsIterator.hasNext()) { + ByteBuffer buffer = buffsIterator.next(); + while (buffer.hasRemaining()) { + int len = channel.write(buffer, offset); + total += len; + offset += len; } } return total; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 36a83a0ec21..797bfdc1fff 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; @@ -371,7 +372,25 @@ public class SingleByteBuff extends ByteBuff { @Override public int read(ReadableByteChannel channel) throws IOException { checkRefCount(); - return channelRead(channel, buf); + return read(channel, buf, 0, CHANNEL_READER); + } + + @Override + public int read(FileChannel channel, long offset) throws IOException { + checkRefCount(); + return read(channel, buf, offset, FILE_READER); + } + + @Override + public int write(FileChannel channel, long offset) throws IOException { + checkRefCount(); + int total = 0; + while(buf.hasRemaining()) { + int len = channel.write(buf, offset); + total += len; + offset += len; + } + return total; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 106bf3a706f..ef8c4d16fd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -502,8 +502,11 @@ public class BucketCache implements BlockCache, HeapSize { // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to // the same BucketEntry, then all of the three will share the same refCnt. Cacheable cachedBlock = ioEngine.read(bucketEntry); - // RPC start to reference, so retain here. - cachedBlock.retain(); + if (ioEngine.usesSharedMemory()) { + // If IOEngine use shared memory, cachedBlock and BucketEntry will share the + // same RefCnt, do retain here, in order to count the number of RPC references + cachedBlock.retain(); + } // Update the cache statistics. if (updateCacheMetrics) { cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index ca41ecafb9d..2dd77756e58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted { */ private final RefCnt refCnt; final AtomicBoolean markedAsEvicted; - private final ByteBuffAllocator allocator; + final ByteBuffAllocator allocator; /** * Time this block was cached. Presumes we are created just before we are added to the cache. @@ -194,7 +194,10 @@ class BucketEntry implements HBaseReferenceCounted { } Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException { - ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt); + return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt)); + } + + Cacheable wrapAsCacheable(ByteBuff buf) throws IOException { return this.deserializerReference().deserialize(buf, allocator); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java index 3d7f2b1f3bd..3169a66539a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -35,9 +34,9 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine { @Override public Cacheable read(BucketEntry be) throws IOException { - ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength())); + ByteBuff dst = be.allocator.allocate(be.getLength()); bufferArray.read(be.offset(), dst); dst.position(0).limit(be.getLength()); - return be.wrapAsCacheable(dst.nioByteBuffers()); + return be.wrapAsCacheable(dst); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index b3afe482a02..9e6a75b0dcb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -129,20 +129,25 @@ public class FileIOEngine implements IOEngine { long offset = be.offset(); int length = be.getLength(); Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0."); - ByteBuffer dstBuffer = ByteBuffer.allocate(length); + ByteBuff dstBuff = be.allocator.allocate(length); if (length != 0) { - accessFile(readAccessor, dstBuffer, offset); - // The buffer created out of the fileChannel is formed by copying the data from the file - // Hence in this case there is no shared memory that we point to. Even if the BucketCache - // evicts this buffer from the file the data is already copied and there is no need to - // ensure that the results are not corrupted before consuming them. - if (dstBuffer.limit() != length) { - throw new IllegalArgumentIOException( - "Only " + dstBuffer.limit() + " bytes read, " + length + " expected"); + try { + accessFile(readAccessor, dstBuff, offset); + // The buffer created out of the fileChannel is formed by copying the data from the file + // Hence in this case there is no shared memory that we point to. Even if the BucketCache + // evicts this buffer from the file the data is already copied and there is no need to + // ensure that the results are not corrupted before consuming them. + if (dstBuff.limit() != length) { + throw new IllegalArgumentIOException( + "Only " + dstBuff.limit() + " bytes read, " + length + " expected"); + } + } catch (IOException ioe) { + dstBuff.release(); + throw ioe; } } - dstBuffer.rewind(); - return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer }); + dstBuff.rewind(); + return be.wrapAsCacheable(dstBuff); } @VisibleForTesting @@ -164,10 +169,7 @@ public class FileIOEngine implements IOEngine { */ @Override public void write(ByteBuffer srcBuffer, long offset) throws IOException { - if (!srcBuffer.hasRemaining()) { - return; - } - accessFile(writeAccessor, srcBuffer, offset); + write(ByteBuff.wrap(srcBuffer), offset); } /** @@ -208,28 +210,30 @@ public class FileIOEngine implements IOEngine { } @Override - public void write(ByteBuff srcBuffer, long offset) throws IOException { - ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate(); - write(dup, offset); + public void write(ByteBuff srcBuff, long offset) throws IOException { + if (!srcBuff.hasRemaining()) { + return; + } + accessFile(writeAccessor, srcBuff, offset); } - private void accessFile(FileAccessor accessor, ByteBuffer buffer, + private void accessFile(FileAccessor accessor, ByteBuff buff, long globalOffset) throws IOException { int startFileNum = getFileNum(globalOffset); - int remainingAccessDataLen = buffer.remaining(); + int remainingAccessDataLen = buff.remaining(); int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1); int accessFileNum = startFileNum; long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset); - int bufLimit = buffer.limit(); + int bufLimit = buff.limit(); while (true) { FileChannel fileChannel = fileChannels[accessFileNum]; int accessLen = 0; if (endFileNum > accessFileNum) { // short the limit; - buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); + buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); } try { - accessLen = accessor.access(fileChannel, buffer, accessOffset); + accessLen = accessor.access(fileChannel, buff, accessOffset); } catch (ClosedByInterruptException e) { throw e; } catch (ClosedChannelException e) { @@ -237,7 +241,7 @@ public class FileIOEngine implements IOEngine { continue; } // recover the limit - buffer.limit(bufLimit); + buff.limit(bufLimit); if (accessLen < remainingAccessDataLen) { remainingAccessDataLen -= accessLen; accessFileNum++; @@ -246,7 +250,7 @@ public class FileIOEngine implements IOEngine { break; } if (accessFileNum >= fileChannels.length) { - throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining()) + throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining()) + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" + globalOffset); } @@ -304,23 +308,23 @@ public class FileIOEngine implements IOEngine { } private interface FileAccessor { - int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) + int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException; } private static class FileReadAccessor implements FileAccessor { @Override - public int access(FileChannel fileChannel, ByteBuffer byteBuffer, + public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException { - return fileChannel.read(byteBuffer, accessOffset); + return buff.read(fileChannel, accessOffset); } } private static class FileWriteAccessor implements FileAccessor { @Override - public int access(FileChannel fileChannel, ByteBuffer byteBuffer, + public int access(FileChannel fileChannel, ByteBuff buff, long accessOffset) throws IOException { - return fileChannel.write(byteBuffer, accessOffset); + return buff.write(fileChannel, accessOffset); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java index dd9a1c80bed..fa670399eb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java @@ -29,6 +29,8 @@ import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENT import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -58,9 +60,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({ IOTests.class, SmallTests.class }) public class TestHFileScannerImplReferenceCount { @@ -71,6 +78,15 @@ public class TestHFileScannerImplReferenceCount { @Rule public TestName CASE = new TestName(); + @Parameters(name = "{index}: ioengine={0}") + public static Collection data() { + return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" }, + new Object[] { "mmap" }, new Object[] { "pmem" }); + } + + @Parameter + public String ioengine; + private static final Logger LOG = LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @@ -113,12 +129,16 @@ public class TestHFileScannerImplReferenceCount { @Before public void setUp() throws IOException { + String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"); + this.workDir = UTIL.getDataTestDir(caseName); + if (!"offheap".equals(ioengine)) { + ioengine = ioengine + ":" + workDir.toString() + "/cachedata"; + } + UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine); this.firstCell = null; this.secondCell = null; this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true); this.conf = new Configuration(UTIL.getConfiguration()); - String caseName = CASE.getMethodName(); - this.workDir = UTIL.getDataTestDir(caseName); this.fs = this.workDir.getFileSystem(conf); this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis()); LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName); @@ -202,34 +222,34 @@ public class TestHFileScannerImplReferenceCount { scanner.seekTo(firstCell); curBlock = scanner.curBlock; - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(curBlock, 2); // Seek to the block again, the curBlock won't change and won't read from BlockCache. so // refCnt should be unchanged. scanner.seekTo(firstCell); Assert.assertTrue(curBlock == scanner.curBlock); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(curBlock, 2); prevBlock = curBlock; scanner.seekTo(secondCell); curBlock = scanner.curBlock; - Assert.assertEquals(prevBlock.refCnt(), 2); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 2); + this.assertRefCnt(curBlock, 2); // After shipped, the prevBlock will be release, but curBlock is still referenced by the // curBlock. scanner.shipped(); - Assert.assertEquals(prevBlock.refCnt(), 1); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 1); + this.assertRefCnt(curBlock, 2); // Try to ship again, though with nothing to client. scanner.shipped(); - Assert.assertEquals(prevBlock.refCnt(), 1); - Assert.assertEquals(curBlock.refCnt(), 2); + this.assertRefCnt(prevBlock, 1); + this.assertRefCnt(curBlock, 2); // The curBlock will also be released. scanner.close(); - Assert.assertEquals(curBlock.refCnt(), 1); + this.assertRefCnt(curBlock, 1); // Finish the block & block2 RPC path Assert.assertTrue(block1.release()); @@ -287,7 +307,7 @@ public class TestHFileScannerImplReferenceCount { curBlock = scanner.curBlock; Assert.assertFalse(curBlock == block2); Assert.assertEquals(1, block2.refCnt()); - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); prevBlock = scanner.curBlock; // Release the block1, no other reference. @@ -305,22 +325,22 @@ public class TestHFileScannerImplReferenceCount { // the curBlock is read from IOEngine, so a different block. Assert.assertFalse(curBlock == block1); // Two reference for curBlock: 1. scanner; 2. blockCache. - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); // Reference count of prevBlock must be unchanged because we haven't shipped. - Assert.assertEquals(2, prevBlock.refCnt()); + this.assertRefCnt(prevBlock, 2); // Do the shipped scanner.shipped(); Assert.assertEquals(scanner.prevBlocks.size(), 0); Assert.assertNotNull(scanner.curBlock); - Assert.assertEquals(2, curBlock.refCnt()); - Assert.assertEquals(1, prevBlock.refCnt()); + this.assertRefCnt(curBlock, 2); + this.assertRefCnt(prevBlock, 1); // Do the close scanner.close(); Assert.assertNull(scanner.curBlock); - Assert.assertEquals(1, curBlock.refCnt()); - Assert.assertEquals(1, prevBlock.refCnt()); + this.assertRefCnt(curBlock, 1); + this.assertRefCnt(prevBlock, 1); Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2); Assert.assertEquals(0, curBlock.refCnt()); @@ -340,18 +360,26 @@ public class TestHFileScannerImplReferenceCount { Assert.assertTrue(scanner.seekTo()); curBlock = scanner.curBlock; Assert.assertFalse(curBlock == block1); - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); // Return false because firstCell <= c[0] Assert.assertFalse(scanner.seekBefore(firstCell)); // The block1 shouldn't be released because we still don't do the shipped or close. - Assert.assertEquals(2, curBlock.refCnt()); + this.assertRefCnt(curBlock, 2); scanner.close(); - Assert.assertEquals(1, curBlock.refCnt()); + this.assertRefCnt(curBlock, 1); Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1); Assert.assertEquals(0, curBlock.refCnt()); } + private void assertRefCnt(HFileBlock block, int value) { + if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) { + Assert.assertEquals(value, block.refCnt()); + } else { + Assert.assertEquals(value - 1, block.refCnt()); + } + } + @Test public void testDefault() throws Exception { testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index 2184fa5488d..d1b8f9a87e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Assert; @@ -48,8 +49,8 @@ public class TestByteBufferIOEngine { private static class MockBucketEntry extends BucketEntry { private long off; - MockBucketEntry(long offset, int length) { - super(offset & 0xFF00, length, 0, false); + MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { + super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator); this.off = offset; } @@ -66,7 +67,11 @@ public class TestByteBufferIOEngine { } static BucketEntry createBucketEntry(long offset, int len) { - BucketEntry be = new MockBucketEntry(offset, len); + return createBucketEntry(offset, len, ByteBuffAllocator.HEAP); + } + + static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) { + BucketEntry be = new MockBucketEntry(offset, len, allocator); be.setDeserializerReference(DESERIALIZER); return be; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index 6bd91d0d221..8f86e8309db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -30,8 +31,11 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; @@ -40,6 +44,9 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * Basic test for {@link FileIOEngine} @@ -130,6 +137,31 @@ public class TestFileIOEngine { assertArrayEquals(data1, data2.array()); } + @Test + public void testReadFailedShouldReleaseByteBuff() { + ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class); + final RefCnt refCnt = RefCnt.create(); + Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer() { + @Override + public ByteBuff answer(InvocationOnMock invocation) throws Throwable { + int len = invocation.getArgument(0); + return ByteBuff.wrap(new ByteBuffer[]{ByteBuffer.allocate(len + 1)}, refCnt); + } + }); + int len = 10; + byte[] data1 = new byte[len]; + assertEquals(1, refCnt.refCnt()); + try { + fileIOEngine.write(ByteBuffer.wrap(data1), 0); + BucketEntry be = createBucketEntry(0, len, alloc); + fileIOEngine.read(be); + fail(); + } catch (IOException ioe) { + // expected exception. + } + assertEquals(0, refCnt.refCnt()); + } + @Test public void testClosedChannelException() throws IOException { fileIOEngine.closeFileChannels();