HBASE-15714 We are calling checkRow() twice in doMiniBatchMutation()

This commit is contained in:
chenheng 2016-05-03 12:45:18 +10:00
parent 330b3b281a
commit 0ee3ca2a78
2 changed files with 15 additions and 11 deletions

View File

@ -3104,7 +3104,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// get the next one. // get the next one.
RowLock rowLock = null; RowLock rowLock = null;
try { try {
rowLock = getRowLock(mutation.getRow(), true); rowLock = getRowLockInternal(mutation.getRow(), true);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row=" LOG.warn("Failed getting lock in batch put, row="
+ Bytes.toStringBinary(mutation.getRow()), ioe); + Bytes.toStringBinary(mutation.getRow()), ioe);
@ -3455,9 +3455,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Get get = new Get(row); Get get = new Get(row);
checkFamily(family); checkFamily(family);
get.addColumn(family, qualifier); get.addColumn(family, qualifier);
checkRow(row, "checkAndMutate");
// Lock row - note that doBatchMutate will relock this row if called // Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow()); RowLock rowLock = getRowLockInternal(get.getRow(), false);
// wait for all previous transactions to complete (with lock held) // wait for all previous transactions to complete (with lock held)
mvcc.await(); mvcc.await();
try { try {
@ -3565,9 +3565,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Get get = new Get(row); Get get = new Get(row);
checkFamily(family); checkFamily(family);
get.addColumn(family, qualifier); get.addColumn(family, qualifier);
checkRow(row, "checkAndRowMutate");
// Lock row - note that doBatchMutate will relock this row if called // Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow()); RowLock rowLock = getRowLockInternal(get.getRow(), false);
// wait for all previous transactions to complete (with lock held) // wait for all previous transactions to complete (with lock held)
mvcc.await(); mvcc.await();
try { try {
@ -5202,6 +5202,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
// Make sure the row is inside of this region before getting the lock for it. // Make sure the row is inside of this region before getting the lock for it.
checkRow(row, "row lock"); checkRow(row, "row lock");
return getRowLockInternal(row, readLock);
}
protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
// create an object to use a a key in the row lock map // create an object to use a a key in the row lock map
HashedBytes rowKey = new HashedBytes(row); HashedBytes rowKey = new HashedBytes(row);
@ -7022,7 +7026,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (byte[] row : rowsToLock) { for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out // Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes // use a writer lock for mixed reads and writes
acquiredRowLocks.add(getRowLock(row)); acquiredRowLocks.add(getRowLockInternal(row, false));
} }
// 3. Region lock // 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
@ -7259,7 +7263,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALKey walKey = null; WALKey walKey = null;
boolean doRollBackMemstore = false; boolean doRollBackMemstore = false;
try { try {
rowLock = getRowLock(row); rowLock = getRowLockInternal(row, false);
assert rowLock != null; assert rowLock != null;
try { try {
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
@ -7549,7 +7553,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// changing it. These latter increments by zero are NOT added to the WAL. // changing it. These latter increments by zero are NOT added to the WAL.
List<Cell> allKVs = new ArrayList<Cell>(increment.size()); List<Cell> allKVs = new ArrayList<Cell>(increment.size());
Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
RowLock rowLock = getRowLock(increment.getRow()); RowLock rowLock = getRowLockInternal(increment.getRow(), false);
try { try {
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
try { try {
@ -7645,7 +7649,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<Cell> memstoreCells = new ArrayList<Cell>(); List<Cell> memstoreCells = new ArrayList<Cell>();
Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
try { try {
rowLock = getRowLock(increment.getRow()); rowLock = getRowLockInternal(increment.getRow(), false);
long txid = 0; long txid = 0;
try { try {
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());

View File

@ -685,11 +685,11 @@ public class TestAtomicOperation {
} }
@Override @Override
public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException { public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) { if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown(); latch.countDown();
} }
return new WrappedRowLock(super.getRowLock(row, readLock)); return new WrappedRowLock(super.getRowLockInternal(row, readLock));
} }
public class WrappedRowLock implements RowLock { public class WrappedRowLock implements RowLock {