diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 77aeb20c07e..22256df3a06 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -50,23 +50,22 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.util.Bytes;
@@ -79,6 +78,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
/**
*
@@ -1735,56 +1735,41 @@ public class MetaTableAccessor {
private static void multiMutate(Connection connection, final Table table, byte[] row,
final List mutations) throws IOException {
debugLogMutations(mutations);
- // TODO: Need rollback!!!!
- // TODO: Need Retry!!!
- // TODO: What for a timeout? Default write timeout? GET FROM HTABLE?
- // TODO: Review when we come through with ProcedureV2.
- RegionServerCallable callable =
- new RegionServerCallable(
- connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) {
- @Override
- protected MutateRowsResponse rpcCall() throws Exception {
- final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
- for (Mutation mutation : mutations) {
- if (mutation instanceof Put) {
- builder.addMutationRequest(ProtobufUtil.toMutation(
- ClientProtos.MutationProto.MutationType.PUT, mutation));
- } else if (mutation instanceof Delete) {
- builder.addMutationRequest(ProtobufUtil.toMutation(
- ClientProtos.MutationProto.MutationType.DELETE, mutation));
- } else {
- throw new DoNotRetryIOException("multi in MetaEditor doesn't support "
- + mutation.getClass().getName());
- }
- }
- // The call to #prepare that ran before this invocation will have populated HRegionLocation.
- HRegionLocation hrl = getLocation();
- RegionSpecifier region = ProtobufUtil.buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, hrl.getRegion().getRegionName());
- builder.setRegion(region);
- // The rpcController here is awkward. The Coprocessor Endpoint wants an instance of a
- // com.google.protobuf but we are going over an rpc that is all shaded protobuf so it
- // wants a org.apache.h.h.shaded.com.google.protobuf.RpcController. Set up a factory
- // that makes com.google.protobuf.RpcController and then copy into it configs.
- return getStub().mutateRows(null, builder.build());
- }
+ Batch.Call callable =
+ new Batch.Call() {
- @Override
- // Called on the end of the super.prepare call. Set the stub.
- protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException {
- CoprocessorRpcChannel channel = table.coprocessorService(getRow());
- setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel));
- }
- };
- int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- // The region location should be cached in connection. Call prepare so this callable picks
- // up the region location (see super.prepare method).
- callable.prepare(false);
- callable.call(writeTimeout);
+ @Override
+ public MutateRowsResponse call(MultiRowMutationService instance) throws IOException {
+ MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
+ for (Mutation mutation : mutations) {
+ if (mutation instanceof Put) {
+ builder.addMutationRequest(
+ ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
+ } else if (mutation instanceof Delete) {
+ builder.addMutationRequest(
+ ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
+ } else {
+ throw new DoNotRetryIOException(
+ "multi in MetaEditor doesn't support " + mutation.getClass().getName());
+ }
+ }
+ ServerRpcController controller = new ServerRpcController();
+ CoprocessorRpcUtils.BlockingRpcCallback rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<>();
+ instance.mutateRows(controller, builder.build(), rpcCallback);
+ MutateRowsResponse resp = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ return resp;
+ }
+ };
+ try {
+ table.coprocessorService(MultiRowMutationService.class, row, row, callable);
+ } catch (Throwable e) {
+ Throwables.propagateIfPossible(e, IOException.class);
+ throw new IOException(e);
+ }
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 8054bad7b98..3f69e798f09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -172,7 +172,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
- * For triggering pre commit
*/
@InterfaceAudience.Public
@SuppressWarnings("deprecation")