HBASE-15837 Memstore size accounting is wrong if postBatchMutate() throws exception
This commit is contained in:
parent
3ffaa116fe
commit
43d1a999fe
|
@ -1117,7 +1117,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
if (this.rsAccounting != null) {
|
||||
rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
|
||||
}
|
||||
return this.memstoreSize.addAndGet(memStoreSize);
|
||||
long size = this.memstoreSize.addAndGet(memStoreSize);
|
||||
// This is extremely bad if we make memstoreSize negative. Log as much info on the offending
|
||||
// caller as possible. (memStoreSize might be a negative value already -- freeing memory)
|
||||
if (size < 0) {
|
||||
LOG.error("Asked to modify this region's (" + this.toString()
|
||||
+ ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
|
||||
+ (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2367,7 +2375,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
perCfExtras = new StringBuilder();
|
||||
for (Store store: storesToFlush) {
|
||||
perCfExtras.append("; ").append(store.getColumnFamilyName());
|
||||
perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
|
||||
perCfExtras.append("=").append(StringUtils.byteDesc(store.getFlushableSize()));
|
||||
}
|
||||
}
|
||||
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
|
||||
|
@ -2943,8 +2951,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
initialized = true;
|
||||
}
|
||||
long addedSize = doMiniBatchMutate(batchOp);
|
||||
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
||||
doMiniBatchMutate(batchOp);
|
||||
long newSize = this.getMemstoreSize();
|
||||
requestFlushIfNeeded(newSize);
|
||||
}
|
||||
} finally {
|
||||
|
@ -3025,6 +3033,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
int cellCount = 0;
|
||||
/** Keep track of the locks we hold so we can release them in finally clause */
|
||||
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
|
||||
long addedSize = 0;
|
||||
try {
|
||||
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
|
||||
int numReadyToWrite = 0;
|
||||
|
@ -3206,7 +3215,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
// STEP 5. Write back to memstore
|
||||
long addedSize = 0;
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
|
||||
continue;
|
||||
|
@ -3267,6 +3275,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} finally {
|
||||
// Call complete rather than completeAndWait because we probably had error if walKey != null
|
||||
if (writeEntry != null) mvcc.complete(writeEntry);
|
||||
this.addAndGetGlobalMemstoreSize(addedSize);
|
||||
if (locked) {
|
||||
this.updatesLock.readLock().unlock();
|
||||
}
|
||||
|
|
|
@ -172,6 +172,7 @@ import static org.junit.Assert.fail;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
@ -412,6 +413,44 @@ public class TestHRegion {
|
|||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
|
||||
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
|
||||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + testName);
|
||||
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
|
||||
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
|
||||
CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
|
||||
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
assertEquals(0, region.getMemstoreSize());
|
||||
|
||||
// Put one value
|
||||
byte [] value = Bytes.toBytes(name.getMethodName());
|
||||
Put put = new Put(value);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
|
||||
region.put(put);
|
||||
long onePutSize = region.getMemstoreSize();
|
||||
assertTrue(onePutSize > 0);
|
||||
|
||||
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
||||
doThrow(new IOException())
|
||||
.when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
|
||||
region.setCoprocessorHost(mockedCPHost);
|
||||
|
||||
put = new Put(value);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
|
||||
try {
|
||||
region.put(put);
|
||||
fail("Should have failed with IOException");
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
assertEquals("memstoreSize should be incremented", onePutSize * 2, region.getMemstoreSize());
|
||||
assertEquals("flushable size should be incremented", onePutSize * 2, store.getFlushableSize());
|
||||
|
||||
region.setCoprocessorHost(null);
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we do not lose data if we fail a flush and then close.
|
||||
* Part of HBase-10466. Tests the following from the issue description:
|
||||
|
@ -6044,6 +6083,7 @@ public class TestHRegion {
|
|||
final HTableDescriptor htd, final RegionServerServices rsServices) {
|
||||
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
|
||||
}
|
||||
@Override
|
||||
protected long getNextSequenceId(WAL wal) throws IOException {
|
||||
return 42;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue