HBASE-26002 MultiRowMutationEndpoint should return the result of the conditional update (#3386)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Toshihiro Suzuki 2021-06-15 13:55:21 +09:00
parent 5159eaec12
commit b061d477a8
3 changed files with 29 additions and 12 deletions

View File

@ -41,6 +41,7 @@ message MutateRowsRequest {
} }
message MutateRowsResponse { message MutateRowsResponse {
optional bool processed = 1;
} }
service MultiRowMutationService { service MultiRowMutationService {

View File

@ -96,7 +96,10 @@ import com.google.protobuf.Service;
* MultiRowMutationService.BlockingInterface service = * MultiRowMutationService.BlockingInterface service =
* MultiRowMutationService.newBlockingStub(channel); * MultiRowMutationService.newBlockingStub(channel);
* MutateRowsRequest mrm = mrmBuilder.build(); * MutateRowsRequest mrm = mrmBuilder.build();
* service.mutateRows(null, mrm); * MutateRowsResponse response = service.mutateRows(null, mrm);
*
* // We can get the result of the conditional update
* boolean processed = response.getProcessed();
* </code> * </code>
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@ -109,8 +112,7 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
@Override @Override
public void mutateRows(RpcController controller, MutateRowsRequest request, public void mutateRows(RpcController controller, MutateRowsRequest request,
RpcCallback<MutateRowsResponse> done) { RpcCallback<MutateRowsResponse> done) {
MutateRowsResponse response = MutateRowsResponse.getDefaultInstance(); boolean matches = true;
List<Region.RowLock> rowLocks = null; List<Region.RowLock> rowLocks = null;
try { try {
// set of rows to lock, sorted to avoid deadlocks // set of rows to lock, sorted to avoid deadlocks
@ -141,7 +143,6 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
rowsToLock.add(m.getRow()); rowsToLock.add(m.getRow());
} }
boolean matches = true;
if (request.getConditionCount() > 0) { if (request.getConditionCount() > 0) {
// Get row locks for the mutations and the conditions // Get row locks for the mutations and the conditions
rowLocks = new ArrayList<>(); rowLocks = new ArrayList<>();
@ -184,7 +185,7 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
} }
} }
} }
done.run(response); done.run(MutateRowsResponse.newBuilder().setProcessed(matches).build());
} }
private boolean matches(Region region, ClientProtos.Condition condition) throws IOException { private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -299,9 +300,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertTrue(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
@ -347,9 +350,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertTrue(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
@ -391,9 +396,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertFalse(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertTrue(r.isEmpty()); assertTrue(r.isEmpty());
@ -437,9 +444,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertTrue(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
@ -483,9 +492,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertFalse(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertTrue(r.isEmpty()); assertTrue(r.isEmpty());
@ -531,9 +542,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertTrue(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
@ -579,9 +592,11 @@ public class TestFromClientSide5 extends FromClientSideBase {
CoprocessorRpcChannel channel = t.coprocessorService(ROW); CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service = MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel); MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrmBuilder.build()); MutateRowsResponse response = service.mutateRows(null, mrmBuilder.build());
// Assert // Assert
assertFalse(response.getProcessed());
Result r = t.get(new Get(ROW)); Result r = t.get(new Get(ROW));
assertTrue(r.isEmpty()); assertTrue(r.isEmpty());