HBASE-22090 The HFileBlock#CacheableDeserializer should pass ByteBuffAllocator to the newly created HFileBlock

huzheng 2019-04-25 19:41:04 +08:00
parent 97476ed2e0
commit a8f8a4a1c9
11 changed files with 93 additions and 102 deletions

MemcachedBlockCache.java

@@ -36,6 +36,7 @@ import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -271,10 +272,10 @@ public class MemcachedBlockCache implements BlockCache {
public HFileBlock decode(CachedData d) {
try {
ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true,
return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP,
MemoryType.EXCLUSIVE);
} catch (IOException e) {
LOG.warn("Error deserializing data from memcached",e);
LOG.warn("Failed to deserialize data from memcached", e);
}
return null;
}
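
A note on the allocator choice above: memcached hands the cached value back as an on-heap byte[], so the heap allocator is the only sensible argument here. A minimal sketch of the new decode path (the helper class and method are hypothetical; the HBase calls follow the diff):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

// Hypothetical helper: decodes bytes fetched from memcached. The payload is
// always on-heap, so ByteBuffAllocator.HEAP is passed to the deserializer.
public final class MemcachedDecodeSketch {
  static HFileBlock decode(byte[] cachedBytes) throws IOException {
    ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(cachedBytes));
    return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf,
        ByteBuffAllocator.HEAP, MemoryType.EXCLUSIVE);
  }
}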

CacheableDeserializer.java

@@ -19,37 +19,30 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
/**
* Interface for a deserializer. Throws an IOException if the serialized data is
* incomplete or wrong.
* */
* Interface for a deserializer. Throws an IOException if the serialized data is incomplete or
* wrong.
*/
@InterfaceAudience.Private
public interface CacheableDeserializer<T extends Cacheable> {
/**
* Returns the deserialized object.
*
* @return T the deserialized object.
*/
T deserialize(ByteBuff b) throws IOException;
/**
* @param b
* @param reuse true if Cacheable object can use the given buffer as its
* content
* @param b ByteBuff to deserialize the Cacheable.
* @param allocator to manage NIO ByteBuffers for future allocation or de-allocation.
* @param memType the {@link MemoryType} of the buffer
* @return T the deserialized object.
* @throws IOException
*/
T deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException;
T deserialize(ByteBuff b, ByteBuffAllocator allocator, MemoryType memType) throws IOException;
/**
* Get the identifier of this deserialiser. Identifier is unique for each
* deserializer and generated by {@link CacheableDeserializerIdManager}
* Get the identifier of this deserializer. Identifier is unique for each deserializer and
* generated by {@link CacheableDeserializerIdManager}
* @return identifier number of this cacheable deserializer
*/
int getDeserialiserIdentifier();
int getDeserializerIdentifier();
}
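
The new allocator parameter changes the contract: an implementation that needs its own buffer should obtain it from the supplied ByteBuffAllocator rather than allocating a heap ByteBuffer directly, so that pooled (possibly off-heap) buffers can be returned to the pool when the Cacheable is released. A sketch, where "MyBlock" is a hypothetical Cacheable and ByteBuffAllocator#allocate is assumed to hand out a pooled ByteBuff:

// "MyBlock" and MY_BLOCK_DESERIALIZER_ID are hypothetical stand-ins.
CacheableDeserializer<MyBlock> deserializer = new CacheableDeserializer<MyBlock>() {
  @Override
  public MyBlock deserialize(ByteBuff b, ByteBuffAllocator allocator, MemoryType memType)
      throws IOException {
    int len = b.remaining();
    ByteBuff content = allocator.allocate(len); // pooled buffer if one is free
    content.put(0, b, b.position(), len);       // copy payload into our own buffer
    return new MyBlock(content, memType);
  }

  @Override
  public int getDeserializerIdentifier() {
    return MY_BLOCK_DESERIALIZER_ID; // id issued by CacheableDeserializerIdManager
  }
};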

HFileBlock.java

@@ -264,42 +264,27 @@ public class HFileBlock implements Cacheable {
}
@Override
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc, MemoryType memType)
throws IOException {
// The buf has the file block followed by block metadata.
// Set limit to just before the BLOCK_METADATA_SPACE then rewind.
buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
// Get a new buffer to pass the HFileBlock for it to 'own'.
ByteBuff newByteBuff;
if (reuse) {
newByteBuff = buf.slice();
} else {
int len = buf.limit();
newByteBuff = ByteBuff.wrap(ByteBuffer.allocate(len));
newByteBuff.put(0, buf, buf.position(), len);
}
ByteBuff newByteBuff = buf.slice();
// Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
boolean usesChecksum = buf.get() == (byte) 1;
long offset = buf.getLong();
int nextBlockOnDiskSize = buf.getInt();
// TODO make the newly created HFileBlock use the off-heap allocator, Need change the
// deserializer or change the deserialize interface.
return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null,
ByteBuffAllocator.HEAP);
alloc);
}
@Override
public int getDeserialiserIdentifier() {
public int getDeserializerIdentifier() {
return DESERIALIZER_IDENTIFIER;
}
@Override
public HFileBlock deserialize(ByteBuff b) throws IOException {
// Used only in tests
return deserialize(b, false, MemoryType.EXCLUSIVE);
}
}
private static final int DESERIALIZER_IDENTIFIER;
@@ -563,6 +548,10 @@ public class HFileBlock implements Cacheable {
return dup;
}
public ByteBuffAllocator getByteBuffAllocator() {
return this.allocator;
}
@VisibleForTesting
private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
String fieldName) throws IOException {
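
Two things changed in BlockDeserializer: the reuse flag is gone (the deserializer now always slices the input, since buffer ownership is tracked by reference counting), and the TODO is resolved by passing the caller's allocator to the new HFileBlock instead of hard-coding ByteBuffAllocator.HEAP. The trailer it parses is BLOCK_METADATA_SPACE = 1 + 8 + 4 = 13 bytes after the block data; a toy writer for that trailer (the method is hypothetical, the layout follows the reads above):

// Hypothetical counterpart of the reads in deserialize().
static void appendBlockMetadata(java.nio.ByteBuffer dest, boolean usesChecksum,
    long offset, int nextBlockOnDiskSize) {
  dest.put((byte) (usesChecksum ? 1 : 0)); // read back via buf.get() == (byte) 1
  dest.putLong(offset);                    // read back via buf.getLong()
  dest.putInt(nextBlockOnDiskSize);        // read back via buf.getInt()
}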

BucketCache.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -1334,6 +1335,13 @@ public class BucketCache implements BlockCache, HeapSize {
this.accessCounter = accessCounter;
}
private ByteBuffAllocator getByteBuffAllocator() {
if (data instanceof HFileBlock) {
return ((HFileBlock) data).getByteBuffAllocator();
}
return ByteBuffAllocator.HEAP;
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize) throws IOException {
int len = data.getSerializedLength();
@@ -1345,9 +1353,9 @@ public class BucketCache implements BlockCache, HeapSize {
boolean succ = false;
BucketEntry bucketEntry = null;
try {
bucketEntry =
new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler));
bucketEntry.setDeserialiserReference(data.getDeserializer());
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
getByteBuffAllocator());
bucketEntry.setDeserializerReference(data.getDeserializer());
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;
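
The new getByteBuffAllocator() hook above is what ties write and read together: when the cached payload is an HFileBlock, the BucketEntry records the block's own allocator, and any other Cacheable falls back to the heap. Condensed from writeToCache (all calls appear in the diff above):

// Capture the block's allocator at write time so a later read can recycle
// buffers back to the same pool.
ByteBuffAllocator allocator = (data instanceof HFileBlock)
    ? ((HFileBlock) data).getByteBuffAllocator()
    : ByteBuffAllocator.HEAP;
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory,
    RefCnt.create(recycler), allocator);
bucketEntry.setDeserializerReference(data.getDeserializer());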

BucketEntry.java

@@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
@@ -58,7 +59,7 @@ class BucketEntry implements HBaseReferenceCounted {
* The index of the deserializer that can deserialize this BucketEntry content. See
* {@link CacheableDeserializerIdManager} for hosting of index to serializers.
*/
byte deserialiserIndex;
byte deserializerIndex;
private volatile long accessCounter;
private BlockPriority priority;
@@ -80,6 +81,7 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
private final ByteBuffAllocator allocator;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -87,16 +89,18 @@ class BucketEntry implements HBaseReferenceCounted {
private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
this(offset, length, accessCounter, inMemory, RefCnt.create());
this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
}
BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt) {
BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
ByteBuffAllocator allocator) {
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
this.refCnt = refCnt;
this.markedAsEvicted = new AtomicBoolean(false);
this.allocator = allocator;
}
long offset() {
@@ -120,11 +124,11 @@ class BucketEntry implements HBaseReferenceCounted {
}
CacheableDeserializer<Cacheable> deserializerReference() {
return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
return CacheableDeserializerIdManager.getDeserializer(deserializerIndex);
}
void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) {
this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier();
}
long getAccessCounter() {
@@ -192,7 +196,7 @@ class BucketEntry implements HBaseReferenceCounted {
Cacheable wrapAsCacheable(ByteBuffer[] buffers, MemoryType memoryType) throws IOException {
ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
return this.deserializerReference().deserialize(buf, true, memoryType);
return this.deserializerReference().deserialize(buf, allocator, memoryType);
}
interface BucketEntryHandler<T> {
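
At read time the stored allocator is replayed into deserialize() by wrapAsCacheable(), so an HFileBlock reconstructed from the bucket cache releases its buffers back to the pool they were drawn from. A read-side sketch (the method is hypothetical; buffers must hold the bytes previously written for this entry):

// Hypothetical caller of wrapAsCacheable() shown above.
static Cacheable readBack(BucketEntry entry, ByteBuffer[] buffers) throws IOException {
  return entry.wrapAsCacheable(buffers, MemoryType.SHARED);
}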

BucketProtoUtils.java

@@ -105,7 +105,7 @@ final class BucketProtoUtils {
return BucketCacheProtos.BucketEntry.newBuilder()
.setOffset(entry.offset())
.setLength(entry.getLength())
.setDeserialiserIndex(entry.deserialiserIndex)
.setDeserialiserIndex(entry.deserializerIndex)
.setAccessCounter(entry.getAccessCounter())
.setPriority(toPB(entry.getPriority()))
.build();
@@ -146,8 +146,8 @@ final class BucketProtoUtils {
}
// Convert it to the identifier for the deserializer that we have in this runtime
if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserialiserIdentifier();
value.deserialiserIndex = (byte) actualIndex;
int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier();
value.deserializerIndex = (byte) actualIndex;
} else {
// We could make this more pluggable, but right now HFileBlock is the only implementation
// of Cacheable outside of tests, so this might not ever matter.
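
Deserializer identifiers are issued at registration time, so they are not stable across JVM restarts; that is why retrieval maps the persisted class name back to this runtime's identifier rather than trusting the stored index. Registration itself, as a sketch (assuming CacheableDeserializerIdManager#registerDeserializer, which issues the id that getDeserializerIdentifier() must report):

// Done once per runtime, e.g. for the block deserializer in HFileBlock.
private static final int DESERIALIZER_IDENTIFIER =
    CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);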

CacheTestUtils.java

@@ -225,29 +225,22 @@ public class CacheTestUtils {
public static class ByteArrayCacheable implements Cacheable {
static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
@Override
public int getDeserializerIdentifier() {
return deserializerIdentifier;
}
@Override
public Cacheable deserialize(ByteBuff b) throws IOException {
int len = b.getInt();
Thread.yield();
byte buf[] = new byte[len];
b.get(buf);
return new ByteArrayCacheable(buf);
}
@Override
public int getDeserialiserIdentifier() {
return deserializerIdentifier;
}
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
throws IOException {
return deserialize(b);
}
};
@Override
public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
throws IOException {
int len = b.getInt();
Thread.yield();
byte buf[] = new byte[len];
b.get(buf);
return new ByteArrayCacheable(buf);
}
};
final byte[] buf;

TestCacheConfig.java

@@ -77,18 +77,13 @@ public class TestCacheConfig {
}
@Override
public int getDeserialiserIdentifier() {
public int getDeserializerIdentifier() {
return deserializedIdentifier;
}
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException {
LOG.info("Deserialized " + b + ", reuse=" + reuse);
return cacheable;
}
@Override
public Cacheable deserialize(ByteBuff b) throws IOException {
public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
throws IOException {
LOG.info("Deserialized " + b);
return cacheable;
}

TestHFileBlock.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;
@@ -120,7 +121,7 @@ public class TestHFileBlock {
this.includesMemstoreTS = includesMemstoreTS;
this.includesTag = includesTag;
this.useHeapAllocator = useHeapAllocator;
this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : createOffHeapAlloc();
this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc();
assertAllocator();
}
@@ -524,16 +525,14 @@ public class TestHFileBlock {
for (boolean reuseBuffer : new boolean[] { false, true }) {
ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
blockFromHFile.serialize(serialized, true);
HFileBlock deserialized =
(HFileBlock) blockFromHFile.getDeserializer().deserialize(
new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
assertEquals(
"Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
.deserialize(new SingleByteBuff(serialized), HEAP, MemoryType.EXCLUSIVE);
assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
blockFromHFile, deserialized);
// intentional reference comparison
if (blockFromHFile != blockUnpacked) {
assertEquals("Deserializaed block cannot be unpacked correctly.",
blockUnpacked, deserialized.unpack(meta, hbr));
assertEquals("Deserialized block cannot be unpacked correctly.", blockUnpacked,
deserialized.unpack(meta, hbr));
}
}
assertTrue(blockUnpacked.release());
@@ -916,7 +915,7 @@ public class TestHFileBlock {
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.withChecksumType(ChecksumType.NULL).build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, 0, -1, meta, ByteBuffAllocator.HEAP);
-1, 0, -1, meta, HEAP);
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
new MultiByteBuff(buf).getClass(), true)
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);

TestBucketCacheRefCnt.java

@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -28,6 +27,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
@@ -65,8 +66,12 @@ public class TestBucketCacheRefCnt {
}
private static HFileBlock createBlock(int offset, int size) {
return createBlock(offset, size, ByteBuffAllocator.HEAP);
}
private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, HEAP);
HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
}
private static BlockCacheKey createKey(String hfileName, long offset) {
@@ -133,9 +138,10 @@ public class TestBucketCacheRefCnt {
@Test
public void testBlockInBackingMap() throws Exception {
ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
cache = create(1, 1000);
try {
HFileBlock blk = createBlock(200, 1020);
HFileBlock blk = createBlock(200, 1020, alloc);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
waitUntilFlushedToCache(key);
@@ -144,6 +150,7 @@ public class TestBucketCacheRefCnt {
Cacheable block = cache.getBlock(key, false, false, false);
assertTrue(block.getMemoryType() == MemoryType.SHARED);
assertTrue(block instanceof HFileBlock);
assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
assertEquals(2, block.refCnt());
block.retain();
@@ -152,6 +159,7 @@ public class TestBucketCacheRefCnt {
Cacheable newBlock = cache.getBlock(key, false, false, false);
assertTrue(newBlock.getMemoryType() == MemoryType.SHARED);
assertTrue(newBlock instanceof HFileBlock);
assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
assertEquals(4, newBlock.refCnt());
// release the newBlock
@@ -173,6 +181,7 @@ public class TestBucketCacheRefCnt {
newBlock = cache.getBlock(key, false, false, false);
assertEquals(2, block.refCnt());
assertEquals(2, newBlock.refCnt());
assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
// Release the block
assertFalse(block.release());
@@ -188,17 +197,20 @@ public class TestBucketCacheRefCnt {
@Test
public void testInBucketCache() throws IOException {
ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
cache = create(1, 1000);
try {
HFileBlock blk = createBlock(200, 1020);
HFileBlock blk = createBlock(200, 1020, alloc);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
Cacheable block1 = cache.getBlock(key, false, false, false);
assertTrue(block1.refCnt() >= 2);
assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc);
Cacheable block2 = cache.getBlock(key, false, false, false);
assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc);
assertTrue(block2.refCnt() >= 3);
cache.evictBlock(key);
@@ -209,6 +221,7 @@ public class TestBucketCacheRefCnt {
// Get key again
Cacheable block3 = cache.getBlock(key, false, false, false);
if (block3 != null) {
assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc);
assertTrue(block3.refCnt() >= 3);
assertFalse(block3.release());
}

TestByteBufferIOEngine.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -67,7 +68,7 @@ public class TestByteBufferIOEngine {
static BucketEntry createBucketEntry(long offset, int len) {
BucketEntry be = new MockBucketEntry(offset, len);
be.setDeserialiserReference(DESERIALIZER);
be.setDeserializerReference(DESERIALIZER);
return be;
}
@@ -126,12 +127,7 @@ public class TestByteBufferIOEngine {
private int identifier;
@Override
public Cacheable deserialize(ByteBuff b) throws IOException {
return null;
}
@Override
public Cacheable deserialize(final ByteBuff b, boolean reuse, MemoryType memType)
public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
throws IOException {
this.buf = b;
return null;
@@ -142,7 +138,7 @@ public class TestByteBufferIOEngine {
}
@Override
public int getDeserialiserIdentifier() {
public int getDeserializerIdentifier() {
return identifier;
}
}