HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks (ChiaPing Tsai)

Committed by tedyu on 2016-11-20 18:17:21 -08:00
parent e30329e372
commit e8f056935b
2 changed files with 152 additions and 15 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@@ -7188,28 +7188,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
     }
     MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
-    boolean locked;
+    boolean locked = false;
     boolean walSyncSuccessful = false;
-    List<RowLock> acquiredRowLocks;
+    List<RowLock> acquiredRowLocks = null;
     long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     long mvccNum = 0;
     WALKey walKey = null;
-    try {
-      // 2. Acquire the row lock(s)
-      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
-      for (byte[] row : rowsToLock) {
-        // Attempt to lock all involved rows, throw if any lock times out
-        // use a writer lock for mixed reads and writes
-        acquiredRowLocks.add(getRowLockInternal(row, false));
-      }
-      // 3. Region lock
-      lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
-      locked = true;
-      long now = EnvironmentEdgeManager.currentTime();
+    try {
+      // 2. Acquire the row lock(s)
+      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
+      for (byte[] row : rowsToLock) {
+        // Attempt to lock all involved rows, throw if any lock times out
+        // use a writer lock for mixed reads and writes
+        acquiredRowLocks.add(getRowLockInternal(row, false));
+      }
+      // 3. Region lock
+      lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
+      locked = true;
+      long now = EnvironmentEdgeManager.currentTime();
       // 4. Let the processor scan the rows, generate mutations and add
       // waledits
       doProcessRowWithTimeout(
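
The hunk above keeps the row-lock acquisition inside the try block and initializes locked and acquiredRowLocks up front, so the finally cleanup in processRowsWithLocks can release whatever was already taken when a later getRowLockInternal call times out; row locks leaked after such a timeout are what kept colliding with retries of the same multi-row operation and produced the livelock named in the issue title. Below is a minimal, self-contained sketch of that acquire-inside-try / release-in-finally pattern using plain java.util.concurrent locks; RowLockPatternSketch, updatesLock, and processWithLocks are illustrative names for this sketch only, not HBase APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class RowLockPatternSketch {

  // stands in for HRegion's region-level updatesLock.readLock()
  private static final ReentrantLock updatesLock = new ReentrantLock();

  static void processWithLocks(List<ReentrantLock> rowLocks) throws InterruptedException {
    boolean locked = false;                       // mirrors 'boolean locked = false;'
    List<ReentrantLock> acquiredRowLocks = null;  // mirrors 'List<RowLock> acquiredRowLocks = null;'
    try {
      // Acquire the row lock(s) *inside* the try, so a timeout part-way through
      // still reaches the finally block and releases what was already taken.
      acquiredRowLocks = new ArrayList<>(rowLocks.size());
      for (ReentrantLock rowLock : rowLocks) {
        if (!rowLock.tryLock(5, TimeUnit.SECONDS)) {
          throw new IllegalStateException("timed out waiting for a row lock");
        }
        acquiredRowLocks.add(rowLock);
      }
      // Region-level lock, taken only after every row lock succeeded.
      updatesLock.lock();
      locked = true;
      // ... let the processor generate mutations, append WAL edits, etc. ...
    } finally {
      if (locked) {
        updatesLock.unlock();
      }
      if (acquiredRowLocks != null) {
        for (ReentrantLock rowLock : acquiredRowLocks) {
          rowLock.unlock();
        }
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<ReentrantLock> rows = new ArrayList<>();
    rows.add(new ReentrantLock());
    rows.add(new ReentrantLock());
    processWithLocks(rows);
    // nothing should remain held by this thread after the call returns
    for (ReentrantLock rowLock : rows) {
      System.out.println("still held? " + rowLock.isHeldByCurrentThread());
    }
  }
}

The new test below drives exactly this failure path: one thread holds a row lock open via a latch in a coprocessor, a multi-row mutation against that row times out, and the test then asserts the region has no locked rows left.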

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java

@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -28,16 +29,36 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import static junit.framework.Assert.assertFalse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -486,4 +507,120 @@ public class TestFromClientSide3 {
     assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
     table.close();
   }
+
+  @Test(timeout = 30000)
+  public void testMultiRowMutations() throws Exception, Throwable {
+    final TableName tableName = TableName.valueOf("testMultiRowMutations");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
+    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getHBaseAdmin().createTable(desc);
+    // new a connection for lower retry number.
+    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
+    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection con = ConnectionFactory.createConnection(copy)) {
+      final byte[] row = Bytes.toBytes("ROW-0");
+      final byte[] rowLocked= Bytes.toBytes("ROW-1");
+      final byte[] value0 = Bytes.toBytes("VALUE-0");
+      final byte[] value1 = Bytes.toBytes("VALUE-1");
+      final byte[] value2 = Bytes.toBytes("VALUE-2");
+      assertNoLocks(tableName);
+      ExecutorService putService = Executors.newSingleThreadExecutor();
+      putService.execute(new Runnable() {
+        @Override
+        public void run() {
+          try (Table table = con.getTable(tableName)) {
+            Put put0 = new Put(rowLocked);
+            put0.addColumn(FAMILY, QUALIFIER, value0);
+            // the put will be blocked by WatiingForMultiMutationsObserver.
+            table.put(put0);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      });
+      ExecutorService cpService = Executors.newSingleThreadExecutor();
+      cpService.execute(new Runnable() {
+        @Override
+        public void run() {
+          Put put1 = new Put(row);
+          Put put2 = new Put(rowLocked);
+          put1.addColumn(FAMILY, QUALIFIER, value1);
+          put2.addColumn(FAMILY, QUALIFIER, value2);
+          try (Table table = con.getTable(tableName)) {
+            final MultiRowMutationProtos.MutateRowsRequest request
+              = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
+                .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                  org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1))
+                .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                  org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
+                .build();
+            table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
+              new Batch.Call<MultiRowMutationProtos.MultiRowMutationService, MultiRowMutationProtos.MutateRowsResponse>() {
+                public MultiRowMutationProtos.MutateRowsResponse call(MultiRowMutationProtos.MultiRowMutationService instance) throws IOException {
+                  ServerRpcController controller = new ServerRpcController();
+                  BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> rpcCallback = new BlockingRpcCallback<>();
+                  instance.mutateRows(controller, request, rpcCallback);
+                  return rpcCallback.get();
+                }
+              });
+            fail("This cp should fail because the target lock is blocked by previous put");
+          } catch (Throwable ex) {
+          }
+        }
+      });
+      cpService.shutdown();
+      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      observer.latch.countDown();
+      putService.shutdown();
+      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      try (Table table = con.getTable(tableName)) {
+        Get g0 = new Get(row);
+        Get g1 = new Get(rowLocked);
+        Result r0 = table.get(g0);
+        Result r1 = table.get(g1);
+        assertTrue(r0.isEmpty());
+        assertFalse(r1.isEmpty());
+        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
+      }
+      assertNoLocks(tableName);
+    }
+  }
+
+  private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
+    HRegion region = (HRegion) find(tableName);
+    assertEquals(0, region.getLockedRows().size());
+  }
+
+  private static Region find(final TableName tableName)
+      throws IOException, InterruptedException {
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+    List<Region> regions = rs.getOnlineRegions(tableName);
+    assertEquals(1, regions.size());
+    return regions.get(0);
+  }
+
+  private static <T extends RegionObserver> T find(final TableName tableName,
+      Class<T> clz) throws IOException, InterruptedException {
+    Region region = find(tableName);
+    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
+    assertTrue("The cp instance should be " + clz.getName()
+      + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
+    return clz.cast(cp);
+  }
+
+  public static class WatiingForMultiMutationsObserver extends BaseRegionObserver {
+    final CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      try {
+        latch.await();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      }
+    }
+  }
 }