HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai)
This commit is contained in:
parent
ec9c9e201a
commit
bb645bcfda
|
@ -7044,8 +7044,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return;
|
||||
}
|
||||
|
||||
boolean locked;
|
||||
List<RowLock> acquiredRowLocks;
|
||||
boolean locked = false;
|
||||
List<RowLock> acquiredRowLocks = null;
|
||||
List<Mutation> mutations = new ArrayList<Mutation>();
|
||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||
// This is assigned by mvcc either explicitly below or in the guts of the WAL append
|
||||
|
@ -7053,19 +7053,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
WriteEntry writeEntry = null;
|
||||
MemstoreSize memstoreSize = new MemstoreSize();
|
||||
try {
|
||||
// STEP 2. Acquire the row lock(s)
|
||||
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
|
||||
for (byte[] row : rowsToLock) {
|
||||
// Attempt to lock all involved rows, throw if any lock times out
|
||||
// use a writer lock for mixed reads and writes
|
||||
acquiredRowLocks.add(getRowLockInternal(row, false));
|
||||
}
|
||||
// STEP 3. Region lock
|
||||
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
|
||||
locked = true;
|
||||
boolean success = false;
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
try {
|
||||
// STEP 2. Acquire the row lock(s)
|
||||
acquiredRowLocks = new ArrayList<>(rowsToLock.size());
|
||||
for (byte[] row : rowsToLock) {
|
||||
// Attempt to lock all involved rows, throw if any lock times out
|
||||
// use a writer lock for mixed reads and writes
|
||||
acquiredRowLocks.add(getRowLockInternal(row, false));
|
||||
}
|
||||
// STEP 3. Region lock
|
||||
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
|
||||
locked = true;
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
|
||||
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
|
||||
if (!mutations.isEmpty()) {
|
||||
|
|
|
@ -23,13 +23,17 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
|
@ -43,10 +47,17 @@ import org.apache.hadoop.hbase.regionserver.Region;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -54,6 +65,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -494,7 +506,7 @@ public class TestFromClientSide3 {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPutWithPreBatchMutate ()throws Exception {
|
||||
public void testPutWithPreBatchMutate() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testPutWithPreBatchMutate");
|
||||
testPreBatchMutate(tableName, () -> {
|
||||
try {
|
||||
|
@ -509,7 +521,7 @@ public class TestFromClientSide3 {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRowMutationsWithPreBatchMutate ()throws Exception {
|
||||
public void testRowMutationsWithPreBatchMutate() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate");
|
||||
testPreBatchMutate(tableName, () -> {
|
||||
try {
|
||||
|
@ -525,7 +537,7 @@ public class TestFromClientSide3 {
|
|||
});
|
||||
}
|
||||
|
||||
private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception {
|
||||
private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addCoprocessor(WatiingForScanObserver.class.getName());
|
||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||
|
@ -555,22 +567,118 @@ public class TestFromClientSide3 {
|
|||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
|
||||
private static <T extends RegionObserver> T find(final TableName tableName,
|
||||
Class<T> clz) throws IOException, InterruptedException {
|
||||
@Test(timeout = 30000)
|
||||
public void testMultiRowMutations() throws Exception, Throwable {
|
||||
TableName tableName = TableName.valueOf("testMultiRowMutations");
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
|
||||
desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
|
||||
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||
TEST_UTIL.getAdmin().createTable(desc);
|
||||
// new a connection for lower retry number.
|
||||
Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
|
||||
copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
try (Connection con = ConnectionFactory.createConnection(copy)) {
|
||||
byte[] row = Bytes.toBytes("ROW-0");
|
||||
byte[] rowLocked= Bytes.toBytes("ROW-1");
|
||||
byte[] value0 = Bytes.toBytes("VALUE-0");
|
||||
byte[] value1 = Bytes.toBytes("VALUE-1");
|
||||
byte[] value2 = Bytes.toBytes("VALUE-2");
|
||||
assertNoLocks(tableName);
|
||||
ExecutorService putService = Executors.newSingleThreadExecutor();
|
||||
putService.execute(() -> {
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
Put put0 = new Put(rowLocked);
|
||||
put0.addColumn(FAMILY, QUALIFIER, value0);
|
||||
// the put will be blocked by WatiingForMultiMutationsObserver.
|
||||
table.put(put0);
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
ExecutorService cpService = Executors.newSingleThreadExecutor();
|
||||
cpService.execute(() -> {
|
||||
Put put1 = new Put(row);
|
||||
Put put2 = new Put(rowLocked);
|
||||
put1.addColumn(FAMILY, QUALIFIER, value1);
|
||||
put2.addColumn(FAMILY, QUALIFIER, value2);
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
MultiRowMutationProtos.MutateRowsRequest request
|
||||
= MultiRowMutationProtos.MutateRowsRequest.newBuilder()
|
||||
.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1))
|
||||
.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
|
||||
.build();
|
||||
table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
|
||||
ROW, ROW,
|
||||
(MultiRowMutationProtos.MultiRowMutationService exe) -> {
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
|
||||
rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
||||
exe.mutateRows(controller, request, rpcCallback);
|
||||
return rpcCallback.get();
|
||||
});
|
||||
fail("This cp should fail because the target lock is blocked by previous put");
|
||||
} catch (Throwable ex) {
|
||||
}
|
||||
});
|
||||
cpService.shutdown();
|
||||
cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||
WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
|
||||
observer.latch.countDown();
|
||||
putService.shutdown();
|
||||
putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||
try (Table table = con.getTable(tableName)) {
|
||||
Get g0 = new Get(row);
|
||||
Get g1 = new Get(rowLocked);
|
||||
Result r0 = table.get(g0);
|
||||
Result r1 = table.get(g1);
|
||||
assertTrue(r0.isEmpty());
|
||||
assertFalse(r1.isEmpty());
|
||||
assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
|
||||
}
|
||||
assertNoLocks(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
|
||||
HRegion region = (HRegion) find(tableName);
|
||||
assertEquals(0, region.getLockedRows().size());
|
||||
}
|
||||
private static Region find(final TableName tableName)
|
||||
throws IOException, InterruptedException {
|
||||
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
|
||||
List<Region> regions = rs.getOnlineRegions(tableName);
|
||||
assertEquals(1, regions.size());
|
||||
Region region = regions.get(0);
|
||||
return regions.get(0);
|
||||
}
|
||||
|
||||
private static <T extends RegionObserver> T find(final TableName tableName,
|
||||
Class<T> clz) throws IOException, InterruptedException {
|
||||
Region region = find(tableName);
|
||||
Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
|
||||
assertTrue("The cp instance should be " + clz.getName()
|
||||
+ ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
|
||||
return clz.cast(cp);
|
||||
}
|
||||
|
||||
public static class WatiingForMultiMutationsObserver extends BaseRegionObserver {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
@Override
|
||||
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class WatiingForScanObserver extends BaseRegionObserver {
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue