HBASE-15837 Memstore size accounting is wrong if postBatchMutate() throws exception

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
This commit is contained in:
Enis Soztutar 2016-05-25 17:04:50 -07:00
parent 7af013b267
commit 02a641e2b1
2 changed files with 56 additions and 6 deletions

View File

@ -1112,7 +1112,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.rsAccounting != null) {
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
}
return this.memstoreSize.addAndGet(memStoreSize);
long size = this.memstoreSize.addAndGet(memStoreSize);
// This is extremely bad if we make memstoreSize negative. Log as much info on the offending
// caller as possible. (memStoreSize might be a negative value already -- freeing memory)
if (size < 0) {
LOG.error("Asked to modify this region's (" + this.toString()
+ ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
+ (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
}
return size;
}
@Override
@ -2964,8 +2972,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
initialized = true;
}
long addedSize = doMiniBatchMutation(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
doMiniBatchMutation(batchOp);
long newSize = this.getMemstoreSize();
if (isFlushSize(newSize)) {
requestFlush();
}
@ -3046,6 +3054,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int noOfPuts = 0, noOfDeletes = 0;
WALKey walKey = null;
long mvccNum = 0;
long addedSize = 0;
try {
// ------------------------------------
// STEP 1. Try to acquire as many locks as we can, and ensure
@ -3289,7 +3298,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// visible to scanners till we update the MVCC. The MVCC is
// moved only when the sync is complete.
// ----------------------------------
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
@ -3372,8 +3380,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (writeEntry != null) mvcc.complete(writeEntry);
} else if (writeEntry != null) {
mvcc.completeAndWait(writeEntry);
} else {
this.addAndGetGlobalMemstoreSize(addedSize);
if (writeEntry != null) {
mvcc.completeAndWait(writeEntry);
}
}
if (locked) {

View File

@ -36,6 +36,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -399,6 +400,44 @@ public class TestHRegion {
HRegion.closeHRegion(region);
}
@Test
public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + testName);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
Store store = region.getStore(COLUMN_FAMILY_BYTES);
assertEquals(0, region.getMemstoreSize());
// Put one value
byte [] value = Bytes.toBytes(name.getMethodName());
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
region.put(put);
long onePutSize = region.getMemstoreSize();
assertTrue(onePutSize > 0);
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
doThrow(new IOException())
.when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
region.setCoprocessorHost(mockedCPHost);
put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
try {
region.put(put);
fail("Should have failed with IOException");
} catch (IOException expected) {
}
assertEquals("memstoreSize should be incremented", onePutSize * 2, region.getMemstoreSize());
assertEquals("flushable size should be incremented", onePutSize * 2, store.getFlushableSize());
region.setCoprocessorHost(null);
HBaseTestingUtility.closeRegionAndWAL(region);
}
/**
* Test we do not lose data if we fail a flush and then close.
* Part of HBase-10466. Tests the following from the issue description: