diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4972fd3c094..8f8add82003 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7188,28 +7188,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean locked; + boolean locked = false; boolean walSyncSuccessful = false; - List acquiredRowLocks; + List acquiredRowLocks = null; long addedSize = 0; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); long mvccNum = 0; WALKey walKey = null; try { - // 2. Acquire the row lock(s) - acquiredRowLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { - // Attempt to lock all involved rows, throw if any lock times out - // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLockInternal(row, false)); - } - // 3. Region lock - lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); - locked = true; - - long now = EnvironmentEdgeManager.currentTime(); try { + // 2. Acquire the row lock(s) + acquiredRowLocks = new ArrayList(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes + acquiredRowLocks.add(getRowLockInternal(row, false)); + } + // 3. Region lock + lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); + locked = true; + + long now = EnvironmentEdgeManager.currentTime(); // 4. Let the processor scan the rows, generate mutations and add // waledits doProcessRowWithTimeout( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 09c7e863ee5..08ccc4289b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -28,16 +29,36 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import static junit.framework.Assert.assertFalse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -486,4 +507,120 @@ public class TestFromClientSide3 { assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); table.close(); } + + @Test(timeout = 30000) + public void testMultiRowMutations() throws Exception, Throwable { + final TableName tableName = TableName.valueOf("testMultiRowMutations"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); + desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName()); + desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); + desc.addFamily(new HColumnDescriptor(FAMILY)); + TEST_UTIL.getHBaseAdmin().createTable(desc); + // new a connection for lower retry number. + Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); + copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + try (Connection con = ConnectionFactory.createConnection(copy)) { + final byte[] row = Bytes.toBytes("ROW-0"); + final byte[] rowLocked= Bytes.toBytes("ROW-1"); + final byte[] value0 = Bytes.toBytes("VALUE-0"); + final byte[] value1 = Bytes.toBytes("VALUE-1"); + final byte[] value2 = Bytes.toBytes("VALUE-2"); + assertNoLocks(tableName); + ExecutorService putService = Executors.newSingleThreadExecutor(); + putService.execute(new Runnable() { + @Override + public void run() { + try (Table table = con.getTable(tableName)) { + Put put0 = new Put(rowLocked); + put0.addColumn(FAMILY, QUALIFIER, value0); + // the put will be blocked by WatiingForMultiMutationsObserver. + table.put(put0); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + }); + ExecutorService cpService = Executors.newSingleThreadExecutor(); + cpService.execute(new Runnable() { + @Override + public void run() { + Put put1 = new Put(row); + Put put2 = new Put(rowLocked); + put1.addColumn(FAMILY, QUALIFIER, value1); + put2.addColumn(FAMILY, QUALIFIER, value2); + try (Table table = con.getTable(tableName)) { + final MultiRowMutationProtos.MutateRowsRequest request + = MultiRowMutationProtos.MutateRowsRequest.newBuilder() + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1)) + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2)) + .build(); + table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW, + new Batch.Call() { + public MultiRowMutationProtos.MutateRowsResponse call(MultiRowMutationProtos.MultiRowMutationService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = new BlockingRpcCallback<>(); + instance.mutateRows(controller, request, rpcCallback); + return rpcCallback.get(); + } + }); + fail("This cp should fail because the target lock is blocked by previous put"); + } catch (Throwable ex) { + } + } + }); + cpService.shutdown(); + cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class); + observer.latch.countDown(); + putService.shutdown(); + putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + try (Table table = con.getTable(tableName)) { + Get g0 = new Get(row); + Get g1 = new Get(rowLocked); + Result r0 = table.get(g0); + Result r1 = table.get(g1); + assertTrue(r0.isEmpty()); + assertFalse(r1.isEmpty()); + assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); + } + assertNoLocks(tableName); + } + } + + private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { + HRegion region = (HRegion) find(tableName); + assertEquals(0, region.getLockedRows().size()); + } + private static Region find(final TableName tableName) + throws IOException, InterruptedException { + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); + List regions = rs.getOnlineRegions(tableName); + assertEquals(1, regions.size()); + return regions.get(0); + } + + private static T find(final TableName tableName, + Class clz) throws IOException, InterruptedException { + Region region = find(tableName); + Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); + assertTrue("The cp instance should be " + clz.getName() + + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); + return clz.cast(cp); + } + + public static class WatiingForMultiMutationsObserver extends BaseRegionObserver { + final CountDownLatch latch = new CountDownLatch(1); + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + try { + latch.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + } }