HBASE-22185 RAMQueueEntry#writeToCache should freeBlock if any exception encountered instead of the IOException catch block #130
This commit is contained in:
parent
6df26a1ef9
commit
055a47e5c3
|
@ -1429,26 +1429,25 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
this.accessCounter = accessCounter;
|
||||
}
|
||||
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine,
|
||||
final BucketAllocator bucketAllocator,
|
||||
final UniqueIndexMap<Integer> deserialiserMap,
|
||||
final AtomicLong realCacheSize) throws CacheFullException, IOException,
|
||||
BucketAllocatorException {
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
|
||||
final UniqueIndexMap<Integer> deserialiserMap, final AtomicLong realCacheSize)
|
||||
throws IOException {
|
||||
int len = data.getSerializedLength();
|
||||
// This cacheable thing can't be serialized
|
||||
if (len == 0) return null;
|
||||
if (len == 0) {
|
||||
return null;
|
||||
}
|
||||
long offset = bucketAllocator.allocateBlock(len);
|
||||
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
||||
BucketEntry bucketEntry;
|
||||
boolean succ = false;
|
||||
try {
|
||||
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
|
||||
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
||||
if (data instanceof HFileBlock) {
|
||||
// If an instance of HFileBlock, save on some allocations.
|
||||
HFileBlock block = (HFileBlock)data;
|
||||
HFileBlock block = (HFileBlock) data;
|
||||
ByteBuffer sliceBuf = block.getBufferReadOnly();
|
||||
ByteBuffer metadata = block.getMetaData();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Write offset=" + offset + ", len=" + len);
|
||||
}
|
||||
ioEngine.write(sliceBuf, offset);
|
||||
ioEngine.write(metadata, offset + len - metadata.limit());
|
||||
} else {
|
||||
|
@ -1456,12 +1455,12 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
data.serialize(bb, true);
|
||||
ioEngine.write(bb, offset);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// free it in bucket allocator
|
||||
bucketAllocator.freeBlock(offset);
|
||||
throw ioe;
|
||||
succ = true;
|
||||
} finally {
|
||||
if (!succ) {
|
||||
bucketAllocator.freeBlock(offset);
|
||||
}
|
||||
}
|
||||
|
||||
realCacheSize.addAndGet(len);
|
||||
return bucketEntry;
|
||||
}
|
||||
|
|
|
@ -50,13 +50,16 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Basic test of BucketCache.Puts and gets.
|
||||
|
@ -431,4 +434,36 @@ public class TestBucketCache {
|
|||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata,
|
||||
actualBuffer, block1Buffer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
|
||||
// initialize an block.
|
||||
int size = 100, offset = 20;
|
||||
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
||||
ByteBuffer buf = ByteBuffer.allocate(length);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
offset, 52, -1, meta);
|
||||
|
||||
// initialize an mocked ioengine.
|
||||
IOEngine ioEngine = Mockito.mock(IOEngine.class);
|
||||
// Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
|
||||
Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
|
||||
Mockito.anyLong());
|
||||
|
||||
// create an bucket allocator.
|
||||
long availableSpace = 1024 * 1024 * 1024L;
|
||||
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
|
||||
|
||||
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
|
||||
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
try {
|
||||
re.writeToCache(ioEngine, allocator, new UniqueIndexMap<Integer>(), null);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue