HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Keith Winkler 2014-12-02 10:15:23 -06:00 committed by stack
parent 155a644b1c
commit f83e25e180
3 changed files with 145 additions and 76 deletions

View File

@ -2421,25 +2421,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
* @param mutations the list of mutations
* @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
boolean initialized = false;
while (!batchOp.isDone()) {
if (!batchOp.isInReplay()) {
checkReadOnly();
}
checkResources();
Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
startRegionOperation(op);
try {
while (!batchOp.isDone()) {
if (!batchOp.isInReplay()) {
checkReadOnly();
}
checkResources();
long newSize;
Operation op = Operation.BATCH_MUTATE;
if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
startRegionOperation(op);
try {
if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) {
@ -2448,13 +2445,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
initialized = true;
}
long addedSize = doMiniBatchMutation(batchOp);
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally {
closeRegionOperation(op);
}
if (isFlushSize(newSize)) {
requestFlush();
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
if (isFlushSize(newSize)) {
requestFlush();
}
}
} finally {
closeRegionOperation(op);
}
return batchOp.retCodeDetails;
}
@ -2583,7 +2580,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
rowLock = getRowLock(mutation.getRow(), shouldBlock);
rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
} catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row="
+ Bytes.toStringBinary(mutation.getRow()), ioe);
@ -3767,47 +3764,55 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException if waitForLock was true and the lock could not be acquired after waiting
*/
public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
checkRow(row, "row lock");
startRegionOperation();
try {
HashedBytes rowKey = new HashedBytes(row);
RowLockContext rowLockContext = new RowLockContext(rowKey);
// loop until we acquire the row lock (unless !waitForLock)
while (true) {
RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
if (existingContext == null) {
// Row is not already locked by any thread, use newly created context.
break;
} else if (existingContext.ownedByCurrentThread()) {
// Row is already locked by current thread, reuse existing context instead.
rowLockContext = existingContext;
break;
} else {
if (!waitForLock) {
return null;
}
try {
// Row is already locked by some other thread, give up or wait for it
if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for lock for row: " + rowKey);
}
} catch (InterruptedException ie) {
LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
}
}
// allocate new lock for this thread
return rowLockContext.newLock();
return getRowLockInternal(row, waitForLock);
} finally {
closeRegionOperation();
}
}
/**
 * A version of getRowLock(byte[], boolean) to use when a region operation has already been
 * started (the calling thread has already acquired the region-close-guard lock).
 * This method does not itself start or close a region operation.
 *
 * @param row the row to lock; validated with checkRow before any lock state is touched
 * @param waitForLock if true, wait up to rowLockWaitDuration milliseconds each time the row is
 *          found to be locked by another thread; if false, return null in that case instead
 * @return a new lock on the row, or null if waitForLock was false and the row is currently
 *         locked by another thread
 * @throws IOException if waitForLock was true and a wait timed out, or (as an
 *           InterruptedIOException) if the calling thread was interrupted while waiting
 */
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
  checkRow(row, "row lock");
  HashedBytes rowKey = new HashedBytes(row);
  RowLockContext rowLockContext = new RowLockContext(rowKey);

  // loop until we acquire the row lock (unless !waitForLock)
  while (true) {
    // Atomically publish our context unless one is already registered for this row.
    RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
    if (existingContext == null) {
      // Row is not already locked by any thread, use newly created context.
      break;
    } else if (existingContext.ownedByCurrentThread()) {
      // Row is already locked by current thread, reuse existing context instead.
      rowLockContext = existingContext;
      break;
    } else {
      if (!waitForLock) {
        return null;
      }
      try {
        // Row is already locked by some other thread, give up or wait for it.
        // NOTE(review): presumably the latch is released when the owning context is
        // cleaned up; after a successful wait we loop and retry putIfAbsent.
        if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
          throw new IOException("Timed out waiting for lock for row: " + rowKey);
        }
      } catch (InterruptedException ie) {
        LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
        InterruptedIOException iie = new InterruptedIOException();
        iie.initCause(ie);
        // Catching InterruptedException clears the interrupt status; restore it so that
        // callers which catch the IOException can still observe the interruption.
        Thread.currentThread().interrupt();
        throw iie;
      }
    }
  }

  // allocate new lock for this thread
  return rowLockContext.newLock();
}
/**
* Acquires a lock on the given row.
* The same thread may acquire multiple locks on the same row.

View File

@ -613,11 +613,11 @@ public class TestAtomicOperation {
}
@Override
public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
return new WrappedRowLock(super.getRowLock(row, waitForLock));
return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
}
public class WrappedRowLock extends RowLock {

View File

@ -1321,12 +1321,11 @@ public class TestHRegion {
}
@Test
public void testBatchPut() throws Exception {
byte[] b = Bytes.toBytes(getName());
public void testBatchPut_whileNoRowLocksHeld() throws IOException {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
this.region = initHRegion(b, getName(), CONF, cf);
this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
@ -1356,9 +1355,34 @@ public class TestHRegion {
}
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
@Test
public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
final Put[] puts = new Put[10];
for (int i = 0; i < 10; i++) {
puts[i] = new Put(Bytes.toBytes("row_" + i));
puts[i].add(cf, qual, val);
}
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
LOG.info("batchPut will have to break into four batches to avoid row locks");
RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4"));
RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6"));
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
@ -1368,36 +1392,76 @@ public class TestHRegion {
retFromThread.set(region.batchMutate(puts));
}
};
LOG.info("...starting put thread while holding lock");
LOG.info("...starting put thread while holding locks");
ctx.addThread(putter);
ctx.startThreads();
LOG.info("...waiting for put thread to sync first time");
long startWait = System.currentTimeMillis();
while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
Thread.sleep(100);
if (System.currentTimeMillis() - startWait > 10000) {
fail("Timed out waiting for thread to sync first minibatch");
LOG.info("...waiting for put thread to sync 1st time");
waitForCounter(source, "syncTimeNumOps", syncs + 1);
// Now attempt to close the region from another thread. Prior to HBASE-12565
// this would cause the in-progress batchMutate operation to fail with an
// exception because it used to release and re-acquire the close-guard lock
// between batches. Caller then didn't get status indicating which writes succeeded.
// We now expect this thread to block until the batchMutate call finishes.
Thread regionCloseThread = new Thread() {
@Override
public void run() {
try {
HRegion.closeHRegion(region);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
LOG.info("...releasing row lock, which should let put thread continue");
rowLock.release();
LOG.info("...joining on thread");
};
regionCloseThread.start();
LOG.info("...releasing row lock 1, which should let put thread continue");
rowLock1.release();
LOG.info("...waiting for put thread to sync 2nd time");
waitForCounter(source, "syncTimeNumOps", syncs + 2);
LOG.info("...releasing row lock 2, which should let put thread continue");
rowLock2.release();
LOG.info("...waiting for put thread to sync 3rd time");
waitForCounter(source, "syncTimeNumOps", syncs + 3);
LOG.info("...releasing row lock 3, which should let put thread continue");
rowLock3.release();
LOG.info("...waiting for put thread to sync 4th time");
waitForCounter(source, "syncTimeNumOps", syncs + 4);
LOG.info("...joining on put thread");
ctx.stop();
LOG.info("...checking that next batch was synced");
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
codes = retFromThread.get();
for (int i = 0; i < 10; i++) {
regionCloseThread.join();
OperationStatus[] codes = retFromThread.get();
for (int i = 0; i < codes.length; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
codes[i].getOperationStatusCode());
}
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
/**
 * Polls the named counter every 100 ms until it is at least {@code expectedCount}.
 * If more than 10 seconds have elapsed after a poll interval and the counter was still too
 * low, the test is failed with a message reporting the last observed value.
 *
 * @param source the metrics source the counter is read from
 * @param metricName name of the counter to poll
 * @param expectedCount minimum counter value to wait for
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
    throws InterruptedException {
  final long waitStarted = System.currentTimeMillis();
  long observed = metricsAssertHelper.getCounter(metricName, source);
  while (observed < expectedCount) {
    Thread.sleep(100);
    long elapsed = System.currentTimeMillis() - waitStarted;
    if (elapsed > 10000) {
      fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
        expectedCount, observed));
    }
    // Re-read the counter only after the timeout check, so the failure message reports
    // the value that was seen before the sleep.
    observed = metricsAssertHelper.getCounter(metricName, source);
  }
}
@Test
public void testBatchPutWithTsSlop() throws Exception {
byte[] b = Bytes.toBytes(getName());