HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Bryan Beaudreault 2022-07-04 16:18:45 -04:00 committed by GitHub
parent 724bf7837a
commit f3f292fad4
7 changed files with 664 additions and 29 deletions
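
Background (editor's summary, grounded in the diff and new tests below): a block whose on-disk (compressed) size falls below the allocator's minSizeForReservoirUse is allocated on heap, but once decompressed it can exceed that threshold and be backed by pooled, off-heap buffers. Before this fix, HFileBlock#unpack cloned the packed block's isSharedMem() value onto the unpacked block, so the release path for the pooled buffers could be skipped and they were never returned to the pool. A rough sketch of the failure mode, with hypothetical sizes, mirroring the same-package constructor usage in the tests below:

import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

// Hypothetical pool: 10 direct 1024-byte buffers, reservoir used only for
// requests of at least 128 bytes (bufSize / 8).
ByteBuffAllocator alloc = new ByteBuffAllocator(true, 10, 1024, 128);

// Compressed block is small: served from heap, nothing to recycle.
ByteBuff packed = alloc.allocate(100);

// Decompressed block is larger: backed by pooled buffers, which MUST be
// released back to the pool when the block's refcount drops to zero.
ByteBuff unpacked = alloc.allocate(1024);

// Pre-fix, the unpacked HFileBlock inherited isSharedMem() == false from the
// packed block, the release was skipped, and the pooled buffers leaked.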

ByteBuffAllocator.java

@@ -319,11 +319,20 @@ public class ByteBuffAllocator {
       // just allocate the ByteBuffer from on-heap.
       bbs.add(allocateOnHeap(remain));
     }
-    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
-      for (int i = 0; i < lenFromReservoir; i++) {
-        this.putbackBuffer(bbs.get(i));
-      }
-    });
+    ByteBuff bb;
+    // we only need a recycler if we successfully pulled from the pool
+    // this matters for determining whether to add leak detection in RefCnt
+    if (lenFromReservoir == 0) {
+      bb = ByteBuff.wrap(bbs);
+    } else {
+      bb = ByteBuff.wrap(bbs, () -> {
+        for (int i = 0; i < lenFromReservoir; i++) {
+          this.putbackBuffer(bbs.get(i));
+        }
+      });
+    }
     bb.limit(size);
     return bb;
   }
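
A side effect worth noting: purely on-heap allocations no longer carry a recycler and (per RefCnt below) are therefore not leak-tracked. A minimal usage sketch under the same hypothetical sizes as above:

ByteBuffAllocator alloc = new ByteBuffAllocator(true, 10, 1024, 128);

// Below minSizeForReservoirUse: lenFromReservoir == 0, wrapped without a recycler.
ByteBuff small = alloc.allocate(127);

// At or above the threshold: pooled buffers back the ByteBuff, and the recycler
// lambda returns each of them via putbackBuffer() on the final release().
ByteBuff pooled = alloc.allocate(2048);
pooled.release(); // refcount hits zero -> recycler runs -> buffers return to the pool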

ByteBuff.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -547,6 +548,28 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
     return wrap(buffer, RefCnt.create());
   }
 
+  /**
+   * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
+   * potential buffer leaks. We pass the buffer itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public ByteBuff touch() {
+    return touch(this);
+  }
+
+  @Override
+  public ByteBuff touch(Object hint) {
+    refCnt.touch(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public RefCnt getRefCnt() {
+    return refCnt;
+  }
+
   /**
    * Make this private because we don't want to expose the refCnt related wrap method to upstream.
    */
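
These hooks feed the Netty-style leak tracker attached to the buffer's RefCnt (see RefCnt.java below). A sketch of how a call site might use them; the hint string is illustrative, not from this commit:

ByteBuff buff = allocator.allocate(1024);
buff.touch();                          // records the buffer itself as the hint
buff.touch("handed off to responder"); // or record a caller-supplied hint
// If the buffer is later garbage-collected without release(), the leak report
// printed by ResourceLeakDetector includes these recorded touch points.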

RefCnt.java

@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
/**
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
@@ -31,7 +35,10 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
 @InterfaceAudience.Private
 public class RefCnt extends AbstractReferenceCounted {
 
-  private Recycler recycler = ByteBuffAllocator.NONE;
+  private static final ResourceLeakDetector<RefCnt> detector =
+    ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
+  private final Recycler recycler;
+  private final ResourceLeakTracker<RefCnt> leak;
/**
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
@@ -49,15 +56,66 @@ public class RefCnt extends AbstractReferenceCounted {
 
   public RefCnt(Recycler recycler) {
     this.recycler = recycler;
+    this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
+  }
+
+  @Override
+  public ReferenceCounted retain() {
+    maybeRecord();
+    return super.retain();
+  }
+
+  @Override
+  public ReferenceCounted retain(int increment) {
+    maybeRecord();
+    return super.retain(increment);
+  }
+
+  @Override
+  public boolean release() {
+    maybeRecord();
+    return super.release();
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    maybeRecord();
+    return super.release(decrement);
   }
 
   @Override
   protected final void deallocate() {
     this.recycler.free();
+    if (leak != null) {
+      this.leak.close(this);
+    }
   }
 
+  @Override
+  public RefCnt touch() {
+    maybeRecord();
+    return this;
+  }
+
   @Override
-  public final ReferenceCounted touch(Object hint) {
-    throw new UnsupportedOperationException();
+  public RefCnt touch(Object hint) {
+    maybeRecord(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public Recycler getRecycler() {
+    return recycler;
+  }
+
+  private void maybeRecord() {
+    maybeRecord(null);
+  }
+
+  private void maybeRecord(Object hint) {
+    if (leak != null) {
+      leak.record(hint);
+    }
+  }
 }
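
RefCnt now follows the standard Netty ResourceLeakDetector protocol: track() at construction, record() at each lifecycle event, close() on clean deallocation. A condensed, self-contained sketch of that protocol, written against plain Netty rather than the shaded hbase-thirdparty classes:

import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetectorFactory;
import io.netty.util.ResourceLeakTracker;

final class PooledResource {
  private static final ResourceLeakDetector<PooledResource> DETECTOR =
    ResourceLeakDetectorFactory.instance().newResourceLeakDetector(PooledResource.class);

  // track() may return null: at the default SIMPLE level only a sample of
  // allocations is tracked (PARANOID tracks every one).
  private final ResourceLeakTracker<PooledResource> leak = DETECTOR.track(this);

  void use(Object hint) {
    if (leak != null) {
      leak.record(hint); // remembered and printed in an eventual leak report
    }
  }

  void close() {
    if (leak != null) {
      leak.close(this); // clean shutdown: suppresses the leak report
    }
  }
}

If a tracked object is garbage-collected without close(), the detector logs a "LEAK: ..." error the next time it samples an allocation, which is exactly what TestByteBuffAllocatorLeakDetection below asserts on.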

TestByteBuffAllocator.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
 
+  @Test
+  public void testRecycleOnlyPooledBuffers() {
+    int maxBuffersInPool = 10;
+    int bufSize = 1024;
+    int minSize = bufSize / 8;
+
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+    ByteBuff buff = alloc.allocate(minSize - 1);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+
+    alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
+    buff = alloc.allocate(minSize * 2);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+  }
+
   @Test
   public void testAllocateByteBuffToReadInto() {
     int maxBuffersInPool = 10;
@@ -329,8 +345,6 @@ public class TestByteBuffAllocator {
     ByteBuff buf = alloc.allocate(bufSize);
     assertException(() -> buf.retain(2));
     assertException(() -> buf.release(2));
-    assertException(() -> buf.touch());
-    assertException(() -> buf.touch(new Object()));
   }
 
private void assertException(Runnable r) {

TestByteBuffAllocatorLeakDetection.java

@@ -0,0 +1,341 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
@Category({ RPCTests.class, SmallTests.class })
public class TestByteBuffAllocatorLeakDetection {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
@SuppressWarnings("unused")
@Test
public void testLeakDetection() throws InterruptedException {
InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
AtomicInteger leaksDetected = new AtomicInteger();
InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
assertTrue(ResourceLeakDetector.isEnabled());
try {
int maxBuffersInPool = 10;
int bufSize = 1024;
int minSize = bufSize / 8;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
// tracking leaks happens on creation of a RefCnt, through a call to detector.track().
// If a leak occurs, but detector.track() is never called again, the leak will not be
// realized. Further, causing a leak requires a GC event. So below we do some allocations,
// cause some GC's, do more allocations, and then expect a leak to show up.
// first allocate on-heap. we expect to not see a leak from this, because we
// dont count on-heap references
alloc.allocate(minSize - 1);
System.gc();
Thread.sleep(1000);
// cause an allocation to trigger a leak detection, if there were one.
// keep a reference so we don't trigger a leak right away from this.
ByteBuff reference = alloc.allocate(minSize * 2);
assertEquals(0, leaksDetected.get());
// allocate, but don't keep a reference. this should cause a leak
alloc.allocate(minSize * 2);
System.gc();
Thread.sleep(1000);
// allocate again, this should cause the above leak to be detected
alloc.allocate(minSize * 2);
assertEquals(1, leaksDetected.get());
} finally {
InternalLoggerFactory.setDefaultFactory(original);
}
}
private static class MockedLoggerFactory extends Slf4JLoggerFactory {
private AtomicInteger leaksDetected;
public MockedLoggerFactory(AtomicInteger leaksDetected) {
this.leaksDetected = leaksDetected;
}
@Override
public InternalLogger newInstance(String name) {
InternalLogger delegate = super.newInstance(name);
return new MockedLogger(leaksDetected, delegate);
}
}
private static class MockedLogger implements InternalLogger {
private AtomicInteger leaksDetected;
private InternalLogger delegate;
public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
this.leaksDetected = leaksDetected;
this.delegate = delegate;
}
private void maybeCountLeak(String msgOrFormat) {
if (msgOrFormat.startsWith("LEAK")) {
leaksDetected.incrementAndGet();
}
}
@Override
public void error(String msg) {
maybeCountLeak(msg);
delegate.error(msg);
}
@Override
public void error(String format, Object arg) {
maybeCountLeak(format);
delegate.error(format, arg);
}
@Override
public void error(String format, Object argA, Object argB) {
maybeCountLeak(format);
delegate.error(format, argA, argB);
}
@Override
public void error(String format, Object... arguments) {
maybeCountLeak(format);
delegate.error(format, arguments);
}
@Override
public void error(String msg, Throwable t) {
maybeCountLeak(msg);
delegate.error(msg, t);
}
@Override
public void error(Throwable t) {
delegate.error(t);
}
@Override
public String name() {
return delegate.name();
}
@Override
public boolean isTraceEnabled() {
return delegate.isTraceEnabled();
}
@Override
public void trace(String msg) {
delegate.trace(msg);
}
@Override
public void trace(String format, Object arg) {
delegate.trace(format, arg);
}
@Override
public void trace(String format, Object argA, Object argB) {
delegate.trace(format, argA, argB);
}
@Override
public void trace(String format, Object... arguments) {
delegate.trace(format, arguments);
}
@Override
public void trace(String msg, Throwable t) {
delegate.trace(msg, t);
}
@Override
public void trace(Throwable t) {
delegate.trace(t);
}
@Override
public boolean isDebugEnabled() {
return delegate.isDebugEnabled();
}
@Override
public void debug(String msg) {
delegate.debug(msg);
}
@Override
public void debug(String format, Object arg) {
delegate.debug(format, arg);
}
@Override
public void debug(String format, Object argA, Object argB) {
delegate.debug(format, argA, argB);
}
@Override
public void debug(String format, Object... arguments) {
delegate.debug(format, arguments);
}
@Override
public void debug(String msg, Throwable t) {
delegate.debug(msg, t);
}
@Override
public void debug(Throwable t) {
delegate.debug(t);
}
@Override
public boolean isInfoEnabled() {
return delegate.isInfoEnabled();
}
@Override
public void info(String msg) {
delegate.info(msg);
}
@Override
public void info(String format, Object arg) {
delegate.info(format, arg);
}
@Override
public void info(String format, Object argA, Object argB) {
delegate.info(format, argA, argB);
}
@Override
public void info(String format, Object... arguments) {
delegate.info(format, arguments);
}
@Override
public void info(String msg, Throwable t) {
delegate.info(msg, t);
}
@Override
public void info(Throwable t) {
delegate.info(t);
}
@Override
public boolean isWarnEnabled() {
return delegate.isWarnEnabled();
}
@Override
public void warn(String msg) {
delegate.warn(msg);
}
@Override
public void warn(String format, Object arg) {
delegate.warn(format, arg);
}
@Override
public void warn(String format, Object... arguments) {
delegate.warn(format, arguments);
}
@Override
public void warn(String format, Object argA, Object argB) {
delegate.warn(format, argA, argB);
}
@Override
public void warn(String msg, Throwable t) {
delegate.warn(msg, t);
}
@Override
public void warn(Throwable t) {
delegate.warn(t);
}
@Override
public boolean isErrorEnabled() {
return delegate.isErrorEnabled();
}
@Override
public boolean isEnabled(InternalLogLevel level) {
return delegate.isEnabled(level);
}
@Override
public void log(InternalLogLevel level, String msg) {
delegate.log(level, msg);
}
@Override
public void log(InternalLogLevel level, String format, Object arg) {
delegate.log(level, format, arg);
}
@Override
public void log(InternalLogLevel level, String format, Object argA, Object argB) {
delegate.log(level, format, argA, argB);
}
@Override
public void log(InternalLogLevel level, String format, Object... arguments) {
delegate.log(level, format, arguments);
}
@Override
public void log(InternalLogLevel level, String msg, Throwable t) {
delegate.log(level, msg, t);
}
@Override
public void log(InternalLogLevel level, Throwable t) {
delegate.log(level, t);
}
}
}

HFileBlock.java

@@ -418,6 +418,22 @@ public class HFileBlock implements Cacheable {
     return buf.release();
   }
 
+  /**
+   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
+   * potential buffer leaks. We pass the block itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public HFileBlock touch() {
+    return touch(this);
+  }
+
+  @Override
+  public HFileBlock touch(Object hint) {
+    buf.touch(hint);
+    return this;
+  }
+
   /** @return get data block encoding id that was used to encode this block */
   short getDataBlockEncodingId() {
     if (blockType != BlockType.ENCODED_DATA) {
@@ -616,8 +632,9 @@ public class HFileBlock implements Cacheable {
       return this;
     }
 
-    HFileBlock unpacked = shallowClone(this);
-    unpacked.allocateBuffer(); // allocates space for the decompressed block
+    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
+    HFileBlock unpacked = shallowClone(this, newBuf);
+
     boolean succ = false;
     try {
       HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
@@ -643,20 +660,21 @@ public class HFileBlock implements Cacheable {
    * Always allocates a new buffer of the correct size. Copies header bytes from the existing
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
-  private void allocateBuffer() {
+  private ByteBuff allocateBufferForUnpacking() {
     int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
 
+    ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    buf.position(0);
-    newBuf.put(0, buf, 0, headerSize);
+    source.position(0);
+    newBuf.put(0, source, 0, headerSize);
 
-    buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(capacityNeeded);
+    newBuf.limit(capacityNeeded);
+    return newBuf;
   }
/**
@@ -708,11 +726,6 @@ public class HFileBlock implements Cacheable {
    * by default.
    */
   public boolean isSharedMem() {
-    if (this instanceof SharedMemHFileBlock) {
-      return true;
-    } else if (this instanceof ExclusiveMemHFileBlock) {
-      return false;
-    }
     return true;
   }
@@ -1997,23 +2010,31 @@ public class HFileBlock implements Cacheable {
       + onDiskDataSizeWithHeader;
   }
 
-  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+  /**
+   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
+   * loaded with all of the original fields from blk, except now using the newBuff and setting
+   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
+   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
+   * {@link SharedMemHFileBlock}. Or vice versa.
+   * @param blk the block to clone from
+   * @param newBuff the new buffer to use
+   */
+  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
     return new HFileBlockBuilder().withBlockType(blk.blockType)
       .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
       .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
-      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
-                                                                                  // buffer.
-      .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
+      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
-      .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
+      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
   }
 
-  static HFileBlock shallowClone(HFileBlock blk) {
-    return createBuilder(blk).build();
+  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
+    return createBuilder(blk, newBuf).build();
   }
 
   static HFileBlock deepCloneOnHeap(HFileBlock blk) {
     ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
-    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
+    return createBuilder(blk, deepCloned).build();
   }
 }
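
The pivotal line is withShared(!newBuff.hasArray()): the clone's shared-ness is now derived from where the new buffer actually lives rather than inherited from the source block. A minimal illustration of that predicate with plain NIO buffers (an assumption here: reservoir buffers are direct, heap fallbacks are array-backed):

import java.nio.ByteBuffer;

public class HasArrayDemo {
  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(1024);         // array-backed, on-heap
    ByteBuffer direct = ByteBuffer.allocateDirect(1024); // off-heap

    System.out.println(heap.hasArray());   // true  -> exclusive memory block
    System.out.println(direct.hasArray()); // false -> shared memory block, must be released
  }
}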

TestHFileBlockUnpack.java

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ IOTests.class, MediumTests.class })
public class TestHFileBlockUnpack {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
// repetition gives us some chance to get a good compression ratio
private static float CHANCE_TO_REPEAT = 0.6f;
private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
ByteBuffAllocator allocator;
@Rule
public TestName name = new TestName();
private FileSystem fs;
@Before
public void setUp() throws Exception {
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
allocator = ByteBuffAllocator.create(conf, true);
}
/**
* If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
* block will be allocated to heap regardless of desire for off-heap. After de-compressing the
* block, the new size may now exceed the min allocation size. This test ensures that those
* de-compressed blocks, which will be allocated off-heap, are properly marked as
* {@link HFileBlock#isSharedMem()} == true See https://issues.apache.org/jira/browse/HBASE-27170
*/
@Test
public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
HFileContext meta =
new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
.withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
int totalSize;
try (FSDataOutputStream os = fs.create(path)) {
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
hbw.writeHeaderAndData(os);
totalSize = hbw.getOnDiskSizeWithHeader();
assertTrue(
"expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
totalSize < MIN_ALLOCATION_SIZE);
}
try (FSDataInputStream is = fs.open(path)) {
meta =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
.withIncludesMvcc(false).withIncludesTags(false).build();
ReaderContext context =
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
hbr.setIncludesMemStoreTS(false);
HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
blockFromHFile.sanityCheck();
assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
assertTrue(
"expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
+ " to be less than " + MIN_ALLOCATION_SIZE,
blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
assertTrue(
"expected generated block uncompressed size "
+ blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
+ MIN_ALLOCATION_SIZE,
blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
}
}
static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
Random random = new Random(42);
byte[] family = new byte[] { 1 };
int rowKey = 0;
int qualifier = 0;
int value = 0;
long timestamp = 0;
int totalSize = 0;
// go until just up to the limit. compression should bring the total on-disk size under
while (totalSize < desiredSize) {
rowKey = maybeIncrement(random, rowKey);
qualifier = maybeIncrement(random, qualifier);
value = maybeIncrement(random, value);
timestamp = maybeIncrement(random, (int) timestamp);
KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
timestamp, Bytes.toBytes(value));
hbw.write(keyValue);
totalSize += keyValue.getLength();
}
return totalSize;
}
private static int maybeIncrement(Random random, int value) {
if (random.nextFloat() < CHANCE_TO_REPEAT) {
return value;
}
return value + 1;
}
}