From 4ceffc83febbd76c8da3aa86314f8004200ab77a Mon Sep 17 00:00:00 2001 From: huzheng Date: Mon, 8 Apr 2019 23:15:46 +0800 Subject: [PATCH] HBASE-22185 RAMQueueEntry#writeToCache should freeBlock if any exception encountered instead of the IOException catch block --- .../hbase/io/hfile/bucket/BucketCache.java | 49 +++++++++++-------- .../io/hfile/bucket/TestBucketCache.java | 39 +++++++++++++++ 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 333b7ef7998..f52d8af3bf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1502,30 +1502,37 @@ public class BucketCache implements BlockCache, HeapSize { this.accessCounter = accessCounter; } - public BucketEntry writeToCache(final IOEngine ioEngine, - final BucketAllocator bucketAllocator, - final UniqueIndexMap deserialiserMap, - final LongAdder realCacheSize) throws CacheFullException, IOException, - BucketAllocatorException { + private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) { + if (ioEngine.usesSharedMemory()) { + if (UnsafeAvailChecker.isAvailable()) { + return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory); + } else { + return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory); + } + } else { + return new BucketEntry(offset, len, accessCounter, inMemory); + } + } + + public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, + final UniqueIndexMap deserialiserMap, final LongAdder realCacheSize) + throws IOException { int len = data.getSerializedLength(); // This cacheable thing can't be serialized - if (len == 0) return null; + if (len == 0) { + return null; + } long offset = bucketAllocator.allocateBlock(len); - BucketEntry bucketEntry = ioEngine.usesSharedMemory() - ? UnsafeAvailChecker.isAvailable() - ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory) - : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory) - : new BucketEntry(offset, len, accessCounter, inMemory); - bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); + boolean succ = false; + BucketEntry bucketEntry; try { + bucketEntry = getBucketEntry(ioEngine, offset, len); + bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); if (data instanceof HFileBlock) { // If an instance of HFileBlock, save on some allocations. - HFileBlock block = (HFileBlock)data; + HFileBlock block = (HFileBlock) data; ByteBuff sliceBuf = block.getBufferReadOnly(); ByteBuffer metadata = block.getMetaData(); - if (LOG.isTraceEnabled()) { - LOG.trace("Write offset=" + offset + ", len=" + len); - } ioEngine.write(sliceBuf, offset); ioEngine.write(metadata, offset + len - metadata.limit()); } else { @@ -1533,12 +1540,12 @@ public class BucketCache implements BlockCache, HeapSize { data.serialize(bb, true); ioEngine.write(bb, offset); } - } catch (IOException ioe) { - // free it in bucket allocator - bucketAllocator.freeBlock(offset); - throw ioe; + succ = true; + } finally { + if (!succ) { + bucketAllocator.freeBlock(offset); + } } - realCacheSize.add(len); return bucketEntry; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 924dd027688..5363d702ca0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -48,15 +48,19 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -454,4 +458,39 @@ public class TestBucketCache { CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, block1Buffer); } + + @Test + public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { + // initialize an block. + int size = 100, offset = 20; + int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; + ByteBuffer buf = ByteBuffer.allocate(length); + HFileContext meta = new HFileContextBuilder().build(); + HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER, + offset, 52, -1, meta); + + // initialize an mocked ioengine. + IOEngine ioEngine = Mockito.mock(IOEngine.class); + Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false); + // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); + Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), + Mockito.anyLong()); + Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), + Mockito.anyLong()); + + // create an bucket allocator. + long availableSpace = 1024 * 1024 * 1024L; + BucketAllocator allocator = new BucketAllocator(availableSpace, null); + + BlockCacheKey key = new BlockCacheKey("dummy", 1L); + RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true); + + Assert.assertEquals(0, allocator.getUsedSize()); + try { + re.writeToCache(ioEngine, allocator, new UniqueIndexMap<>(), null); + Assert.fail(); + } catch (Exception e) { + } + Assert.assertEquals(0, allocator.getUsedSize()); + } }