HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
fec317560f
commit
e3963458b1
|
@ -323,11 +323,20 @@ public class ByteBuffAllocator {
|
|||
// just allocate the ByteBuffer from on-heap.
|
||||
bbs.add(allocateOnHeap(remain));
|
||||
}
|
||||
ByteBuff bb = ByteBuff.wrap(bbs, () -> {
|
||||
|
||||
ByteBuff bb;
|
||||
// we only need a recycler if we successfully pulled from the pool
|
||||
// this matters for determining whether to add leak detection in RefCnt
|
||||
if (lenFromReservoir == 0) {
|
||||
bb = ByteBuff.wrap(bbs);
|
||||
} else {
|
||||
bb = ByteBuff.wrap(bbs, () -> {
|
||||
for (int i = 0; i < lenFromReservoir; i++) {
|
||||
this.putbackBuffer(bbs.get(i));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bb.limit(size);
|
||||
return bb;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.nio;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
@ -547,6 +548,28 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
|
|||
return wrap(buffer, RefCnt.create());
|
||||
}
|
||||
|
||||
/**
|
||||
* Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
|
||||
* potential buffer leaks. We pass the buffer itself as a default hint, but one can use
|
||||
* {@link #touch(Object)} to pass their own hint as well.
|
||||
*/
|
||||
@Override
|
||||
public ByteBuff touch() {
|
||||
return touch(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuff touch(Object hint) {
|
||||
refCnt.touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
public RefCnt getRefCnt() {
|
||||
return refCnt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
|
||||
*/
|
||||
|
|
|
@ -17,12 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.nio;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
|
||||
|
||||
/**
|
||||
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
|
||||
|
@ -31,7 +35,10 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
|
|||
@InterfaceAudience.Private
|
||||
public class RefCnt extends AbstractReferenceCounted {
|
||||
|
||||
private Recycler recycler = ByteBuffAllocator.NONE;
|
||||
private static final ResourceLeakDetector<RefCnt> detector =
|
||||
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
|
||||
private final Recycler recycler;
|
||||
private final ResourceLeakTracker<RefCnt> leak;
|
||||
|
||||
/**
|
||||
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
|
||||
|
@ -49,15 +56,66 @@ public class RefCnt extends AbstractReferenceCounted {
|
|||
|
||||
public RefCnt(Recycler recycler) {
|
||||
this.recycler = recycler;
|
||||
this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain() {
|
||||
maybeRecord();
|
||||
return super.retain();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain(int increment) {
|
||||
maybeRecord();
|
||||
return super.retain(increment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
maybeRecord();
|
||||
return super.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(int decrement) {
|
||||
maybeRecord();
|
||||
return super.release(decrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void deallocate() {
|
||||
this.recycler.free();
|
||||
if (leak != null) {
|
||||
this.leak.close(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefCnt touch() {
|
||||
maybeRecord();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ReferenceCounted touch(Object hint) {
|
||||
throw new UnsupportedOperationException();
|
||||
maybeRecord(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
public Recycler getRecycler() {
|
||||
return recycler;
|
||||
}
|
||||
|
||||
private void maybeRecord() {
|
||||
maybeRecord(null);
|
||||
}
|
||||
|
||||
private void maybeRecord(Object hint) {
|
||||
if (leak != null) {
|
||||
leak.record(hint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
|
|||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
|
||||
|
||||
@Test
|
||||
public void testRecycleOnlyPooledBuffers() {
|
||||
int maxBuffersInPool = 10;
|
||||
int bufSize = 1024;
|
||||
int minSize = bufSize / 8;
|
||||
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
|
||||
|
||||
ByteBuff buff = alloc.allocate(minSize - 1);
|
||||
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
|
||||
|
||||
alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
|
||||
buff = alloc.allocate(minSize * 2);
|
||||
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateByteBuffToReadInto() {
|
||||
int maxBuffersInPool = 10;
|
||||
|
@ -329,8 +345,6 @@ public class TestByteBuffAllocator {
|
|||
ByteBuff buf = alloc.allocate(bufSize);
|
||||
assertException(() -> buf.retain(2));
|
||||
assertException(() -> buf.release(2));
|
||||
assertException(() -> buf.touch());
|
||||
assertException(() -> buf.touch(new Object()));
|
||||
}
|
||||
|
||||
private void assertException(Runnable r) {
|
||||
|
|
|
@ -0,0 +1,341 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
|
||||
|
||||
@Category({ RPCTests.class, SmallTests.class })
|
||||
public class TestByteBuffAllocatorLeakDetection {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Test
|
||||
public void testLeakDetection() throws InterruptedException {
|
||||
InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
|
||||
AtomicInteger leaksDetected = new AtomicInteger();
|
||||
InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
|
||||
|
||||
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
|
||||
assertTrue(ResourceLeakDetector.isEnabled());
|
||||
|
||||
try {
|
||||
int maxBuffersInPool = 10;
|
||||
int bufSize = 1024;
|
||||
int minSize = bufSize / 8;
|
||||
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
|
||||
|
||||
// tracking leaks happens on creation of a RefCnt, through a call to detector.track().
|
||||
// If a leak occurs, but detector.track() is never called again, the leak will not be
|
||||
// realized. Further, causing a leak requires a GC event. So below we do some allocations,
|
||||
// cause some GC's, do more allocations, and then expect a leak to show up.
|
||||
|
||||
// first allocate on-heap. we expect to not see a leak from this, because we
|
||||
// dont count on-heap references
|
||||
alloc.allocate(minSize - 1);
|
||||
System.gc();
|
||||
Thread.sleep(1000);
|
||||
|
||||
// cause an allocation to trigger a leak detection, if there were one.
|
||||
// keep a reference so we don't trigger a leak right away from this.
|
||||
ByteBuff reference = alloc.allocate(minSize * 2);
|
||||
assertEquals(0, leaksDetected.get());
|
||||
|
||||
// allocate, but don't keep a reference. this should cause a leak
|
||||
alloc.allocate(minSize * 2);
|
||||
System.gc();
|
||||
Thread.sleep(1000);
|
||||
|
||||
// allocate again, this should cause the above leak to be detected
|
||||
alloc.allocate(minSize * 2);
|
||||
assertEquals(1, leaksDetected.get());
|
||||
} finally {
|
||||
InternalLoggerFactory.setDefaultFactory(original);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockedLoggerFactory extends Slf4JLoggerFactory {
|
||||
|
||||
private AtomicInteger leaksDetected;
|
||||
|
||||
public MockedLoggerFactory(AtomicInteger leaksDetected) {
|
||||
this.leaksDetected = leaksDetected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalLogger newInstance(String name) {
|
||||
InternalLogger delegate = super.newInstance(name);
|
||||
return new MockedLogger(leaksDetected, delegate);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockedLogger implements InternalLogger {
|
||||
|
||||
private AtomicInteger leaksDetected;
|
||||
private InternalLogger delegate;
|
||||
|
||||
public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
|
||||
this.leaksDetected = leaksDetected;
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
private void maybeCountLeak(String msgOrFormat) {
|
||||
if (msgOrFormat.startsWith("LEAK")) {
|
||||
leaksDetected.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String msg) {
|
||||
maybeCountLeak(msg);
|
||||
delegate.error(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String format, Object arg) {
|
||||
maybeCountLeak(format);
|
||||
delegate.error(format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String format, Object argA, Object argB) {
|
||||
maybeCountLeak(format);
|
||||
delegate.error(format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String format, Object... arguments) {
|
||||
maybeCountLeak(format);
|
||||
delegate.error(format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String msg, Throwable t) {
|
||||
maybeCountLeak(msg);
|
||||
delegate.error(msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(Throwable t) {
|
||||
delegate.error(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return delegate.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTraceEnabled() {
|
||||
return delegate.isTraceEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(String msg) {
|
||||
delegate.trace(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(String format, Object arg) {
|
||||
delegate.trace(format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(String format, Object argA, Object argB) {
|
||||
delegate.trace(format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(String format, Object... arguments) {
|
||||
delegate.trace(format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(String msg, Throwable t) {
|
||||
delegate.trace(msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trace(Throwable t) {
|
||||
delegate.trace(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDebugEnabled() {
|
||||
return delegate.isDebugEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String msg) {
|
||||
delegate.debug(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String format, Object arg) {
|
||||
delegate.debug(format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String format, Object argA, Object argB) {
|
||||
delegate.debug(format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String format, Object... arguments) {
|
||||
delegate.debug(format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String msg, Throwable t) {
|
||||
delegate.debug(msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(Throwable t) {
|
||||
delegate.debug(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInfoEnabled() {
|
||||
return delegate.isInfoEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String msg) {
|
||||
delegate.info(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String format, Object arg) {
|
||||
delegate.info(format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String format, Object argA, Object argB) {
|
||||
delegate.info(format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String format, Object... arguments) {
|
||||
delegate.info(format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String msg, Throwable t) {
|
||||
delegate.info(msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(Throwable t) {
|
||||
delegate.info(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWarnEnabled() {
|
||||
return delegate.isWarnEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String msg) {
|
||||
delegate.warn(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String format, Object arg) {
|
||||
delegate.warn(format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String format, Object... arguments) {
|
||||
delegate.warn(format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String format, Object argA, Object argB) {
|
||||
delegate.warn(format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String msg, Throwable t) {
|
||||
delegate.warn(msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(Throwable t) {
|
||||
delegate.warn(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isErrorEnabled() {
|
||||
return delegate.isErrorEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled(InternalLogLevel level) {
|
||||
return delegate.isEnabled(level);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, String msg) {
|
||||
delegate.log(level, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, String format, Object arg) {
|
||||
delegate.log(level, format, arg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, String format, Object argA, Object argB) {
|
||||
delegate.log(level, format, argA, argB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, String format, Object... arguments) {
|
||||
delegate.log(level, format, arguments);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, String msg, Throwable t) {
|
||||
delegate.log(level, msg, t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(InternalLogLevel level, Throwable t) {
|
||||
delegate.log(level, t);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -418,6 +418,22 @@ public class HFileBlock implements Cacheable {
|
|||
return buf.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
|
||||
* potential buffer leaks. We pass the block itself as a default hint, but one can use
|
||||
* {@link #touch(Object)} to pass their own hint as well.
|
||||
*/
|
||||
@Override
|
||||
public HFileBlock touch() {
|
||||
return touch(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock touch(Object hint) {
|
||||
buf.touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** @return get data block encoding id that was used to encode this block */
|
||||
short getDataBlockEncodingId() {
|
||||
if (blockType != BlockType.ENCODED_DATA) {
|
||||
|
@ -616,8 +632,9 @@ public class HFileBlock implements Cacheable {
|
|||
return this;
|
||||
}
|
||||
|
||||
HFileBlock unpacked = shallowClone(this);
|
||||
unpacked.allocateBuffer(); // allocates space for the decompressed block
|
||||
ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
|
||||
HFileBlock unpacked = shallowClone(this, newBuf);
|
||||
|
||||
boolean succ = false;
|
||||
try {
|
||||
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
|
||||
|
@ -643,20 +660,21 @@ public class HFileBlock implements Cacheable {
|
|||
* Always allocates a new buffer of the correct size. Copies header bytes from the existing
|
||||
* buffer. Does not change header fields. Reserve room to keep checksum bytes too.
|
||||
*/
|
||||
private void allocateBuffer() {
|
||||
private ByteBuff allocateBufferForUnpacking() {
|
||||
int cksumBytes = totalChecksumBytes();
|
||||
int headerSize = headerSize();
|
||||
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
|
||||
|
||||
ByteBuff source = buf.duplicate();
|
||||
ByteBuff newBuf = allocator.allocate(capacityNeeded);
|
||||
|
||||
// Copy header bytes into newBuf.
|
||||
buf.position(0);
|
||||
newBuf.put(0, buf, 0, headerSize);
|
||||
source.position(0);
|
||||
newBuf.put(0, source, 0, headerSize);
|
||||
|
||||
buf = newBuf;
|
||||
// set limit to exclude next block's header
|
||||
buf.limit(capacityNeeded);
|
||||
newBuf.limit(capacityNeeded);
|
||||
return newBuf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -708,11 +726,6 @@ public class HFileBlock implements Cacheable {
|
|||
* by default.
|
||||
*/
|
||||
public boolean isSharedMem() {
|
||||
if (this instanceof SharedMemHFileBlock) {
|
||||
return true;
|
||||
} else if (this instanceof ExclusiveMemHFileBlock) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1997,23 +2010,31 @@ public class HFileBlock implements Cacheable {
|
|||
+ onDiskDataSizeWithHeader;
|
||||
}
|
||||
|
||||
private static HFileBlockBuilder createBuilder(HFileBlock blk) {
|
||||
/**
|
||||
* Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
|
||||
* loaded with all of the original fields from blk, except now using the newBuff and setting
|
||||
* isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
|
||||
* an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
|
||||
* {@link SharedMemHFileBlock}. Or vice versa.
|
||||
* @param blk the block to clone from
|
||||
* @param newBuff the new buffer to use
|
||||
*/
|
||||
private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
|
||||
return new HFileBlockBuilder().withBlockType(blk.blockType)
|
||||
.withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
|
||||
.withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
|
||||
.withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
|
||||
// buffer.
|
||||
.withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
|
||||
.withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
|
||||
.withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
|
||||
.withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
|
||||
.withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
|
||||
.withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
|
||||
}
|
||||
|
||||
static HFileBlock shallowClone(HFileBlock blk) {
|
||||
return createBuilder(blk).build();
|
||||
private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
|
||||
return createBuilder(blk, newBuf).build();
|
||||
}
|
||||
|
||||
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
|
||||
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
|
||||
return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
|
||||
return createBuilder(blk, deepCloned).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({ IOTests.class, MediumTests.class })
|
||||
public class TestHFileBlockUnpack {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
// repetition gives us some chance to get a good compression ratio
|
||||
private static float CHANCE_TO_REPEAT = 0.6f;
|
||||
|
||||
private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
|
||||
|
||||
ByteBuffAllocator allocator;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
private FileSystem fs;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
|
||||
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
|
||||
allocator = ByteBuffAllocator.create(conf, true);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
|
||||
* block will be allocated to heap regardless of desire for off-heap. After de-compressing the
|
||||
* block, the new size may now exceed the min allocation size. This test ensures that those
|
||||
* de-compressed blocks, which will be allocated off-heap, are properly marked as
|
||||
* {@link HFileBlock#isSharedMem()} == true See https://issues.apache.org/jira/browse/HBASE-27170
|
||||
*/
|
||||
@Test
|
||||
public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
HFileContext meta =
|
||||
new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
|
||||
.withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
|
||||
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
|
||||
int totalSize;
|
||||
try (FSDataOutputStream os = fs.create(path)) {
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
|
||||
hbw.startWriting(BlockType.DATA);
|
||||
writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
|
||||
hbw.writeHeaderAndData(os);
|
||||
totalSize = hbw.getOnDiskSizeWithHeader();
|
||||
assertTrue(
|
||||
"expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
|
||||
totalSize < MIN_ALLOCATION_SIZE);
|
||||
}
|
||||
|
||||
try (FSDataInputStream is = fs.open(path)) {
|
||||
meta =
|
||||
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
|
||||
.withIncludesMvcc(false).withIncludesTags(false).build();
|
||||
ReaderContext context =
|
||||
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
|
||||
.withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
|
||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
|
||||
hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
|
||||
hbr.setIncludesMemStoreTS(false);
|
||||
HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
|
||||
blockFromHFile.sanityCheck();
|
||||
assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
|
||||
assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
|
||||
|
||||
assertTrue(
|
||||
"expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
|
||||
+ " to be less than " + MIN_ALLOCATION_SIZE,
|
||||
blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
|
||||
assertTrue(
|
||||
"expected generated block uncompressed size "
|
||||
+ blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
|
||||
+ MIN_ALLOCATION_SIZE,
|
||||
blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
|
||||
|
||||
HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
|
||||
assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
|
||||
assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
|
||||
}
|
||||
}
|
||||
|
||||
static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
|
||||
Random random = new Random(42);
|
||||
|
||||
byte[] family = new byte[] { 1 };
|
||||
int rowKey = 0;
|
||||
int qualifier = 0;
|
||||
int value = 0;
|
||||
long timestamp = 0;
|
||||
|
||||
int totalSize = 0;
|
||||
|
||||
// go until just up to the limit. compression should bring the total on-disk size under
|
||||
while (totalSize < desiredSize) {
|
||||
rowKey = maybeIncrement(random, rowKey);
|
||||
qualifier = maybeIncrement(random, qualifier);
|
||||
value = maybeIncrement(random, value);
|
||||
timestamp = maybeIncrement(random, (int) timestamp);
|
||||
|
||||
KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
|
||||
timestamp, Bytes.toBytes(value));
|
||||
hbw.write(keyValue);
|
||||
totalSize += keyValue.getLength();
|
||||
}
|
||||
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
private static int maybeIncrement(Random random, int value) {
|
||||
if (random.nextFloat() < CHANCE_TO_REPEAT) {
|
||||
return value;
|
||||
}
|
||||
return value + 1;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue