HBASE-22185 RAMQueueEntry#writeToCache should freeBlock if any exception encountered instead of the IOException catch block

commit 4ceffc83fe (parent 3feac73e02)
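In short: the old writeToCache freed the allocated bucket slot only inside a catch (IOException) handler, so an unchecked exception thrown between allocateBlock and a successful write (from the IOEngine, or while constructing the BucketEntry) leaked the slot. The fix tracks success with a flag and frees the block in a finally clause, which runs for every exception type. A minimal, self-contained sketch of the pattern, with hypothetical Allocator and Engine interfaces standing in for BucketAllocator and IOEngine (succ mirrors the flag in the diff):

import java.io.IOException;

// Sketch of the allocate/write/free-on-failure pattern this commit adopts.
public class WriteToCacheSketch {
  // Hypothetical stand-in for BucketAllocator.
  interface Allocator {
    long allocateBlock(int len) throws IOException;
    void freeBlock(long offset);
  }

  // Hypothetical stand-in for IOEngine.
  interface Engine {
    void write(byte[] data, long offset) throws IOException;
  }

  static long writeBlock(Allocator allocator, Engine engine, byte[] data) throws IOException {
    long offset = allocator.allocateBlock(data.length);
    boolean succ = false;
    try {
      engine.write(data, offset); // may throw IOException or any RuntimeException
      succ = true;
    } finally {
      // Before this fix, the free happened only in a catch (IOException) block,
      // so a RuntimeException left the slot allocated forever.
      if (!succ) {
        allocator.freeBlock(offset);
      }
    }
    return offset;
  }
}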
BucketCache.java:

@@ -1502,30 +1502,37 @@ public class BucketCache implements BlockCache, HeapSize {
       this.accessCounter = accessCounter;
     }
 
-    public BucketEntry writeToCache(final IOEngine ioEngine,
-        final BucketAllocator bucketAllocator,
-        final UniqueIndexMap<Integer> deserialiserMap,
-        final LongAdder realCacheSize) throws CacheFullException, IOException,
-        BucketAllocatorException {
+    private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
+      if (ioEngine.usesSharedMemory()) {
+        if (UnsafeAvailChecker.isAvailable()) {
+          return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
+        } else {
+          return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
+        }
+      } else {
+        return new BucketEntry(offset, len, accessCounter, inMemory);
+      }
+    }
+
+    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
+        final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
+        throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
-      if (len == 0) return null;
+      if (len == 0) {
+        return null;
+      }
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = ioEngine.usesSharedMemory()
-          ? UnsafeAvailChecker.isAvailable()
-              ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
-              : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
-          : new BucketEntry(offset, len, accessCounter, inMemory);
-      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
+      boolean succ = false;
+      BucketEntry bucketEntry;
       try {
+        bucketEntry = getBucketEntry(ioEngine, offset, len);
+        bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
         if (data instanceof HFileBlock) {
           // If an instance of HFileBlock, save on some allocations.
-          HFileBlock block = (HFileBlock)data;
+          HFileBlock block = (HFileBlock) data;
           ByteBuff sliceBuf = block.getBufferReadOnly();
           ByteBuffer metadata = block.getMetaData();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Write offset=" + offset + ", len=" + len);
-          }
           ioEngine.write(sliceBuf, offset);
           ioEngine.write(metadata, offset + len - metadata.limit());
         } else {
@@ -1533,12 +1540,12 @@ public class BucketCache implements BlockCache, HeapSize {
           data.serialize(bb, true);
           ioEngine.write(bb, offset);
         }
-      } catch (IOException ioe) {
-        // free it in bucket allocator
-        bucketAllocator.freeBlock(offset);
-        throw ioe;
+        succ = true;
+      } finally {
+        if (!succ) {
+          bucketAllocator.freeBlock(offset);
+        }
       }
-
       realCacheSize.add(len);
       return bucketEntry;
     }
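The accompanying regression test mocks the IOEngine so that every write throws a RuntimeException, then checks BucketAllocator#getUsedSize before and after the failed write: with the fix in place, the allocation is returned and the used size stays at zero. The write stub is installed twice because IOEngine overloads write for both ByteBuffer and ByteBuff arguments.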
TestBucketCache.java:

@@ -48,15 +48,19 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
@@ -454,4 +458,39 @@ public class TestBucketCache {
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
         block1Buffer);
   }
+
+  @Test
+  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
+    // initialize a block.
+    int size = 100, offset = 20;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    ByteBuffer buf = ByteBuffer.allocate(length);
+    HFileContext meta = new HFileContextBuilder().build();
+    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
+        offset, 52, -1, meta);
+
+    // initialize a mocked IOEngine where every write fails.
+    IOEngine ioEngine = Mockito.mock(IOEngine.class);
+    Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
+    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
+        Mockito.anyLong());
+    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
+        Mockito.anyLong());
+
+    // create a bucket allocator.
+    long availableSpace = 1024 * 1024 * 1024L;
+    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
+
+    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
+    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
+
+    Assert.assertEquals(0, allocator.getUsedSize());
+    try {
+      re.writeToCache(ioEngine, allocator, new UniqueIndexMap<>(), null);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected: the mocked IOEngine throws on every write.
+    }
+    Assert.assertEquals(0, allocator.getUsedSize());
+  }
 }
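Two details worth noting: passing null for realCacheSize is safe here because the stubbed write throws before realCacheSize.add(len) is reached, and the test deliberately catches the broad Exception, since the behavior under test is that any exception type, not just IOException, triggers the free.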