From f83e25e180243c45d79deefe20598f8f1817a158 Mon Sep 17 00:00:00 2001 From: Keith Winkler Date: Tue, 2 Dec 2014 10:15:23 -0600 Subject: [PATCH] HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes Signed-off-by: stack --- .../hadoop/hbase/regionserver/HRegion.java | 111 +++++++++--------- .../regionserver/TestAtomicOperation.java | 4 +- .../hbase/regionserver/TestHRegion.java | 106 +++++++++++++---- 3 files changed, 145 insertions(+), 76 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d5c07f07586..1007a882846 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2421,25 +2421,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. - * @param mutations the list of mutations + * @param batchOp contains the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ OperationStatus[] batchMutate(BatchOperationInProgress batchOp) throws IOException { boolean initialized = false; - while (!batchOp.isDone()) { - if (!batchOp.isInReplay()) { - checkReadOnly(); - } - checkResources(); + Operation op = batchOp.isInReplay() ? 
Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; + startRegionOperation(op); + try { + while (!batchOp.isDone()) { + if (!batchOp.isInReplay()) { + checkReadOnly(); + } + checkResources(); - long newSize; - Operation op = Operation.BATCH_MUTATE; - if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE; - startRegionOperation(op); - - try { if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { @@ -2448,13 +2445,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // initialized = true; } long addedSize = doMiniBatchMutation(batchOp); - newSize = this.addAndGetGlobalMemstoreSize(addedSize); - } finally { - closeRegionOperation(op); - } - if (isFlushSize(newSize)) { - requestFlush(); + long newSize = this.addAndGetGlobalMemstoreSize(addedSize); + if (isFlushSize(newSize)) { + requestFlush(); + } } + } finally { + closeRegionOperation(op); } return batchOp.retCodeDetails; } @@ -2583,7 +2580,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLock(mutation.getRow(), shouldBlock); + rowLock = getRowLockInternal(mutation.getRow(), shouldBlock); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -3767,47 +3764,55 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException if waitForLock was true and the lock could not be acquired after waiting */ public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException { - checkRow(row, "row lock"); startRegionOperation(); try { - HashedBytes rowKey = new HashedBytes(row); - RowLockContext rowLockContext = new RowLockContext(rowKey); - - // loop until we acquire the row lock (unless !waitForLock) - while (true) { - RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, 
rowLockContext); - if (existingContext == null) { - // Row is not already locked by any thread, use newly created context. - break; - } else if (existingContext.ownedByCurrentThread()) { - // Row is already locked by current thread, reuse existing context instead. - rowLockContext = existingContext; - break; - } else { - if (!waitForLock) { - return null; - } - try { - // Row is already locked by some other thread, give up or wait for it - if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out waiting for lock for row: " + rowKey); - } - } catch (InterruptedException ie) { - LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } - } - } - - // allocate new lock for this thread - return rowLockContext.newLock(); + return getRowLockInternal(row, waitForLock); } finally { closeRegionOperation(); } } + /** + * A version of getRowLock(byte[], boolean) to use when a region operation has already been + * started (the calling thread has already acquired the region-close-guard lock). + */ + protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException { + checkRow(row, "row lock"); + HashedBytes rowKey = new HashedBytes(row); + RowLockContext rowLockContext = new RowLockContext(rowKey); + + // loop until we acquire the row lock (unless !waitForLock) + while (true) { + RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); + if (existingContext == null) { + // Row is not already locked by any thread, use newly created context. + break; + } else if (existingContext.ownedByCurrentThread()) { + // Row is already locked by current thread, reuse existing context instead. 
+ rowLockContext = existingContext; + break; + } else { + if (!waitForLock) { + return null; + } + try { + // Row is already locked by some other thread, give up or wait for it + if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for lock for row: " + rowKey); + } + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + } + } + + // allocate new lock for this thread + return rowLockContext.newLock(); + } + /** * Acquires a lock on the given row. * The same thread may acquire multiple locks on the same row. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index a4b4959696c..883e530be36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -613,11 +613,11 @@ public class TestAtomicOperation { } @Override - public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException { + public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return new WrappedRowLock(super.getRowLock(row, waitForLock)); + return new WrappedRowLock(super.getRowLockInternal(row, waitForLock)); } public class WrappedRowLock extends RowLock { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index f7936c3ff28..ffaa34709d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1321,12 +1321,11 @@ public class TestHRegion { } @Test - public void testBatchPut() throws Exception { - byte[] b = Bytes.toBytes(getName()); + public void testBatchPut_whileNoRowLocksHeld() throws IOException { byte[] cf = Bytes.toBytes(COLUMN_FAMILY); byte[] qual = Bytes.toBytes("qual"); byte[] val = Bytes.toBytes("val"); - this.region = initHRegion(b, getName(), CONF, cf); + this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf); MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); try { long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); @@ -1356,9 +1355,34 @@ public class TestHRegion { } metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } - LOG.info("Next a batch put that has to break into two batches to avoid a lock"); - RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2")); + @Test + public void testBatchPut_whileMultipleRowLocksHeld() throws Exception { + byte[] cf = Bytes.toBytes(COLUMN_FAMILY); + byte[] qual = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("val"); + this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf); + MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); + try { + long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); + + final Put[] puts = new Put[10]; + for (int i = 0; i < 10; i++) { + puts[i] = new Put(Bytes.toBytes("row_" + i)); + puts[i].add(cf, qual, val); + } + puts[5].add(Bytes.toBytes("BAD_CF"), qual, val); + + LOG.info("batchPut will have to break into four batches to avoid row locks"); + RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2")); + RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4")); 
+ RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6")); MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); final AtomicReference retFromThread = new AtomicReference(); @@ -1368,36 +1392,76 @@ public class TestHRegion { retFromThread.set(region.batchMutate(puts)); } }; - LOG.info("...starting put thread while holding lock"); + LOG.info("...starting put thread while holding locks"); ctx.addThread(putter); ctx.startThreads(); - LOG.info("...waiting for put thread to sync first time"); - long startWait = System.currentTimeMillis(); - while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) { - Thread.sleep(100); - if (System.currentTimeMillis() - startWait > 10000) { - fail("Timed out waiting for thread to sync first minibatch"); + LOG.info("...waiting for put thread to sync 1st time"); + waitForCounter(source, "syncTimeNumOps", syncs + 1); + + // Now attempt to close the region from another thread. Prior to HBASE-12565 + // this would cause the in-progress batchMutate operation to fail with an + // exception because it used to release and re-acquire the close-guard lock + // between batches. Caller then didn't get status indicating which writes succeeded. + // We now expect this thread to block until the batchMutate call finishes. 
+ Thread regionCloseThread = new Thread() { + @Override + public void run() { + try { + HRegion.closeHRegion(region); + } catch (IOException e) { + throw new RuntimeException(e); + } } - } - LOG.info("...releasing row lock, which should let put thread continue"); - rowLock.release(); - LOG.info("...joining on thread"); + }; + regionCloseThread.start(); + + LOG.info("...releasing row lock 1, which should let put thread continue"); + rowLock1.release(); + + LOG.info("...waiting for put thread to sync 2nd time"); + waitForCounter(source, "syncTimeNumOps", syncs + 2); + + LOG.info("...releasing row lock 2, which should let put thread continue"); + rowLock2.release(); + + LOG.info("...waiting for put thread to sync 3rd time"); + waitForCounter(source, "syncTimeNumOps", syncs + 3); + + LOG.info("...releasing row lock 3, which should let put thread continue"); + rowLock3.release(); + + LOG.info("...waiting for put thread to sync 4th time"); + waitForCounter(source, "syncTimeNumOps", syncs + 4); + + LOG.info("...joining on put thread"); ctx.stop(); - LOG.info("...checking that next batch was synced"); - metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source); - codes = retFromThread.get(); - for (int i = 0; i < 10; i++) { + regionCloseThread.join(); + + OperationStatus[] codes = retFromThread.get(); + for (int i = 0; i < codes.length; i++) { assertEquals((i == 5) ? 
OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } - } finally { HRegion.closeHRegion(this.region); this.region = null; } } + private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount) + throws InterruptedException { + long startWait = System.currentTimeMillis(); + long currentCount; + while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) { + Thread.sleep(100); + if (System.currentTimeMillis() - startWait > 10000) { + fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName, + expectedCount, currentCount)); + } + } + } + @Test public void testBatchPutWithTsSlop() throws Exception { byte[] b = Bytes.toBytes(getName());