HBASE-14689 Addendum and unit test for HBASE-13471
This commit is contained in:
parent
899857609c
commit
4c04e8065f
|
@ -2961,11 +2961,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed getting lock in batch put, row="
|
||||
+ Bytes.toStringBinary(mutation.getRow()), ioe);
|
||||
throw ioe;
|
||||
}
|
||||
if (rowLock == null) {
|
||||
// We failed to grab another lock
|
||||
assert false: "Should never fail to get lock when blocking";
|
||||
break; // stop acquiring more rows for this batch
|
||||
throw new IOException("Failed getting lock in batch put, row=" +
|
||||
Bytes.toStringBinary(mutation.getRow()));
|
||||
} else {
|
||||
acquiredRowLocks.add(rowLock);
|
||||
}
|
||||
|
@ -5055,6 +5056,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @param readLock is the lock reader or writer. True indicates that a non-exlcusive
|
||||
* lock is requested
|
||||
*/
|
||||
@Override
|
||||
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
|
||||
// Make sure the row is inside of this region before getting the lock for it.
|
||||
checkRow(row, "row lock");
|
||||
|
|
|
@ -56,7 +56,11 @@ import java.util.Map;
|
|||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -99,6 +103,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
|
@ -6529,6 +6534,53 @@ public class TestHRegion {
|
|||
qual2, 0, qual2.length));
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testBatchMutateWithWrongRegionException() throws IOException, InterruptedException {
|
||||
final byte[] a = Bytes.toBytes("a");
|
||||
final byte[] b = Bytes.toBytes("b");
|
||||
final byte[] c = Bytes.toBytes("c"); // exclusive
|
||||
|
||||
int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
|
||||
CONF.setInt("hbase.rowlock.wait.duration", 3000);
|
||||
final HRegion region = initHRegion(tableName, a, c, name.getMethodName(), CONF, false, fam1);
|
||||
|
||||
Mutation[] mutations = new Mutation[] {
|
||||
new Put(a).addImmutable(fam1, null, null),
|
||||
new Put(c).addImmutable(fam1, null, null), // this is outside the region boundary
|
||||
new Put(b).addImmutable(fam1, null, null),
|
||||
};
|
||||
|
||||
OperationStatus[] status = region.batchMutate(mutations);
|
||||
assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||
assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
|
||||
assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||
|
||||
// test with a leaked row lock
|
||||
ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||
exec.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
region.getRowLock(b);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
exec.shutdown();
|
||||
exec.awaitTermination(30, TimeUnit.SECONDS);
|
||||
|
||||
mutations = new Mutation[] {
|
||||
new Put(a).addImmutable(fam1, null, null),
|
||||
new Put(b).addImmutable(fam1, null, null),
|
||||
};
|
||||
|
||||
try {
|
||||
status = region.batchMutate(mutations);
|
||||
fail("Failed to throw exception");
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
|
||||
CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
|
||||
}
|
||||
|
||||
static HRegion initHRegion(TableName tableName, String callingMethod,
|
||||
byte[]... families) throws IOException {
|
||||
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
|
||||
|
|
Loading…
Reference in New Issue