diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 77aeb20c07e..22256df3a06 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -50,23 +50,22 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.util.Bytes; @@ -79,6 +78,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; /** *

@@ -1735,56 +1735,41 @@ public class MetaTableAccessor { private static void multiMutate(Connection connection, final Table table, byte[] row, final List mutations) throws IOException { debugLogMutations(mutations); - // TODO: Need rollback!!!! - // TODO: Need Retry!!! - // TODO: What for a timeout? Default write timeout? GET FROM HTABLE? - // TODO: Review when we come through with ProcedureV2. - RegionServerCallable callable = - new RegionServerCallable( - connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) { - @Override - protected MutateRowsResponse rpcCall() throws Exception { - final MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder(); - for (Mutation mutation : mutations) { - if (mutation instanceof Put) { - builder.addMutationRequest(ProtobufUtil.toMutation( - ClientProtos.MutationProto.MutationType.PUT, mutation)); - } else if (mutation instanceof Delete) { - builder.addMutationRequest(ProtobufUtil.toMutation( - ClientProtos.MutationProto.MutationType.DELETE, mutation)); - } else { - throw new DoNotRetryIOException("multi in MetaEditor doesn't support " - + mutation.getClass().getName()); - } - } - // The call to #prepare that ran before this invocation will have populated HRegionLocation. - HRegionLocation hrl = getLocation(); - RegionSpecifier region = ProtobufUtil.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, hrl.getRegion().getRegionName()); - builder.setRegion(region); - // The rpcController here is awkward. The Coprocessor Endpoint wants an instance of a - // com.google.protobuf but we are going over an rpc that is all shaded protobuf so it - // wants a org.apache.h.h.shaded.com.google.protobuf.RpcController. Set up a factory - // that makes com.google.protobuf.RpcController and then copy into it configs. - return getStub().mutateRows(null, builder.build()); - } + Batch.Call callable = + new Batch.Call() { - @Override - // Called on the end of the super.prepare call. Set the stub. - protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException { - CoprocessorRpcChannel channel = table.coprocessorService(getRow()); - setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel)); - } - }; - int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - // The region location should be cached in connection. Call prepare so this callable picks - // up the region location (see super.prepare method). - callable.prepare(false); - callable.call(writeTimeout); + @Override + public MutateRowsResponse call(MultiRowMutationService instance) throws IOException { + MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder(); + for (Mutation mutation : mutations) { + if (mutation instanceof Put) { + builder.addMutationRequest( + ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation)); + } else if (mutation instanceof Delete) { + builder.addMutationRequest( + ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation)); + } else { + throw new DoNotRetryIOException( + "multi in MetaEditor doesn't support " + mutation.getClass().getName()); + } + } + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<>(); + instance.mutateRows(controller, builder.build(), rpcCallback); + MutateRowsResponse resp = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return resp; + } + }; + try { + table.coprocessorService(MultiRowMutationService.class, row, row, callable); + } catch (Throwable e) { + Throwables.propagateIfPossible(e, IOException.class); + throw new IOException(e); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 92ea58ea8e5..74d168cc49b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -172,7 +172,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). *

To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. - * For triggering pre commit */ @InterfaceAudience.Public @SuppressWarnings("deprecation")