diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6b41bc4fbad..f074b0e83f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -349,6 +349,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private volatile Optional<ConfigurationManager> configurationManager;
 
+  // Used for testing.
+  private volatile Long timeoutForWriteLock = null;
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
@@ -1446,6 +1449,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.closing.set(closing);
   }
 
+  /**
+   * {@link HRegion#doClose} blocks forever if a unit test deliberately provokes the deadlock.
+   * With a timeout set, {@link HRegion#doClose} throws an exception instead of blocking.
+   * @param timeoutForWriteLock the time, in seconds, to wait for the write lock in {@link HRegion#doClose}
+   */
+  @VisibleForTesting
+  public void setTimeoutForWriteLock(long timeoutForWriteLock) {
+    assert timeoutForWriteLock >= 0;
+    this.timeoutForWriteLock = timeoutForWriteLock;
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
       justification="I think FindBugs is confused")
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
@@ -1484,8 +1498,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
-    // block waiting for the lock for closing
-    lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+    if (timeoutForWriteLock == null
+        || timeoutForWriteLock == Long.MAX_VALUE) {
+      // block waiting for the lock for closing
+      lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
+    } else {
+      try {
+        boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS);
+        if (!succeed) {
+          throw new IOException("Failed to get write lock when closing region");
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      }
+    }
     this.closing.set(true);
     status.setStatus("Disabling writes for close");
     try {
@@ -5345,6 +5371,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @VisibleForTesting
+  public int getReadLockCount() {
+    return lock.getReadLockCount();
+  }
+
   public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
     return lockedRows;
   }
@@ -7257,9 +7288,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WriteEntry writeEntry = null;
     startRegionOperation(op);
     List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
-    RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
+    RowLock rowLock = null;
     MemstoreSize memstoreSize = new MemstoreSize();
     try {
+      rowLock = getRowLockInternal(mutation.getRow(), false);
       lock(this.updatesLock.readLock());
       try {
         Result cpResult = doCoprocessorPreCall(op, mutation);
@@ -7307,7 +7339,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // the client. Means only way to read-your-own-increment or append is to come in with an
      // a 0 increment.
       if (writeEntry != null) mvcc.complete(writeEntry);
-      rowLock.release();
+      if (rowLock != null) {
+        rowLock.release();
+      }
       // Request a cache flush if over the limit. Do it outside update lock.
       if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
         requestFlush();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index cbc97a22014..9fc20ec961d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,14 +23,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -567,6 +563,59 @@ public class TestFromClientSide3 {
     TEST_UTIL.deleteTable(tableName);
   }
 
+  @Test(timeout = 30000)
+  public void testLockLeakWithDelta() throws Exception, Throwable {
+    TableName tableName = TableName.valueOf("testLockLeakWithDelta");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getAdmin().createTable(desc);
+    // Create a new connection with a lower retry number.
+    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
+    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection con = ConnectionFactory.createConnection(copy)) {
+      HRegion region = (HRegion) find(tableName);
+      region.setTimeoutForWriteLock(10);
+      ExecutorService putService = Executors.newSingleThreadExecutor();
+      putService.execute(() -> {
+        try (Table table = con.getTable(tableName)) {
+          Put put = new Put(ROW);
+          put.addColumn(FAMILY, QUALIFIER, VALUE);
+          // the put will be blocked by WatiingForMultiMutationsObserver.
+          table.put(put);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      ExecutorService appendService = Executors.newSingleThreadExecutor();
+      appendService.execute(() -> {
+        Append append = new Append(ROW);
+        append.add(FAMILY, QUALIFIER, VALUE);
+        try (Table table = con.getTable(tableName)) {
+          table.append(append);
+          fail("The APPEND should fail because the target lock is blocked by the previous put");
+        } catch (Throwable ex) {
+        }
+      });
+      appendService.shutdown();
+      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      observer.latch.countDown();
+      putService.shutdown();
+      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      try (Table table = con.getTable(tableName)) {
+        Result r = table.get(new Get(ROW));
+        assertFalse(r.isEmpty());
+        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
+      }
+    }
+    HRegion region = (HRegion) find(tableName);
+    int readLockCount = region.getReadLockCount();
+    LOG.info("readLockCount:" + readLockCount);
+    assertEquals(0, readLockCount);
+  }
+
   @Test(timeout = 30000)
   public void testMultiRowMutations() throws Exception, Throwable {
     TableName tableName = TableName.valueOf("testMultiRowMutations");
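
Note on the core fix: in the old code, getRowLockInternal() was called after startRegionOperation() (which takes the region's read lock) but before the try/finally, so when row-lock acquisition timed out and threw, the finally never ran, the region read lock leaked, and a later doClose() blocked forever on the write lock. The sketch below is not HBase code; it is a minimal, self-contained illustration of that pattern under simplified stand-ins (LockLeakSketch, acquireRowLock, the "apply the mutation" placeholder are all hypothetical).

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockLeakSketch {
      private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();

      // Hypothetical stand-in for HRegion.getRowLockInternal: may time out and throw.
      private AutoCloseable acquireRowLock() throws Exception {
        throw new Exception("timed out waiting for row lock");
      }

      // Buggy shape: the row lock is acquired before the try, so when
      // acquireRowLock() throws, the finally never runs and the region
      // read lock leaks; a close() waiting for the write lock then hangs.
      public void deltaBuggy() throws Exception {
        regionLock.readLock().lock();             // startRegionOperation()
        AutoCloseable rowLock = acquireRowLock(); // throws -> finally below never runs
        try {
          // apply the mutation ...
        } finally {
          rowLock.close();
          regionLock.readLock().unlock();         // closeRegionOperation()
        }
      }

      // Fixed shape (what the patch does): acquire inside the try and
      // null-guard the release, so the read lock is always returned.
      public void deltaFixed() throws Exception {
        regionLock.readLock().lock();             // startRegionOperation()
        AutoCloseable rowLock = null;
        try {
          rowLock = acquireRowLock();
          // apply the mutation ...
        } finally {
          if (rowLock != null) {
            rowLock.close();
          }
          regionLock.readLock().unlock();         // closeRegionOperation()
        }
      }
    }

This is also why the test asserts region.getReadLockCount() == 0 after the failed append, and why setTimeoutForWriteLock() exists: without it, a regression would make doClose() hang instead of failing the test promptly.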