diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index ecef63662f6..60b89223c16 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -319,11 +319,20 @@ public class ByteBuffAllocator {
       // just allocate the ByteBuffer from on-heap.
       bbs.add(allocateOnHeap(remain));
     }
-    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
-      for (int i = 0; i < lenFromReservoir; i++) {
-        this.putbackBuffer(bbs.get(i));
-      }
-    });
+
+    ByteBuff bb;
+    // we only need a recycler if we successfully pulled from the pool
+    // this matters for determining whether to add leak detection in RefCnt
+    if (lenFromReservoir == 0) {
+      bb = ByteBuff.wrap(bbs);
+    } else {
+      bb = ByteBuff.wrap(bbs, () -> {
+        for (int i = 0; i < lenFromReservoir; i++) {
+          this.putbackBuffer(bbs.get(i));
+        }
+      });
+    }
+
     bb.limit(size);
     return bb;
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 9e2ccc33131..2925fab161b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -547,6 +548,28 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
     return wrap(buffer, RefCnt.create());
   }
 
+  /**
+   * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
+   * potential buffer leaks. We pass the buffer itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public ByteBuff touch() {
+    return touch(this);
+  }
+
+  @Override
+  public ByteBuff touch(Object hint) {
+    refCnt.touch(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public RefCnt getRefCnt() {
+    return refCnt;
+  }
+
   /**
    * Make this private because we don't want to expose the refCnt related wrap method to upstream.
    */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
index c7b6dbf7086..7c1f23383d3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
 
 /**
  * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
@@ -31,7 +35,10 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
 @InterfaceAudience.Private
 public class RefCnt extends AbstractReferenceCounted {
 
-  private Recycler recycler = ByteBuffAllocator.NONE;
+  private static final ResourceLeakDetector<RefCnt> detector =
+    ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
+  private final Recycler recycler;
+  private final ResourceLeakTracker<RefCnt> leak;
 
   /**
    * Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
@@ -49,15 +56,66 @@ public class RefCnt extends AbstractReferenceCounted {
 
   public RefCnt(Recycler recycler) {
     this.recycler = recycler;
+    this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
+  }
+
+  @Override
+  public ReferenceCounted retain() {
+    maybeRecord();
+    return super.retain();
+  }
+
+  @Override
+  public ReferenceCounted retain(int increment) {
+    maybeRecord();
+    return super.retain(increment);
+  }
+
+  @Override
+  public boolean release() {
+    maybeRecord();
+    return super.release();
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    maybeRecord();
+    return super.release(decrement);
   }
 
   @Override
   protected final void deallocate() {
     this.recycler.free();
+    if (leak != null) {
+      this.leak.close(this);
+    }
+  }
+
+  @Override
+  public RefCnt touch() {
+    maybeRecord();
+    return this;
   }
 
   @Override
   public final ReferenceCounted touch(Object hint) {
-    throw new UnsupportedOperationException();
+    maybeRecord(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public Recycler getRecycler() {
+    return recycler;
+  }
+
+  private void maybeRecord() {
+    maybeRecord(null);
+  }
+
+  private void maybeRecord(Object hint) {
+    if (leak != null) {
+      leak.record(hint);
+    }
   }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
index 7cfdcd659d1..d77dc6604be 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
 
+  @Test
+  public void testRecycleOnlyPooledBuffers() {
+    int maxBuffersInPool = 10;
+    int bufSize = 1024;
+    int minSize = bufSize / 8;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+    ByteBuff buff = alloc.allocate(minSize - 1);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+
+    alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
+    buff = alloc.allocate(minSize * 2);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+  }
+
   @Test
   public void testAllocateByteBuffToReadInto() {
     int maxBuffersInPool = 10;
@@ -329,8 +345,6 @@
     ByteBuff buf = alloc.allocate(bufSize);
     assertException(() -> buf.retain(2));
     assertException(() -> buf.release(2));
-    assertException(() -> buf.touch());
-    assertException(() -> buf.touch(new Object()));
   }
 
   private void assertException(Runnable r) {
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
new file mode 100644
index 00000000000..ffc0292902e
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocatorLeakDetection {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
+
+  @SuppressWarnings("unused")
+  @Test
+  public void testLeakDetection() throws InterruptedException {
+    InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
+    AtomicInteger leaksDetected = new AtomicInteger();
+    InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
+
+    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+    assertTrue(ResourceLeakDetector.isEnabled());
+
+    try {
+      int maxBuffersInPool = 10;
+      int bufSize = 1024;
+      int minSize = bufSize / 8;
+      ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+      // tracking leaks happens on creation of a RefCnt, through a call to detector.track().
+      // If a leak occurs, but detector.track() is never called again, the leak will not be
+      // realized. Further, causing a leak requires a GC event. So below we do some allocations,
+      // cause some GCs, do more allocations, and then expect a leak to show up.
+
+      // first allocate on-heap. we expect to not see a leak from this, because we
+      // don't count on-heap references
+      alloc.allocate(minSize - 1);
+      System.gc();
+      Thread.sleep(1000);
+
+      // cause an allocation to trigger a leak detection, if there were one.
+      // keep a reference so we don't trigger a leak right away from this.
+      ByteBuff reference = alloc.allocate(minSize * 2);
+      assertEquals(0, leaksDetected.get());
+
+      // allocate, but don't keep a reference. this should cause a leak
+      alloc.allocate(minSize * 2);
+      System.gc();
+      Thread.sleep(1000);
+
+      // allocate again, this should cause the above leak to be detected
+      alloc.allocate(minSize * 2);
+      assertEquals(1, leaksDetected.get());
+    } finally {
+      InternalLoggerFactory.setDefaultFactory(original);
+    }
+  }
+
+  private static class MockedLoggerFactory extends Slf4JLoggerFactory {
+
+    private AtomicInteger leaksDetected;
+
+    public MockedLoggerFactory(AtomicInteger leaksDetected) {
+      this.leaksDetected = leaksDetected;
+    }
+
+    @Override
+    public InternalLogger newInstance(String name) {
+      InternalLogger delegate = super.newInstance(name);
+      return new MockedLogger(leaksDetected, delegate);
+    }
+  }
+
+  private static class MockedLogger implements InternalLogger {
+
+    private AtomicInteger leaksDetected;
+    private InternalLogger delegate;
+
+    public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
+      this.leaksDetected = leaksDetected;
+      this.delegate = delegate;
+    }
+
+    private void maybeCountLeak(String msgOrFormat) {
+      if (msgOrFormat.startsWith("LEAK")) {
+        leaksDetected.incrementAndGet();
+      }
+    }
+
+    @Override
+    public void error(String msg) {
+      maybeCountLeak(msg);
+      delegate.error(msg);
+    }
+
+    @Override
+    public void error(String format, Object arg) {
+      maybeCountLeak(format);
+      delegate.error(format, arg);
+    }
+
+    @Override
+    public void error(String format, Object argA, Object argB) {
+      maybeCountLeak(format);
+      delegate.error(format, argA, argB);
+    }
+
+    @Override
+    public void error(String format, Object... arguments) {
+      maybeCountLeak(format);
+      delegate.error(format, arguments);
+    }
+
+    @Override
+    public void error(String msg, Throwable t) {
+      maybeCountLeak(msg);
+      delegate.error(msg, t);
+    }
+
+    @Override
+    public void error(Throwable t) {
+      delegate.error(t);
+    }
+
+    @Override
+    public String name() {
+      return delegate.name();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+      return delegate.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(String msg) {
+      delegate.trace(msg);
+    }
+
+    @Override
+    public void trace(String format, Object arg) {
+      delegate.trace(format, arg);
+    }
+
+    @Override
+    public void trace(String format, Object argA, Object argB) {
+      delegate.trace(format, argA, argB);
+    }
+
+    @Override
+    public void trace(String format, Object... arguments) {
+      delegate.trace(format, arguments);
+    }
+
+    @Override
+    public void trace(String msg, Throwable t) {
+      delegate.trace(msg, t);
+    }
+
+    @Override
+    public void trace(Throwable t) {
+      delegate.trace(t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+      return delegate.isDebugEnabled();
+    }
+
+    @Override
+    public void debug(String msg) {
+      delegate.debug(msg);
+    }
+
+    @Override
+    public void debug(String format, Object arg) {
+      delegate.debug(format, arg);
+    }
+
+    @Override
+    public void debug(String format, Object argA, Object argB) {
+      delegate.debug(format, argA, argB);
+    }
+
+    @Override
+    public void debug(String format, Object... arguments) {
+      delegate.debug(format, arguments);
+    }
+
+    @Override
+    public void debug(String msg, Throwable t) {
+      delegate.debug(msg, t);
+    }
+
+    @Override
+    public void debug(Throwable t) {
+      delegate.debug(t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+      return delegate.isInfoEnabled();
+    }
+
+    @Override
+    public void info(String msg) {
+      delegate.info(msg);
+    }
+
+    @Override
+    public void info(String format, Object arg) {
+      delegate.info(format, arg);
+    }
+
+    @Override
+    public void info(String format, Object argA, Object argB) {
+      delegate.info(format, argA, argB);
+    }
+
+    @Override
+    public void info(String format, Object... arguments) {
+      delegate.info(format, arguments);
+    }
+
+    @Override
+    public void info(String msg, Throwable t) {
+      delegate.info(msg, t);
+    }
+
+    @Override
+    public void info(Throwable t) {
+      delegate.info(t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+      return delegate.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(String msg) {
+      delegate.warn(msg);
+    }
+
+    @Override
+    public void warn(String format, Object arg) {
+      delegate.warn(format, arg);
+    }
+
+    @Override
+    public void warn(String format, Object... arguments) {
+      delegate.warn(format, arguments);
+    }
+
+    @Override
+    public void warn(String format, Object argA, Object argB) {
+      delegate.warn(format, argA, argB);
+    }
+
+    @Override
+    public void warn(String msg, Throwable t) {
+      delegate.warn(msg, t);
+    }
+
+    @Override
+    public void warn(Throwable t) {
+      delegate.warn(t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+      return delegate.isErrorEnabled();
+    }
+
+    @Override
+    public boolean isEnabled(InternalLogLevel level) {
+      return delegate.isEnabled(level);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String msg) {
+      delegate.log(level, msg);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object arg) {
+      delegate.log(level, format, arg);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object argA, Object argB) {
+      delegate.log(level, format, argA, argB);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object... arguments) {
+      delegate.log(level, format, arguments);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String msg, Throwable t) {
+      delegate.log(level, msg, t);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, Throwable t) {
+      delegate.log(level, t);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index d054e83ac19..0da331897dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -418,6 +418,22 @@ public class HFileBlock implements Cacheable {
     return buf.release();
   }
 
+  /**
+   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
+   * potential buffer leaks. We pass the block itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public HFileBlock touch() {
+    return touch(this);
+  }
+
+  @Override
+  public HFileBlock touch(Object hint) {
+    buf.touch(hint);
+    return this;
+  }
+
   /** @return get data block encoding id that was used to encode this block */
   short getDataBlockEncodingId() {
     if (blockType != BlockType.ENCODED_DATA) {
@@ -616,8 +632,9 @@
       return this;
     }
 
-    HFileBlock unpacked = shallowClone(this);
-    unpacked.allocateBuffer(); // allocates space for the decompressed block
+    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
+    HFileBlock unpacked = shallowClone(this, newBuf);
+    boolean succ = false;
     try {
       HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
@@ -643,20 +660,21 @@
    * Always allocates a new buffer of the correct size. Copies header bytes from the existing
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
-  private void allocateBuffer() {
+  private ByteBuff allocateBufferForUnpacking() {
     int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
 
+    ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    buf.position(0);
-    newBuf.put(0, buf, 0, headerSize);
+    source.position(0);
+    newBuf.put(0, source, 0, headerSize);
 
-    buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(capacityNeeded);
+    newBuf.limit(capacityNeeded);
+    return newBuf;
   }
 
   /**
@@ -708,11 +726,6 @@
    * by default.
    */
   public boolean isSharedMem() {
-    if (this instanceof SharedMemHFileBlock) {
-      return true;
-    } else if (this instanceof ExclusiveMemHFileBlock) {
-      return false;
-    }
     return true;
   }
 
@@ -1997,23 +2010,31 @@
       + onDiskDataSizeWithHeader;
   }
 
-  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+  /**
+   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
+   * loaded with all of the original fields from blk, except now using the newBuff and setting
+   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
+   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
+   * {@link SharedMemHFileBlock}. Or vice versa.
+   * @param blk the block to clone from
+   * @param newBuff the new buffer to use
+   */
+  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
     return new HFileBlockBuilder().withBlockType(blk.blockType)
       .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
       .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
-      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
-                                                                                  // buffer.
-      .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
+      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
       .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
-      .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
+      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
   }
 
-  static HFileBlock shallowClone(HFileBlock blk) {
-    return createBuilder(blk).build();
+  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
+    return createBuilder(blk, newBuf).build();
   }
 
   static HFileBlock deepCloneOnHeap(HFileBlock blk) {
     ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
-    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
+    return createBuilder(blk, deepCloned).build();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
new file mode 100644
index 00000000000..a8399fa6b5a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestHFileBlockUnpack {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  // repetition gives us some chance to get a good compression ratio
+  private static float CHANCE_TO_REPEAT = 0.6f;
+
+  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
+
+  ByteBuffAllocator allocator;
+
+  @Rule
+  public TestName name = new TestName();
+  private FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
+    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
+    allocator = ByteBuffAllocator.create(conf, true);
+
+  }
+
+  /**
+   * If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
+   * block will be allocated to heap regardless of desire for off-heap. After de-compressing the
+   * block, the new size may now exceed the min allocation size. This test ensures that those
+   * de-compressed blocks, which will be allocated off-heap, are properly marked as
+   * {@link HFileBlock#isSharedMem()} == true. See https://issues.apache.org/jira/browse/HBASE-27170
+   */
+  @Test
+  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HFileContext meta =
+      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
+        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize;
+    try (FSDataOutputStream os = fs.create(path)) {
+      HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
+      hbw.startWriting(BlockType.DATA);
+      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
+      hbw.writeHeaderAndData(os);
+      totalSize = hbw.getOnDiskSizeWithHeader();
+      assertTrue(
+        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
+        totalSize < MIN_ALLOCATION_SIZE);
+    }
+
+    try (FSDataInputStream is = fs.open(path)) {
+      meta =
+        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+          .withIncludesMvcc(false).withIncludesTags(false).build();
+      ReaderContext context =
+        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
+      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
+      hbr.setIncludesMemStoreTS(false);
+      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+      blockFromHFile.sanityCheck();
+      assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
+      assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
+
+      assertTrue(
+        "expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
+          + " to be less than " + MIN_ALLOCATION_SIZE,
+        blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+      assertTrue(
+        "expected generated block uncompressed size "
+          + blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
+          + MIN_ALLOCATION_SIZE,
+        blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+      HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
+      assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
+      assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
+    }
+  }
+
+  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
+    Random random = new Random(42);
+
+    byte[] family = new byte[] { 1 };
+    int rowKey = 0;
+    int qualifier = 0;
+    int value = 0;
+    long timestamp = 0;
+
+    int totalSize = 0;
+
+    // go until just up to the limit. compression should bring the total on-disk size under it
+    while (totalSize < desiredSize) {
+      rowKey = maybeIncrement(random, rowKey);
+      qualifier = maybeIncrement(random, qualifier);
+      value = maybeIncrement(random, value);
+      timestamp = maybeIncrement(random, (int) timestamp);
+
+      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
+        timestamp, Bytes.toBytes(value));
+      hbw.write(keyValue);
+      totalSize += keyValue.getLength();
+    }
+
+    return totalSize;
+  }
+
+  private static int maybeIncrement(Random random, int value) {
+    if (random.nextFloat() < CHANCE_TO_REPEAT) {
+      return value;
+    }
+    return value + 1;
+  }
+
+}
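
Reviewer note (not part of the patch): the sketch below is a hypothetical illustration of how a caller might use the new ByteBuff.touch(Object) hook together with the leak detector introduced in RefCnt. Only ByteBuffAllocator.allocate, ByteBuff.touch, ByteBuff.put and ByteBuff.release come from the code above; the class and method names are invented for the example. When the shaded Netty ResourceLeakDetector is enabled at a sampling level (as the new test does via ResourceLeakDetector.setLevel), a pooled buffer that is garbage collected without release() should be reported as a "LEAK", and any hints recorded through touch() appear in that report.

// Hypothetical caller-side sketch; names below are illustrative only.
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class TouchHintExample {
  private final ByteBuffAllocator allocator;

  public TouchHintExample(ByteBuffAllocator allocator) {
    this.allocator = allocator;
  }

  public void copyPayload(byte[] payload) {
    // Allocation creates the RefCnt; if the buffer came from the pool, a leak tracker is attached.
    ByteBuff buff = allocator.allocate(payload.length);
    // Record a hint; if this buffer is later GC'd without release(), the hint shows up in the LEAK log.
    buff.touch("copyPayload: " + payload.length + " bytes");
    try {
      buff.put(payload); // hand the buffer to whatever consumes it
    } finally {
      buff.release(); // returning pooled buffers is exactly what the leak detector verifies
    }
  }
}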