HBASE-15714 We are calling checkRow() twice in doMiniBatchMutation()

This commit is contained in:
chenheng 2016-05-03 09:42:41 +10:00
parent 58c4c3d174
commit d77972ff16
2 changed files with 11 additions and 7 deletions

View File

@ -3019,7 +3019,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we haven't got any rows in our batch, we should block to get the next one.
RowLock rowLock = null;
try {
rowLock = getRowLock(mutation.getRow(), true);
rowLock = getRowLockInternal(mutation.getRow(), true);
} catch (IOException ioe) {
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
}
@ -3423,7 +3423,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkFamily(family);
get.addColumn(family, qualifier);
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
checkRow(row, "doCheckAndRowMutate");
RowLock rowLock = getRowLockInternal(get.getRow(), false);
try {
if (mutation != null && this.getCoprocessorHost() != null) {
// Call coprocessor.
@ -5107,8 +5108,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@Override
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
// Make sure the row is inside of this region before getting the lock for it.
checkRow(row, "row lock");
return getRowLockInternal(row, readLock);
}
protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
// create an object to use a a key in the row lock map
HashedBytes rowKey = new HashedBytes(row);
@ -6827,7 +6831,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLock(row));
acquiredRowLocks.add(getRowLockInternal(row, false));
}
// STEP 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
@ -7007,7 +7011,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(op);
long accumulatedResultSize = 0;
List<Cell> results = returnResults? new ArrayList<Cell>(mutation.size()): null;
RowLock rowLock = getRowLock(mutation.getRow());
RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
try {
lock(this.updatesLock.readLock());
try {

View File

@ -666,11 +666,11 @@ public class TestAtomicOperation {
}
@Override
public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
return new WrappedRowLock(super.getRowLock(row, readLock));
return new WrappedRowLock(super.getRowLockInternal(row, readLock));
}
public class WrappedRowLock implements RowLock {