HBASE-22185 RAMQueueEntry#writeToCache should freeBlock if any exception encountered instead of the IOException catch block
This commit is contained in:
parent
387a5da462
commit
494a8efe5f
|
@ -1531,29 +1531,36 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
this.accessCounter = accessCounter;
|
||||
}
|
||||
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine,
|
||||
final BucketAllocator bucketAllocator,
|
||||
final LongAdder realCacheSize) throws CacheFullException, IOException,
|
||||
BucketAllocatorException {
|
||||
private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
|
||||
if (ioEngine.usesSharedMemory()) {
|
||||
if (UnsafeAvailChecker.isAvailable()) {
|
||||
return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
|
||||
} else {
|
||||
return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
|
||||
}
|
||||
} else {
|
||||
return new BucketEntry(offset, len, accessCounter, inMemory);
|
||||
}
|
||||
}
|
||||
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
|
||||
final LongAdder realCacheSize) throws IOException {
|
||||
int len = data.getSerializedLength();
|
||||
// This cacheable thing can't be serialized
|
||||
if (len == 0) return null;
|
||||
if (len == 0) {
|
||||
return null;
|
||||
}
|
||||
long offset = bucketAllocator.allocateBlock(len);
|
||||
BucketEntry bucketEntry = ioEngine.usesSharedMemory()
|
||||
? UnsafeAvailChecker.isAvailable()
|
||||
? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
|
||||
: new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
|
||||
: new BucketEntry(offset, len, accessCounter, inMemory);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer());
|
||||
boolean succ = false;
|
||||
BucketEntry bucketEntry;
|
||||
try {
|
||||
bucketEntry = getBucketEntry(ioEngine, offset, len);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer());
|
||||
if (data instanceof HFileBlock) {
|
||||
// If an instance of HFileBlock, save on some allocations.
|
||||
HFileBlock block = (HFileBlock)data;
|
||||
HFileBlock block = (HFileBlock) data;
|
||||
ByteBuff sliceBuf = block.getBufferReadOnly();
|
||||
ByteBuffer metadata = block.getMetaData();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Write offset=" + offset + ", len=" + len);
|
||||
}
|
||||
ioEngine.write(sliceBuf, offset);
|
||||
ioEngine.write(metadata, offset + len - metadata.limit());
|
||||
} else {
|
||||
|
@ -1561,12 +1568,12 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
data.serialize(bb, true);
|
||||
ioEngine.write(bb, offset);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// free it in bucket allocator
|
||||
bucketAllocator.freeBlock(offset);
|
||||
throw ioe;
|
||||
succ = true;
|
||||
} finally {
|
||||
if (!succ) {
|
||||
bucketAllocator.freeBlock(offset);
|
||||
}
|
||||
}
|
||||
|
||||
realCacheSize.add(len);
|
||||
return bucketEntry;
|
||||
}
|
||||
|
|
|
@ -50,15 +50,19 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
||||
|
@ -460,4 +464,39 @@ public class TestBucketCache {
|
|||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
|
||||
block1Buffer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
|
||||
// initialize an block.
|
||||
int size = 100, offset = 20;
|
||||
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
||||
ByteBuffer buf = ByteBuffer.allocate(length);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
offset, 52, -1, meta);
|
||||
|
||||
// initialize an mocked ioengine.
|
||||
IOEngine ioEngine = Mockito.mock(IOEngine.class);
|
||||
Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
|
||||
// Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
|
||||
Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
|
||||
Mockito.anyLong());
|
||||
Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
|
||||
Mockito.anyLong());
|
||||
|
||||
// create an bucket allocator.
|
||||
long availableSpace = 1024 * 1024 * 1024L;
|
||||
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
|
||||
|
||||
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
|
||||
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
try {
|
||||
re.writeToCache(ioEngine, allocator, null);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue