HBASE-17158 Avoid deadlock caused by HRegion#doDelta (ChiaPing Tsai)
This commit is contained in:
parent
511398f43a
commit
9f5b8a83b7
|
@ -349,6 +349,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
private volatile Optional<ConfigurationManager> configurationManager;
|
||||
|
||||
// Used for testing.
|
||||
private volatile Long timeoutForWriteLock = null;
|
||||
|
||||
/**
|
||||
* @return The smallest mvcc readPoint across all the scanners in this
|
||||
* region. Writes older than this readPoint, are included in every
|
||||
|
@ -1446,6 +1449,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.closing.set(closing);
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link HRegion#doClose} will block forever if someone tries proving the dead lock via the unit test.
|
||||
* Instead of blocking, the {@link HRegion#doClose} will throw exception if you set the timeout.
|
||||
* @param timeoutForWriteLock the second time to wait for the write lock in {@link HRegion#doClose}
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void setTimeoutForWriteLock(long timeoutForWriteLock) {
|
||||
assert timeoutForWriteLock >= 0;
|
||||
this.timeoutForWriteLock = timeoutForWriteLock;
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
|
||||
justification="I think FindBugs is confused")
|
||||
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
|
||||
|
@ -1484,8 +1498,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
if (timeoutForWriteLock == null
|
||||
|| timeoutForWriteLock == Long.MAX_VALUE) {
|
||||
// block waiting for the lock for closing
|
||||
lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
|
||||
} else {
|
||||
try {
|
||||
boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS);
|
||||
if (!succeed) {
|
||||
throw new IOException("Failed to get write lock when closing region");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
this.closing.set(true);
|
||||
status.setStatus("Disabling writes for close");
|
||||
try {
|
||||
|
@ -5345,6 +5371,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getReadLockCount() {
|
||||
return lock.getReadLockCount();
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
|
||||
return lockedRows;
|
||||
}
|
||||
|
@ -7257,9 +7288,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
WriteEntry writeEntry = null;
|
||||
startRegionOperation(op);
|
||||
List<Cell> results = returnResults? new ArrayList<Cell>(mutation.size()): null;
|
||||
RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
|
||||
RowLock rowLock = null;
|
||||
MemstoreSize memstoreSize = new MemstoreSize();
|
||||
try {
|
||||
rowLock = getRowLockInternal(mutation.getRow(), false);
|
||||
lock(this.updatesLock.readLock());
|
||||
try {
|
||||
Result cpResult = doCoprocessorPreCall(op, mutation);
|
||||
|
@ -7307,7 +7339,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// the client. Means only way to read-your-own-increment or append is to come in with an
|
||||
// a 0 increment.
|
||||
if (writeEntry != null) mvcc.complete(writeEntry);
|
||||
if (rowLock != null) {
|
||||
rowLock.release();
|
||||
}
|
||||
// Request a cache flush if over the limit. Do it outside update lock.
|
||||
if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
|
||||
requestFlush();
|
||||
|
|
|
@ -23,14 +23,11 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -567,6 +563,59 @@ public class TestFromClientSide3 {
|
|||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testLockLeakWithDelta() throws Exception, Throwable {
|
||||
TableName tableName = TableName.valueOf("testLockLeakWithDelta");
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
|
||||
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||
TEST_UTIL.getAdmin().createTable(desc);
|
||||
// new a connection for lower retry number.
|
||||
Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
|
||||
copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
try (Connection con = ConnectionFactory.createConnection(copy)) {
|
||||
HRegion region = (HRegion) find(tableName);
|
||||
region.setTimeoutForWriteLock(10);
|
||||
ExecutorService putService = Executors.newSingleThreadExecutor();
|
||||
putService.execute(() -> {
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
// the put will be blocked by WatiingForMultiMutationsObserver.
|
||||
table.put(put);
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
ExecutorService appendService = Executors.newSingleThreadExecutor();
|
||||
appendService.execute(() -> {
|
||||
Append append = new Append(ROW);
|
||||
append.add(FAMILY, QUALIFIER, VALUE);
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
table.append(append);
|
||||
fail("The APPEND should fail because the target lock is blocked by previous put");
|
||||
} catch (Throwable ex) {
|
||||
}
|
||||
});
|
||||
appendService.shutdown();
|
||||
appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||
WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
|
||||
observer.latch.countDown();
|
||||
putService.shutdown();
|
||||
putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
Result r = table.get(new Get(ROW));
|
||||
assertFalse(r.isEmpty());
|
||||
assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
|
||||
}
|
||||
}
|
||||
HRegion region = (HRegion) find(tableName);
|
||||
int readLockCount = region.getReadLockCount();
|
||||
LOG.info("readLockCount:" + readLockCount);
|
||||
assertEquals(0, readLockCount);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMultiRowMutations() throws Exception, Throwable {
|
||||
TableName tableName = TableName.valueOf("testMultiRowMutations");
|
||||
|
|
Loading…
Reference in New Issue