HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)

This patch plus a sorting of the batch (HBASE-17924) fixes a regression
in Increment/CheckAndPut-style operations.

Signed-off-by: Yu Li <carp84@gmail.com>
Signed-off-by: Allan Yang <allanwin@163.com>
This commit is contained in:
Michael Stack 2017-11-28 09:14:58 -08:00
parent 143ceb97ba
commit 2f7a6f21eb
3 changed files with 227 additions and 30 deletions

View File

@ -2191,7 +2191,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Should the store be flushed because it is old enough.
* <p>
* Every FlushPolicy should call this to determine whether a store is old enough to flush(except
* that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
* that you always flush all stores). Otherwise the shouldFlush method will always
* returns true which will make a lot of flush requests.
*/
boolean shouldFlushStore(Store store) {
@ -3243,11 +3243,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
continue;
}
//HBASE-18233
// If we haven't got any rows in our batch, we should block to
// get the next one.
// get the next one's read lock. We need at least one row to mutate.
// If we have got rows, do not block when lock is not available,
// so that we can fail fast and go on with the rows with locks in
// the batch. By doing this, we can reduce contention and prevent
// possible deadlocks.
// The unfinished rows in the batch will be detected in batchMutate,
// and it wil try to finish them by calling doMiniBatchMutation again.
boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
rowLock = getRowLockInternal(mutation.getRow(), true);
rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock);
} catch (TimeoutIOException e) {
// We will retry when other exceptions, but we should stop if we timeout .
throw e;
@ -3256,8 +3265,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ Bytes.toStringBinary(mutation.getRow()), ioe);
}
if (rowLock == null) {
// We failed to grab another lock
break; // stop acquiring more rows for this batch
// We failed to grab another lock. Stop acquiring more rows for this
// batch and go on with the gotten ones
break;
} else {
acquiredRowLocks.add(rowLock);
}
@ -3356,7 +3367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);
// Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
@ -3676,7 +3687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addColumn(family, qualifier);
checkRow(row, "checkAndMutate");
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLockInternal(get.getRow(), false);
RowLock rowLock = getRowLockInternal(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.await();
try {
@ -3786,7 +3797,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addColumn(family, qualifier);
checkRow(row, "checkAndRowMutate");
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLockInternal(get.getRow(), false);
RowLock rowLock = getRowLockInternal(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.await();
try {
@ -4025,10 +4036,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* <b>not</b> check the families for validity.
*
* @param familyMap Map of kvs per family
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction.
* @param output newly added KVs into memstore
* @param isInReplay true when adding replayed KVs into memstore
* @return the additional memory usage of the memstore caused by the
* new entries.
* @throws IOException
@ -5429,7 +5436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Get an exclusive ( write lock ) lock on a given row.
* @param row Which row to lock.
* @return A locked RowLock. The lock is exclusive and already aqquired.
* @throws IOException
* @throws IOException if any error occurred
*/
public RowLock getRowLock(byte[] row) throws IOException {
return getRowLock(row, false);
@ -5437,12 +5444,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
// Make sure the row is inside of this region before getting the lock for it.
checkRow(row, "row lock");
return getRowLockInternal(row, readLock);
return getRowLock(row, readLock, true);
}
protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
/**
*
* Get a row lock for the specified row. All locks are reentrant.
*
* Before calling this function make sure that a region operation has already been
* started (the calling thread has already acquired the region-close-guard lock).
* @param row The row actions will be performed against
* @param readLock is the lock reader or writer. True indicates that a non-exlcusive
* lock is requested
* @param waitForLock whether should wait for this lock
* @return A locked RowLock, or null if {@code waitForLock} set to false and tryLock failed
* @throws IOException if any error occurred
*/
public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
// Make sure the row is inside of this region before getting the lock for it.
checkRow(row, "row lock");
return getRowLockInternal(row, readLock, waitForLock);
}
// getRowLock calls checkRow. Call this to skip checkRow.
protected RowLock getRowLockInternal(byte[] row)
throws IOException {
return getRowLockInternal(row, false, true);
}
protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock)
throws IOException {
// create an object to use a a key in the row lock map
HashedBytes rowKey = new HashedBytes(row);
@ -5493,18 +5524,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
boolean lockAvailable = false;
if (timeout > 0) {
if (waitForLock) {
// if waiting for lock, wait for timeout milliseconds
lockAvailable = result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS);
} else {
// If we are not waiting, tryLock() returns immediately whether we have the lock or not.
lockAvailable = result.getLock().tryLock();
}
}
if (!lockAvailable) {
if (traceScope != null) {
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
}
result = null;
String message = "Timed out waiting for lock for row: " + rowKey + " in region "
+ getRegionInfo().getEncodedName();
if (reachDeadlineFirst) {
throw new TimeoutIOException(message);
String message = "Timed out waiting for lock for row: " + rowKey + " in region " +
getRegionInfo().getEncodedName() + ", timeout=" + timeout + ", deadlined=" +
reachDeadlineFirst + ", waitForLock=" + waitForLock;
if (waitForLock) {
if (reachDeadlineFirst) {
LOG.info("TIMEOUT: " + message);
throw new TimeoutIOException(message);
} else {
// If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request.
LOG.info("IOE " + message);
throw new IOException(message);
}
} else {
// If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request.
throw new IOException(message);
// We are here if we did a tryLock w/o waiting on it.
return null;
}
}
rowLockContext.setThreadName(Thread.currentThread().getName());
@ -7393,7 +7442,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLockInternal(row, false));
acquiredRowLocks.add(getRowLockInternal(row));
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@ -7625,7 +7674,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALKey walKey = null;
boolean doRollBackMemstore = false;
try {
rowLock = getRowLockInternal(row, false);
rowLock = getRowLockInternal(row);
assert rowLock != null;
try {
lock(this.updatesLock.readLock());
@ -7907,7 +7956,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Map<Store, List<Cell>> forMemStore = new HashMap<>();
Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
try {
rowLock = getRowLockInternal(increment.getRow(), false);
rowLock = getRowLockInternal(increment.getRow());
long txid = 0;
try {
lock(this.updatesLock.readLock());
@ -8126,7 +8175,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* from <code>incrementCoordinates</code> only.
* @param increment
* @param columnFamily
* @param incrementCoordinates
* @return Return the Cells to Increment
* @throws IOException
*/

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -63,6 +64,7 @@ public class TestMultiParallel {
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
private static final String FAMILY = "family";
private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2");
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
@ -761,4 +763,150 @@ public class TestMultiParallel {
validateEmpty(result);
}
}
private static class MultiThread extends Thread {
public Throwable throwable = null;
private CountDownLatch endLatch;
private CountDownLatch beginLatch;
List<Put> puts;
public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
this.puts = puts;
this.beginLatch = beginLatch;
this.endLatch = endLatch;
}
@Override
public void run() {
LOG.info("Start multi");
HTable table = null;
try {
table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
table.setAutoFlush(false);
beginLatch.await();
for (int i = 0; i < 100; i++) {
for(Put put : puts) {
table.put(put);
}
table.flushCommits();
}
} catch (Throwable t) {
throwable = t;
LOG.warn("Error when put:", t);
} finally {
endLatch.countDown();
if(table != null) {
try {
table.close();
} catch (IOException ioe) {
LOG.error("Error when close table", ioe);
}
}
}
LOG.info("End multi");
}
}
private static class IncrementThread extends Thread {
public Throwable throwable = null;
private CountDownLatch endLatch;
private CountDownLatch beginLatch;
List<Put> puts;
public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
this.puts = puts;
this.beginLatch = beginLatch;
this.endLatch = endLatch;
}
@Override
public void run() {
LOG.info("Start inc");
HTable table = null;
try {
table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
beginLatch.await();
for (int i = 0; i < 100; i++) {
for(Put put : puts) {
Increment inc = new Increment(put.getRow());
inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
table.increment(inc);
}
}
} catch (Throwable t) {
throwable = t;
LOG.warn("Error when incr:", t);
} finally {
endLatch.countDown();
if(table != null) {
try {
table.close();
} catch (IOException ioe) {
LOG.error("Error when close table", ioe);
}
}
}
LOG.info("End inc");
}
}
/**
* UT for HBASE-18233, test for disordered batch mutation thread and
* increment won't lock each other
* @throws Exception if any error occurred
*/
@Test(timeout=300000)
public void testMultiThreadWithRowLocks() throws Exception {
//set a short timeout to get timeout exception when getting row lock fail
UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
List<Put> puts = new ArrayList<>();
for(int i = 0; i < 10; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long)0));
puts.add(put);
}
List<Put> reversePuts = new ArrayList<>(puts);
Collections.reverse(reversePuts);
int NUM_OF_THREAD = 12;
CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD);
CountDownLatch beginLatch = new CountDownLatch(1);
int threadNum = NUM_OF_THREAD / 4;
List<MultiThread> multiThreads = new ArrayList<>();
List<IncrementThread> incThreads = new ArrayList<>();
for(int i = 0; i < threadNum; i ++) {
MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
thread.start();
multiThreads.add(thread);
}
for(int i = 0; i < threadNum; i++) {
MultiThread thread = new MultiThread(puts, beginLatch, latch);
thread.start();
multiThreads.add(thread);
}
for(int i = 0; i < threadNum; i ++) {
IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
thread.start();
incThreads.add(thread);
}
for(int i = 0; i < threadNum; i++) {
IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
thread.start();
incThreads.add(thread);
}
long timeBegin = System.currentTimeMillis();
beginLatch.countDown();
latch.await();
LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin));
for(MultiThread thread : multiThreads) {
if (thread != null && thread.throwable != null) {
LOG.error(thread.throwable);
}
Assert.assertTrue(thread.throwable == null);
}
for(IncrementThread thread : incThreads) {
Assert.assertTrue(thread.throwable == null);
}
}
}

View File

@ -663,11 +663,12 @@ public class TestAtomicOperation {
}
@Override
public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock)
throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
return new WrappedRowLock(super.getRowLockInternal(row, readLock));
return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock));
}
public class WrappedRowLock implements RowLock {