From b061d477a8f1206658d1f584c547e3a2322fe280 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Tue, 15 Jun 2021 13:55:21 +0900 Subject: [PATCH] HBASE-26002 MultiRowMutationEndpoint should return the result of the conditional update (#3386) Signed-off-by: Duo Zhang --- .../src/main/protobuf/MultiRowMutation.proto | 1 + .../coprocessor/MultiRowMutationEndpoint.java | 11 +++---- .../hbase/client/TestFromClientSide5.java | 29 ++++++++++++++----- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto index 571e633c15b..1c0bbf7f40c 100644 --- a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto +++ b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto @@ -41,6 +41,7 @@ message MutateRowsRequest { } message MutateRowsResponse { + optional bool processed = 1; } service MultiRowMutationService { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index c840d545168..cd2fdf6467c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -96,7 +96,10 @@ import com.google.protobuf.Service; * MultiRowMutationService.BlockingInterface service = * MultiRowMutationService.newBlockingStub(channel); * MutateRowsRequest mrm = mrmBuilder.build(); - * service.mutateRows(null, mrm); + * MutateRowsResponse response = service.mutateRows(null, mrm); + * + * // We can get the result of the conditional update + * boolean processed = response.getProcessed(); * */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @@ -109,8 +112,7 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements @Override public void mutateRows(RpcController controller, MutateRowsRequest request, RpcCallback done) { - MutateRowsResponse response = MutateRowsResponse.getDefaultInstance(); - + boolean matches = true; List rowLocks = null; try { // set of rows to lock, sorted to avoid deadlocks @@ -141,7 +143,6 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements rowsToLock.add(m.getRow()); } - boolean matches = true; if (request.getConditionCount() > 0) { // Get row locks for the mutations and the conditions rowLocks = new ArrayList<>(); @@ -184,7 +185,7 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements } } } - done.run(response); + done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build()); } private boolean matches(Region region, ClientProtos.Condition condition) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java index 809fd2ad291..7c3206d42e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; @@ -299,9 +300,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertTrue(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); @@ -347,9 +350,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertTrue(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); @@ -391,9 +396,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertFalse(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertTrue(r.isEmpty()); @@ -437,9 +444,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertTrue(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); @@ -483,9 +492,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertFalse(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertTrue(r.isEmpty()); @@ -531,9 +542,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertTrue(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); @@ -579,9 +592,11 @@ public class TestFromClientSide5 extends FromClientSideBase { CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrmBuilder.build()); + MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build()); // Assert + assertFalse(response.getProcessed()); + Result r = t.get(new Get(ROW)); assertTrue(r.isEmpty());