HBASE-19163 Maximum lock count exceeded from region server's batch processing

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
huaxiangsun 2017-12-06 16:58:31 -08:00
parent f55e81e6c0
commit 428e5672e7
3 changed files with 90 additions and 14 deletions

View File

@ -226,6 +226,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize"; public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
public static final int DEFAULT_MAX_CELL_SIZE = 10485760; public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
"hbase.regionserver.minibatch.size";
public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
/** /**
* This is the global default value for durability. All tables/mutations not * This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value. * defining a durability or using USE_DEFAULT will default to this value.
@ -338,6 +342,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// in bytes // in bytes
final long maxCellSize; final long maxCellSize;
// Number of mutations for minibatch processing.
private final int miniBatchSize;
// negative number indicates infinite timeout // negative number indicates infinite timeout
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
@ -809,6 +816,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE); this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
this.miniBatchSize = conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE,
DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE);
} }
void setHTableSpecificConf() { void setHTableSpecificConf() {
@ -3137,7 +3146,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<RowLock> acquiredRowLocks) throws IOException { List<RowLock> acquiredRowLocks) throws IOException {
int readyToWriteCount = 0; int readyToWriteCount = 0;
int lastIndexExclusive = 0; int lastIndexExclusive = 0;
RowLock prevRowLock = null;
for (; lastIndexExclusive < size(); lastIndexExclusive++) { for (; lastIndexExclusive < size(); lastIndexExclusive++) {
// It reaches the miniBatchSize, stop here and process the miniBatch
// This only applies to non-atomic batch operations.
if (!isAtomic() && (readyToWriteCount == region.miniBatchSize)) {
break;
}
if (!isOperationPending(lastIndexExclusive)) { if (!isOperationPending(lastIndexExclusive)) {
continue; continue;
} }
@ -3146,7 +3162,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RowLock rowLock = null; RowLock rowLock = null;
try { try {
// if atomic then get exclusive lock, else shared lock // if atomic then get exclusive lock, else shared lock
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
} catch (TimeoutIOException e) { } catch (TimeoutIOException e) {
// We will retry when other exceptions, but we should stop if we timeout . // We will retry when other exceptions, but we should stop if we timeout .
throw e; throw e;
@ -3163,8 +3179,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
break; // Stop acquiring more rows for this batch break; // Stop acquiring more rows for this batch
} else { } else {
if (rowLock != prevRowLock) {
// It is a different row now, add this to the acquiredRowLocks and
// set prevRowLock to the new returned rowLock
acquiredRowLocks.add(rowLock); acquiredRowLocks.add(rowLock);
prevRowLock = rowLock;
} }
}
readyToWriteCount++; readyToWriteCount++;
} }
return createMiniBatch(lastIndexExclusive, readyToWriteCount); return createMiniBatch(lastIndexExclusive, readyToWriteCount);
@ -3553,7 +3575,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.checkAndPrepareMutation(cpMutation, timestamp); this.checkAndPrepareMutation(cpMutation, timestamp);
// Acquire row locks. If not, the whole batch will fail. // Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true)); acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true, null));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can // Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation. // directly add the cells from those mutations to the familyMaps of this mutation.
@ -3930,7 +3952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addColumn(family, qualifier); get.addColumn(family, qualifier);
// Lock row - note that doBatchMutate will relock this row if called // Lock row - note that doBatchMutate will relock this row if called
checkRow(row, "doCheckAndRowMutate"); checkRow(row, "doCheckAndRowMutate");
RowLock rowLock = getRowLockInternal(get.getRow(), false); RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
try { try {
if (mutation != null && this.getCoprocessorHost() != null) { if (mutation != null && this.getCoprocessorHost() != null) {
// Call coprocessor. // Call coprocessor.
@ -5545,10 +5567,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
checkRow(row, "row lock"); checkRow(row, "row lock");
return getRowLockInternal(row, readLock); return getRowLockInternal(row, readLock, null);
} }
protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException { protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock prevRowLock)
throws IOException {
// create an object to use a a key in the row lock map // create an object to use a a key in the row lock map
HashedBytes rowKey = new HashedBytes(row); HashedBytes rowKey = new HashedBytes(row);
@ -5565,6 +5588,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Now try an get the lock. // Now try an get the lock.
// This can fail as // This can fail as
if (readLock) { if (readLock) {
// For read lock, if the caller has locked the same row previously, it will not try
// to acquire the same read lock. It simply returns the previous row lock.
RowLockImpl prevRowLockImpl = (RowLockImpl)prevRowLock;
if ((prevRowLockImpl != null) && (prevRowLockImpl.getLock() ==
rowLockContext.readWriteLock.readLock())) {
success = true;
return prevRowLock;
}
result = rowLockContext.newReadLock(); result = rowLockContext.newReadLock();
} else { } else {
result = rowLockContext.newWriteLock(); result = rowLockContext.newWriteLock();
@ -5587,7 +5618,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) { if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
TraceUtil.addTimelineAnnotation("Failed to get row lock"); TraceUtil.addTimelineAnnotation("Failed to get row lock");
result = null;
String message = "Timed out waiting for lock for row: " + rowKey + " in region " String message = "Timed out waiting for lock for row: " + rowKey + " in region "
+ getRegionInfo().getEncodedName(); + getRegionInfo().getEncodedName();
if (reachDeadlineFirst) { if (reachDeadlineFirst) {
@ -5607,6 +5637,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock"); TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw iie; throw iie;
} catch (Error error) {
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count
// is reached, it will throw out an Error. This Error needs to be caught so it can
// go ahead to process the minibatch with lock acquired.
LOG.warn("Error to get row lock for " + Bytes.toStringBinary(row) + ", cause: " + error);
IOException ioe = new IOException();
ioe.initCause(error);
TraceUtil.addTimelineAnnotation("Error getting row lock");
throw ioe;
} finally { } finally {
// Clean up the counts just in case this was the thing keeping the context alive. // Clean up the counts just in case this was the thing keeping the context alive.
if (!success && rowLockContext != null) { if (!success && rowLockContext != null) {
@ -7154,10 +7193,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch( public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
List<RowLock> acquiredRowLocks) throws IOException { List<RowLock> acquiredRowLocks) throws IOException {
RowLock prevRowLock = null;
for (byte[] row : rowsToLock) { for (byte[] row : rowsToLock) {
try { try {
RowLock rowLock = region.getRowLockInternal(row, false); // write lock RowLock rowLock = region.getRowLockInternal(row, false, prevRowLock); // write lock
if (rowLock != prevRowLock) {
acquiredRowLocks.add(rowLock); acquiredRowLocks.add(rowLock);
prevRowLock = rowLock;
}
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe); LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe);
throw ioe; throw ioe;
@ -7244,10 +7287,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try { try {
// STEP 2. Acquire the row lock(s) // STEP 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<>(rowsToLock.size()); acquiredRowLocks = new ArrayList<>(rowsToLock.size());
RowLock prevRowLock = null;
for (byte[] row : rowsToLock) { for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out // Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes // use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLockInternal(row, false)); RowLock rowLock = getRowLockInternal(row, false, prevRowLock);
if (rowLock != prevRowLock) {
acquiredRowLocks.add(rowLock);
prevRowLock = rowLock;
}
} }
// STEP 3. Region lock // STEP 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@ -7425,7 +7473,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RowLock rowLock = null; RowLock rowLock = null;
MemStoreSizing memstoreAccounting = new MemStoreSizing(); MemStoreSizing memstoreAccounting = new MemStoreSizing();
try { try {
rowLock = getRowLockInternal(mutation.getRow(), false); rowLock = getRowLockInternal(mutation.getRow(), false, null);
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
try { try {
Result cpResult = doCoprocessorPreCall(op, mutation); Result cpResult = doCoprocessorPreCall(op, mutation);
@ -7774,7 +7822,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align( public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + ClassSize.OBJECT +
ClassSize.ARRAY + ClassSize.ARRAY +
50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) + (14 * Bytes.SIZEOF_LONG) +
3 * Bytes.SIZEOF_BOOLEAN); 3 * Bytes.SIZEOF_BOOLEAN);

View File

@ -411,6 +411,33 @@ public class TestFromClientSide3 {
} }
} }
// Test Table.batch with large amount of mutations against the same key.
// It used to trigger read lock's "Maximum lock count exceeded" Error.
@Test
public void testHTableWithLargeBatch() throws Exception {
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
new byte[][] { FAMILY });
int sixtyFourK = 64 * 1024;
try {
List actions = new ArrayList();
Object[] results = new Object[(sixtyFourK + 1) * 2];
for (int i = 0; i < sixtyFourK + 1; i ++) {
Put put1 = new Put(ROW);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
actions.add(put1);
Put put2 = new Put(ANOTHERROW);
put2.addColumn(FAMILY, QUALIFIER, VALUE);
actions.add(put2);
}
table.batch(actions, results);
} finally {
table.close();
}
}
@Test @Test
public void testBatchWithRowMutation() throws Exception { public void testBatchWithRowMutation() throws Exception {
LOG.info("Starting testBatchWithRowMutation"); LOG.info("Starting testBatchWithRowMutation");

View File

@ -667,11 +667,12 @@ public class TestAtomicOperation {
} }
@Override @Override
public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException { public RowLock getRowLockInternal(final byte[] row, boolean readLock,
final RowLock prevRowlock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) { if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown(); latch.countDown();
} }
return new WrappedRowLock(super.getRowLockInternal(row, readLock)); return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
} }
public class WrappedRowLock implements RowLock { public class WrappedRowLock implements RowLock {