HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent af338d2c99
commit 932bf591b3

@@ -235,7 +235,8 @@ public class HFileBlock implements Cacheable {
    * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
    * formerly known as EXTRA_SERIALIZATION_SPACE).
    */
-  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+  public static final int BLOCK_METADATA_SPACE =
+    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
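Context for this hunk: widening the constant to public lets BucketCache size a reusable scratch buffer. Per the Javadoc above, the value is the EXTRA serialization space from HBASE-7404: a byte plus a long plus an int, i.e. 13 bytes. A standalone sketch of the same arithmetic using plain JDK size constants in place of Bytes.SIZEOF_* (illustrative only):

    public class BlockMetadataSpaceSketch {
      public static void main(String[] args) {
        // Mirrors Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT.
        int space = Byte.BYTES + Long.BYTES + Integer.BYTES;
        System.out.println(space); // 13
      }
    }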
@@ -1884,8 +1885,7 @@ public class HFileBlock implements Cacheable {
   /**
    * For use by bucketcache. This exposes internals.
    */
-  public ByteBuffer getMetaData() {
-    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
+  public ByteBuffer getMetaData(ByteBuffer bb) {
     bb = addMetaData(bb, true);
     bb.flip();
     return bb;
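This is the core API change: getMetaData no longer allocates a fresh 13-byte ByteBuffer per block, it fills a buffer the caller owns and can recycle across calls. A minimal, self-contained sketch of the caller-supplied-buffer pattern (names and field layout are illustrative, not the HBase API):

    import java.nio.ByteBuffer;

    public class CallerSuppliedBufferSketch {
      // After the change: fill a buffer the caller reuses across calls.
      static ByteBuffer fillMeta(ByteBuffer bb, long offset, int nextBlockSize) {
        bb.put((byte) 1).putLong(offset).putInt(nextBlockSize);
        bb.flip(); // ready to be read or written out
        return bb;
      }

      public static void main(String[] args) {
        ByteBuffer reusable = ByteBuffer.allocate(13);
        for (int i = 0; i < 3; i++) {
          reusable.clear(); // reset position/limit before each reuse
          System.out.println(fillMeta(reusable, i * 100L, 64).limit()); // 13, no new allocation
        }
      }
    }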
@@ -934,6 +934,7 @@ public class BucketCache implements BlockCache, HeapSize {
   class WriterThread extends Thread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
     private volatile boolean writerEnabled = true;
+    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       super("BucketCacheWriterThread");
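Note where the buffer lives: it is a field of WriterThread, not of the shared BucketCache, so each writer thread owns exactly one scratch buffer and the reuse needs no synchronization. A toy model of that ownership rule (illustrative only, not the HBase classes):

    import java.nio.ByteBuffer;

    public class WriterOwnsBufferSketch extends Thread {
      private final ByteBuffer scratch = ByteBuffer.allocate(13); // one per thread, reused forever

      @Override
      public void run() {
        for (int i = 0; i < 3; i++) {
          scratch.clear();           // safe: only this thread ever touches scratch
          scratch.putLong(i).flip(); // stand-in for "fill metadata, then write it out"
          System.out.println(getName() + " staged " + scratch.remaining() + " bytes");
        }
      }

      public static void main(String[] args) throws InterruptedException {
        WriterOwnsBufferSketch w1 = new WriterOwnsBufferSketch();
        WriterOwnsBufferSketch w2 = new WriterOwnsBufferSketch();
        w1.start();
        w2.start();
        w1.join();
        w2.join();
      }
    }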
@@ -959,7 +960,7 @@ public class BucketCache implements BlockCache, HeapSize {
             break;
           }
         }
-        doDrain(entries);
+        doDrain(entries, metaBuff);
       } catch (Exception ioe) {
         LOG.error("WriterThread encountered error", ioe);
       }
@@ -1035,7 +1036,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param entries Presumes list passed in here will be processed by this invocation only. No
    *   interference expected.
    */
-  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
     if (entries.isEmpty()) {
       return;
     }
@@ -1063,9 +1064,14 @@ public class BucketCache implements BlockCache, HeapSize {
         if (ramCache.containsKey(cacheKey)) {
           blocksByHFile.add(cacheKey);
         }
 
+        // Reset the position for reuse.
+        // It should be guaranteed that the data in the metaBuff has been transferred to the
+        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
+        // transferred with our current IOEngines. Should take care, when we have new kinds of
+        // IOEngine in the future.
+        metaBuff.clear();
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          this::createRecycler);
+          this::createRecycler, metaBuff);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
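The comment added above metaBuff.clear() is the crux of the patch: ByteBuffer.clear() does not erase or reallocate anything, it only resets position to 0 and limit to capacity, so the next block's metadata simply overwrites the previous block's bytes. That is only sound because ioEngine.write consumes the buffer before doDrain advances to the next entry. A small demonstration of the clear/fill/flip cycle:

    import java.nio.ByteBuffer;

    public class ClearReuseDemo {
      public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(13);
        for (long offset : new long[] { 0L, 4096L, 8192L }) {
          bb.clear();                         // reset cursors; old bytes get overwritten
          bb.put((byte) 1).putLong(offset).putInt(64);
          bb.flip();                          // position = 0, limit = 13: ready to write out
          System.out.println(bb.remaining()); // 13 every iteration, zero new allocations
        }
      }
    }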
@@ -1489,8 +1495,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
-        throws IOException {
+        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
+        ByteBuffer metaBuff) throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1507,9 +1513,9 @@ public class BucketCache implements BlockCache, HeapSize {
         // If an instance of HFileBlock, save on some allocations.
         HFileBlock block = (HFileBlock) data;
         ByteBuff sliceBuf = block.getBufferReadOnly();
-        ByteBuffer metadata = block.getMetaData();
+        block.getMetaData(metaBuff);
         ioEngine.write(sliceBuf, offset);
-        ioEngine.write(metadata, offset + len - metadata.limit());
+        ioEngine.write(metaBuff, offset + len - metaBuff.limit());
       } else {
         // Only used for testing.
         ByteBuffer bb = ByteBuffer.allocate(len);
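On the write offset: the serialized block occupies [offset, offset + len) in the IOEngine, with the metadata forming the tail of that range, so it lands at offset + len - metaBuff.limit(). Worked through with assumed numbers (all values illustrative):

    public class MetaOffsetArithmetic {
      public static void main(String[] args) {
        long offset = 1_048_576L; // where the serialized block starts (assumed)
        int len = 65_549;         // getSerializedLength(): block data plus 13 metadata bytes (assumed)
        int metaLimit = 13;       // metaBuff.limit() after flip()
        long metaOffset = offset + len - metaLimit; // metadata = last 13 bytes of the block
        System.out.println(metaOffset); // 1114112, i.e. offset + 65536
      }
    }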
@@ -590,7 +590,8 @@ public class TestBucketCache {
 
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
-      re.writeToCache(ioEngine, allocator, null, null);
+      re.writeToCache(ioEngine, allocator, null, null,
+        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
       Assert.fail();
     } catch (Exception e) {
     }
@@ -627,8 +627,8 @@ public class TestBucketCacheRefCnt {
     }
 
     @Override
-    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
-      super.doDrain(entries);
+    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
+      super.doDrain(entries, metaBuff);
       if (entries.size() > 0) {
         /**
          * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -140,7 +142,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -162,7 +164,8 @@ public class TestBucketWriterThread {
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
@@ -171,7 +174,7 @@ public class TestBucketWriterThread {
       final BlockingQueue<RAMQueueEntry> q)
     throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    bc.doDrain(rqes);
+    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());
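The test changes are mechanical consequences of the new parameter: writeToCache and doDrain now take five and two arguments respectively, so the old calls and Mockito stubs no longer compile, and each stub gains a fifth Mockito.any(). The tests pass a freshly allocated ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE) rather than a reused one, since buffer reuse is a production-path optimization the tests do not need to exercise.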