diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 7d904b39fc8..1adf3538002 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.ClusterStatus;
@@ -36,12 +37,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Pair;
+import com.google.protobuf.RpcChannel;
+
/**
* The asynchronous administrative API for HBase.
*
@@ -1060,4 +1064,49 @@ public interface AsyncAdmin {
* {@link CompletableFuture}
*/
CompletableFuture runCatalogJanitor();
+
+ /**
+ * Execute the given coprocessor call on the master.
+ *
+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+ * one line lambda expression, like:
+ *
+ *
+ *
+ * channel -> xxxService.newStub(channel)
+ *
+ *
+ * @param stubMaker a delegation to the actual {@code newStub} call.
+ * @param callable a delegation to the actual protobuf rpc call. See the comment of
+ * {@link CoprocessorCallable} for more details.
+ * @param the type of the asynchronous stub
+ * @param the type of the return value
+ * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+ * @see CoprocessorCallable
+ */
+ CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable);
+
+ /**
+ * Execute the given coprocessor call on the given region server.
+ *
+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+ * one line lambda expression, like:
+ *
+ *
+ *
+ * channel -> xxxService.newStub(channel)
+ *
+ *
+ * @param stubMaker a delegation to the actual {@code newStub} call.
+ * @param callable a delegation to the actual protobuf rpc call. See the comment of
+ * {@link CoprocessorCallable} for more details.
+ * @param serverName the given region server
+ * @param the type of the asynchronous stub
+ * @param the type of the return value
+ * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+ * @see CoprocessorCallable
+ */
+ CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable, ServerName serverName);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 8e5a28c7331..ed7ac4d2cb9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.procedure2.LockInfo;
@@ -45,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Pair;
+import com.google.protobuf.RpcChannel;
+
/**
* The implementation of AsyncAdmin.
*/
@@ -617,4 +621,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture runCatalogJanitor() {
return wrap(rawAdmin.runCatalogJanitor());
}
+
+ @Override
+ public CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable) {
+ return wrap(rawAdmin.coprocessorService(stubMaker, callable));
+ }
+
+ @Override
+ public CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable, ServerName serverName) {
+ return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName));
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 270f265be1a..0ee3b52c1db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -378,7 +378,7 @@ class AsyncRpcRetryingCallerFactory {
return new MasterRequestCallerBuilder<>();
}
- public class AdminRequestCallerBuilder extends BuilderBase{
+ public class AdminRequestCallerBuilder extends BuilderBase {
// TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
private AsyncAdminRequestRetryingCaller.Callable callable;
@@ -438,4 +438,65 @@ class AsyncRpcRetryingCallerFactory {
public AdminRequestCallerBuilder adminRequest(){
return new AdminRequestCallerBuilder<>();
}
+
+ public class ServerRequestCallerBuilder extends BuilderBase {
+
+ private AsyncServerRequestRpcRetryingCaller.Callable callable;
+
+ private long operationTimeoutNs = -1L;
+
+ private long rpcTimeoutNs = -1L;
+
+ private ServerName serverName;
+
+ public ServerRequestCallerBuilder action(
+ AsyncServerRequestRpcRetryingCaller.Callable callable) {
+ this.callable = callable;
+ return this;
+ }
+
+ public ServerRequestCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
+ this.operationTimeoutNs = unit.toNanos(operationTimeout);
+ return this;
+ }
+
+ public ServerRequestCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+ return this;
+ }
+
+ public ServerRequestCallerBuilder pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ public ServerRequestCallerBuilder maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ public ServerRequestCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+
+ public ServerRequestCallerBuilder serverName(ServerName serverName) {
+ this.serverName = serverName;
+ return this;
+ }
+
+ public AsyncServerRequestRpcRetryingCaller build() {
+ return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, maxAttempts,
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable,
+ "action is null"));
+ }
+
+ public CompletableFuture call() {
+ return build().call();
+ }
+ }
+
+ public ServerRequestCallerBuilder serverRequest() {
+ return new ServerRequestCallerBuilder<>();
+ }
}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
new file mode 100644
index 00000000000..72241ea2571
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Retry caller for a request call to region server.
+ * Now only used for coprocessor call to region server.
+ */
+@InterfaceAudience.Private
+public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCaller {
+
+ @FunctionalInterface
+ public interface Callable {
+ CompletableFuture call(HBaseRpcController controller, ClientService.Interface stub);
+ }
+
+ private final Callable callable;
+ private ServerName serverName;
+
+ public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
+ int startLogErrorsCnt, ServerName serverName, Callable callable) {
+ super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
+ this.serverName = serverName;
+ this.callable = callable;
+ }
+
+ @Override
+ protected void doCall() {
+ ClientService.Interface stub;
+ try {
+ stub = this.conn.getRegionServerStub(serverName);
+ } catch (IOException e) {
+ onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> {
+ });
+ return;
+ }
+ resetCallTimeout();
+ callable.call(controller, stub).whenComplete((result, error) -> {
+ if (error != null) {
+ onError(error, () -> "Call to admin stub failed", err -> {
+ });
+ return;
+ }
+ future.complete(result);
+ });
+ }
+
+ CompletableFuture call() {
+ doCall();
+ return future;
+ }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
new file mode 100644
index 00000000000..fcea508c422
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+/**
+ * The implementation of a master based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+class MasterCoprocessorRpcChannelImpl implements RpcChannel {
+
+ MasterRequestCallerBuilder callerBuilder;
+
+ MasterCoprocessorRpcChannelImpl(MasterRequestCallerBuilder callerBuilder) {
+ this.callerBuilder = callerBuilder;
+ }
+
+ private CompletableFuture rpcCall(MethodDescriptor method, Message request,
+ Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) {
+ CompletableFuture future = new CompletableFuture<>();
+ CoprocessorServiceRequest csr =
+ CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
+ stub.execMasterService(
+ controller,
+ csr,
+ new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback() {
+
+ @Override
+ public void run(CoprocessorServiceResponse resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+ Message responsePrototype, RpcCallback done) {
+ callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
+ .whenComplete(((r, e) -> {
+ if (e != null) {
+ ((ClientCoprocessorRpcController) controller).setFailed(e);
+ }
+ done.run(r);
+ }));
+ }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 0271a50198a..a87f1959ac9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -34,10 +34,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
@@ -239,7 +244,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -2838,4 +2843,49 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
(s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
.call();
}
+
+ @Override
+ public CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable) {
+ MasterCoprocessorRpcChannelImpl channel =
+ new MasterCoprocessorRpcChannelImpl(this. newMasterCaller());
+ S stub = stubMaker.apply(channel);
+ CompletableFuture future = new CompletableFuture<>();
+ ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+ callable.call(stub, controller, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture coprocessorService(Function stubMaker,
+ CoprocessorCallable callable, ServerName serverName) {
+ RegionServerCoprocessorRpcChannelImpl channel =
+ new RegionServerCoprocessorRpcChannelImpl(this. newServerCaller().serverName(
+ serverName));
+ S stub = stubMaker.apply(channel);
+ CompletableFuture future = new CompletableFuture<>();
+ ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+ callable.call(stub, controller, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
+ }
+
+ private ServerRequestCallerBuilder newServerCaller() {
+ return this.connection.callerFactory. serverRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
new file mode 100644
index 00000000000..610eb603ee9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+/**
+ * The implementation of a region server based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
+
+ ServerRequestCallerBuilder callerBuilder;
+
+ RegionServerCoprocessorRpcChannelImpl(ServerRequestCallerBuilder callerBuilder) {
+ this.callerBuilder = callerBuilder;
+ }
+
+ private CompletableFuture rpcCall(MethodDescriptor method, Message request,
+ Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) {
+ CompletableFuture future = new CompletableFuture<>();
+ CoprocessorServiceRequest csr =
+ CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
+ stub.execRegionServerService(
+ controller,
+ csr,
+ new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback() {
+
+ @Override
+ public void run(CoprocessorServiceResponse resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+ Message responsePrototype, RpcCallback done) {
+ callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
+ .whenComplete(((r, e) -> {
+ if (e != null) {
+ ((ClientCoprocessorRpcController) controller).setFailed(e);
+ }
+ done.run(r);
+ }));
+ }
+}
\ No newline at end of file
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
new file mode 100644
index 00000000000..16fb03c378d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
+import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+@RunWith(Parameterized.class)
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
+
+ private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
+ private static final String DUMMY_VALUE = "val";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ ProtobufCoprocessorService.class.getName());
+ TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+ DummyRegionServerEndpoint.class.getName());
+ TEST_UTIL.startMiniCluster(2);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ }
+
+ @Test
+ public void testMasterCoprocessorService() throws Exception {
+ TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ TestProtos.EchoResponseProto response =
+ admin
+ . coprocessorService(
+ TestRpcServiceProtos.TestProtobufRpcProto::newStub,
+ (s, c, done) -> s.echo(c, request, done)).get();
+ assertEquals("hello", response.getMessage());
+ }
+
+ @Test
+ public void testMasterCoprocessorError() throws Exception {
+ TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
+ try {
+ admin
+ . coprocessorService(
+ TestRpcServiceProtos.TestProtobufRpcProto::newStub,
+ (s, c, done) -> s.error(c, emptyRequest, done)).get();
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ }
+ }
+
+ @Test
+ public void testRegionServerCoprocessorService() throws Exception {
+ final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+ DummyRegionServerEndpointProtos.DummyRequest request =
+ DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
+ DummyRegionServerEndpointProtos.DummyResponse response =
+ admin
+ . coprocessorService(
+ DummyRegionServerEndpointProtos.DummyService::newStub,
+ (s, c, done) -> s.dummyCall(c, request, done), serverName).get();
+ assertEquals(DUMMY_VALUE, response.getValue());
+ }
+
+ @Test
+ public void testRegionServerCoprocessorServiceError() throws Exception {
+ final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+ DummyRegionServerEndpointProtos.DummyRequest request =
+ DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
+ try {
+ admin
+ . coprocessorService(
+ DummyRegionServerEndpointProtos.DummyService::newStub,
+ (s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof RetriesExhaustedException);
+ assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
+ }
+ }
+
+ static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
+
+ public DummyRegionServerEndpoint() {
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ }
+
+ @Override
+ public void dummyCall(RpcController controller, DummyRequest request,
+ RpcCallback callback) {
+ callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
+ }
+
+ @Override
+ public void dummyThrow(RpcController controller,
+ DummyRequest request,
+ RpcCallback done) {
+ CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
+ }
+ }
+}