HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Yutong Xiao 2022-02-18 22:24:51 +08:00, committed by GitHub
parent 24124f7485
commit e100198ab5
5 changed files with 27 additions and 17 deletions
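
The change in one line: instead of allocating a fresh BLOCK_METADATA_SPACE-sized ByteBuffer for every block drained into the bucket cache, each writer thread now owns a single scratch buffer that is reset with clear() before each block. Below is a minimal self-contained sketch of that pattern (illustrative only; the class and method names are not the HBase ones, and the three fields mirror what BLOCK_METADATA_SPACE accounts for: one flag byte, an 8-byte long, a 4-byte int, 13 bytes in total).

import java.nio.ByteBuffer;

public class ScratchBufferSketch {
  // Mirrors BLOCK_METADATA_SPACE: SIZEOF_BYTE + SIZEOF_LONG + SIZEOF_INT = 13.
  static final int METADATA_SPACE = 1 + 8 + 4;

  // Old shape: one short-lived allocation per block written.
  static ByteBuffer metadataPerBlock() {
    ByteBuffer bb = ByteBuffer.allocate(METADATA_SPACE);
    fill(bb);
    bb.flip(); // expose the 13 written bytes for reading
    return bb;
  }

  // New shape: the caller supplies a long-lived buffer; clear() rewinds
  // position and limit so the same backing array can be filled again.
  static ByteBuffer metadataInto(ByteBuffer bb) {
    bb.clear();
    fill(bb);
    bb.flip();
    return bb;
  }

  private static void fill(ByteBuffer bb) {
    bb.put((byte) 1).putLong(42L).putInt(13); // flag, offset, next-block size
  }

  public static void main(String[] args) {
    ByteBuffer scratch = ByteBuffer.allocate(METADATA_SPACE);
    for (int i = 0; i < 3; i++) {
      // Same buffer on every pass; remaining() is 13 each time.
      System.out.println(metadataInto(scratch).remaining());
    }
  }
}

Under a steady flush load this trades one short-lived heap allocation per cached block for one long-lived 13-byte buffer per writer thread; the buffer is confined to its owning WriterThread, so no extra synchronization is needed.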

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

@@ -235,7 +235,8 @@ public class HFileBlock implements Cacheable {
    * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
    * formerly known as EXTRA_SERIALIZATION_SPACE).
    */
-  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+  public static final int BLOCK_METADATA_SPACE =
+    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -1883,8 +1884,7 @@ public class HFileBlock implements Cacheable {
   /**
    * For use by bucketcache. This exposes internals.
    */
-  public ByteBuffer getMetaData() {
-    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
+  public ByteBuffer getMetaData(ByteBuffer bb) {
     bb = addMetaData(bb, true);
     bb.flip();
     return bb;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

@@ -945,6 +945,7 @@ public class BucketCache implements BlockCache, HeapSize {
   class WriterThread extends Thread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
     private volatile boolean writerEnabled = true;
+    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       super("BucketCacheWriterThread");
@@ -970,7 +971,7 @@ public class BucketCache implements BlockCache, HeapSize {
               break;
             }
           }
-          doDrain(entries);
+          doDrain(entries, metaBuff);
         } catch (Exception ioe) {
           LOG.error("WriterThread encountered error", ioe);
         }
@@ -1046,7 +1047,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param entries Presumes list passed in here will be processed by this invocation only. No
    *          interference expected.
    */
-  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
     if (entries.isEmpty()) {
       return;
     }
@@ -1074,9 +1075,14 @@ public class BucketCache implements BlockCache, HeapSize {
         if (ramCache.containsKey(cacheKey)) {
           blocksByHFile.add(cacheKey);
         }
+        // Reset the position for reuse.
+        // It should be guaranteed that the data in the metaBuff has been transferred to the
+        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
+        // transferred with our current IOEngines. Should take care, when we have new kinds of
+        // IOEngine in the future.
+        metaBuff.clear();
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          this::createRecycler);
+          this::createRecycler, metaBuff);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1504,8 +1510,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
-        throws IOException {
+        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
+        ByteBuffer metaBuff) throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1522,9 +1528,9 @@ public class BucketCache implements BlockCache, HeapSize {
         // If an instance of HFileBlock, save on some allocations.
         HFileBlock block = (HFileBlock) data;
         ByteBuff sliceBuf = block.getBufferReadOnly();
-        ByteBuffer metadata = block.getMetaData();
+        block.getMetaData(metaBuff);
         ioEngine.write(sliceBuf, offset);
-        ioEngine.write(metadata, offset + len - metadata.limit());
+        ioEngine.write(metaBuff, offset + len - metaBuff.limit());
       } else {
         // Only used for testing.
         ByteBuffer bb = ByteBuffer.allocate(len);

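A note on the metaBuff.clear() reuse above, since it leans on standard java.nio.ByteBuffer semantics: clear() only resets position and limit, it does not zero the contents, so reuse is safe precisely because ioEngine.write(...) consumes the buffer synchronously before the next block is drained (the comment in the hunk flags this as a contract future IOEngines must keep). Note also the offset arithmetic: after getMetaData() flips the buffer, metaBuff.limit() equals the metadata length, so offset + len - metaBuff.limit() lands the metadata at the tail of the serialized block. A tiny self-contained illustration (not HBase code):

import java.nio.ByteBuffer;

public class ClearVsFlip {
  public static void main(String[] args) {
    ByteBuffer bb = ByteBuffer.allocate(13);
    bb.putLong(42L);        // fill: position = 8
    bb.flip();              // position = 0, limit = 8: ready to drain
    long v = bb.getLong();  // stand-in for a synchronous ioEngine.write()
    bb.clear();             // position = 0, limit = 13: ready to refill
    System.out.println(v + " " + bb.position() + " " + bb.limit()); // 42 0 13
  }
}
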
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java

@@ -686,7 +686,8 @@ public class TestBucketCache {
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
-      re.writeToCache(ioEngine, allocator, null, null);
+      re.writeToCache(ioEngine, allocator, null, null,
+        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
       Assert.fail();
     } catch (Exception e) {
     }

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java

@@ -627,8 +627,8 @@ public class TestBucketCacheRefCnt {
     }
 
     @Override
-    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
-      super.doDrain(entries);
+    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
+      super.doDrain(entries, metaBuff);
       if (entries.size() > 0) {
         /**
          * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java

@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -140,7 +142,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -162,7 +164,8 @@ public class TestBucketWriterThread {
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
@@ -171,7 +174,7 @@ public class TestBucketWriterThread {
       final BlockingQueue<RAMQueueEntry> q)
       throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    bc.doDrain(rqes);
+    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());