HBASE-14689 Addendum and unit test for HBASE-13471
This commit is contained in:
parent
5898b95329
commit
90bdb0dc74
|
@ -2974,7 +2974,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
if (rowLock == null) {
|
if (rowLock == null) {
|
||||||
// We failed to grab another lock
|
// We failed to grab another lock
|
||||||
assert false: "Should never fail to get lock when blocking";
|
|
||||||
break; // stop acquiring more rows for this batch
|
break; // stop acquiring more rows for this batch
|
||||||
} else {
|
} else {
|
||||||
acquiredRowLocks.add(rowLock);
|
acquiredRowLocks.add(rowLock);
|
||||||
|
|
|
@ -56,7 +56,12 @@ import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -99,6 +104,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
|
@ -6167,7 +6173,7 @@ public class TestHRegion {
|
||||||
key.setWriteEntry(we);
|
key.setWriteEntry(we);
|
||||||
return 1L;
|
return 1L;
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
return wal;
|
return wal;
|
||||||
}
|
}
|
||||||
|
@ -6190,7 +6196,7 @@ public class TestHRegion {
|
||||||
// capture append() calls
|
// capture append() calls
|
||||||
WAL wal = mockWAL();
|
WAL wal = mockWAL();
|
||||||
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
|
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
|
||||||
|
|
||||||
|
|
||||||
// open a region first so that it can be closed later
|
// open a region first so that it can be closed later
|
||||||
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
|
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
|
||||||
|
@ -6497,6 +6503,68 @@ public class TestHRegion {
|
||||||
qual2, 0, qual2.length));
|
qual2, 0, qual2.length));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testBatchMutateWithWrongRegionException() throws Exception {
|
||||||
|
final byte[] a = Bytes.toBytes("a");
|
||||||
|
final byte[] b = Bytes.toBytes("b");
|
||||||
|
final byte[] c = Bytes.toBytes("c"); // exclusive
|
||||||
|
|
||||||
|
int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
|
||||||
|
CONF.setInt("hbase.rowlock.wait.duration", 1000);
|
||||||
|
final HRegion region = initHRegion(tableName, a, c, name.getMethodName(), CONF, false, fam1);
|
||||||
|
|
||||||
|
Mutation[] mutations = new Mutation[] {
|
||||||
|
new Put(a).addImmutable(fam1, null, null),
|
||||||
|
new Put(c).addImmutable(fam1, null, null), // this is outside the region boundary
|
||||||
|
new Put(b).addImmutable(fam1, null, null),
|
||||||
|
};
|
||||||
|
|
||||||
|
OperationStatus[] status = region.batchMutate(mutations);
|
||||||
|
assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||||
|
assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
|
||||||
|
assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||||
|
|
||||||
|
|
||||||
|
// test with a row lock held for a long time
|
||||||
|
final CountDownLatch obtainedRowLock = new CountDownLatch(1);
|
||||||
|
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||||
|
Future<Void> f1 = exec.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
LOG.info("Acquiring row lock");
|
||||||
|
RowLock rl = region.getRowLock(b);
|
||||||
|
obtainedRowLock.countDown();
|
||||||
|
LOG.info("Waiting for 5 seconds before releasing lock");
|
||||||
|
Threads.sleep(5000);
|
||||||
|
LOG.info("Releasing row lock");
|
||||||
|
rl.release();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
obtainedRowLock.await(30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
Future<Void> f2 = exec.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
Mutation[] mutations = new Mutation[] {
|
||||||
|
new Put(a).addImmutable(fam1, null, null),
|
||||||
|
new Put(b).addImmutable(fam1, null, null),
|
||||||
|
};
|
||||||
|
|
||||||
|
// this will wait for the row lock, and it will eventually succeed
|
||||||
|
OperationStatus[] status = region.batchMutate(mutations);
|
||||||
|
assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||||
|
assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SUCCESS);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
f1.get();
|
||||||
|
f2.get();
|
||||||
|
|
||||||
|
CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
|
public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
|
||||||
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
||||||
|
|
Loading…
Reference in New Issue