From 81ffd6a13e866e920de6705ba12aa59a56115c60 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Mon, 10 Jul 2017 09:25:47 +0800 Subject: [PATCH] HBASE-18342 Add coprocessor service support for async admin --- .../hadoop/hbase/client/AsyncAdmin.java | 49 +++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 16 ++ .../client/AsyncRpcRetryingCallerFactory.java | 63 ++++++- .../AsyncServerRequestRpcRetryingCaller.java | 79 +++++++++ .../MasterCoprocessorRpcChannelImpl.java | 86 +++++++++ .../hbase/client/RawAsyncHBaseAdmin.java | 52 +++++- ...RegionServerCoprocessorRpcChannelImpl.java | 86 +++++++++ .../TestAsyncCoprocessorEndpoint.java | 167 ++++++++++++++++++ 8 files changed, 596 insertions(+), 2 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 7d904b39fc8..1adf3538002 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.regex.Pattern; import org.apache.hadoop.hbase.ClusterStatus; @@ -36,12 +37,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.procedure2.LockInfo; import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Pair; +import com.google.protobuf.RpcChannel; + /** * The asynchronous administrative API for HBase. *

@@ -1060,4 +1064,49 @@ public interface AsyncAdmin { * {@link CompletableFuture} */ CompletableFuture runCatalogJanitor(); + + /** + * Execute the given coprocessor call on the master. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * 
+   * channel -> xxxService.newStub(channel)
+   * 
+   * 
+ * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link CoprocessorCallable} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @see CoprocessorCallable + */ + CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable); + + /** + * Execute the given coprocessor call on the given region server. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * 
+   * channel -> xxxService.newStub(channel)
+   * 
+   * 
+ * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link CoprocessorCallable} for more details. + * @param serverName the given region server + * @param the type of the asynchronous stub + * @param the type of the return value + * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. + * @see CoprocessorCallable + */ + CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable, ServerName serverName); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 8e5a28c7331..ed7ac4d2cb9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.procedure2.LockInfo; @@ -45,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Pair; +import com.google.protobuf.RpcChannel; + /** * The implementation of AsyncAdmin. */ @@ -617,4 +621,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture runCatalogJanitor() { return wrap(rawAdmin.runCatalogJanitor()); } + + @Override + public CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable) { + return wrap(rawAdmin.coprocessorService(stubMaker, callable)); + } + + @Override + public CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable, ServerName serverName) { + return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 270f265be1a..0ee3b52c1db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -378,7 +378,7 @@ class AsyncRpcRetryingCallerFactory { return new MasterRequestCallerBuilder<>(); } - public class AdminRequestCallerBuilder extends BuilderBase{ + public class AdminRequestCallerBuilder extends BuilderBase { // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc. private AsyncAdminRequestRetryingCaller.Callable callable; @@ -438,4 +438,65 @@ class AsyncRpcRetryingCallerFactory { public AdminRequestCallerBuilder adminRequest(){ return new AdminRequestCallerBuilder<>(); } + + public class ServerRequestCallerBuilder extends BuilderBase { + + private AsyncServerRequestRpcRetryingCaller.Callable callable; + + private long operationTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + private ServerName serverName; + + public ServerRequestCallerBuilder action( + AsyncServerRequestRpcRetryingCaller.Callable callable) { + this.callable = callable; + return this; + } + + public ServerRequestCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(operationTimeout); + return this; + } + + public ServerRequestCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public ServerRequestCallerBuilder pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + public ServerRequestCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public ServerRequestCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + + public ServerRequestCallerBuilder serverName(ServerName serverName) { + this.serverName = serverName; + return this; + } + + public AsyncServerRequestRpcRetryingCaller build() { + return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable, + "action is null")); + } + + public CompletableFuture call() { + return build().call(); + } + } + + public ServerRequestCallerBuilder serverRequest() { + return new ServerRequestCallerBuilder<>(); + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java new file mode 100644 index 00000000000..72241ea2571 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.HashedWheelTimer; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; + +/** + * Retry caller for a request call to region server. + * Now only used for coprocessor call to region server. + */ +@InterfaceAudience.Private +public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { + + @FunctionalInterface + public interface Callable { + CompletableFuture call(HBaseRpcController controller, ClientService.Interface stub); + } + + private final Callable callable; + private ServerName serverName; + + public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt, ServerName serverName, Callable callable) { + super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); + this.serverName = serverName; + this.callable = callable; + } + + @Override + protected void doCall() { + ClientService.Interface stub; + try { + stub = this.conn.getRegionServerStub(serverName); + } catch (IOException e) { + onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> { + }); + return; + } + resetCallTimeout(); + callable.call(controller, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, () -> "Call to admin stub failed", err -> { + }); + return; + } + future.complete(result); + }); + } + + CompletableFuture call() { + doCall(); + return future; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java new file mode 100644 index 00000000000..fcea508c422 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; + +/** + * The implementation of a master based coprocessor rpc channel. + */ +@InterfaceAudience.Private +class MasterCoprocessorRpcChannelImpl implements RpcChannel { + + MasterRequestCallerBuilder callerBuilder; + + MasterCoprocessorRpcChannelImpl(MasterRequestCallerBuilder callerBuilder) { + this.callerBuilder = callerBuilder; + } + + private CompletableFuture rpcCall(MethodDescriptor method, Message request, + Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + CoprocessorServiceRequest csr = + CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request); + stub.execMasterService( + controller, + csr, + new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback() { + + @Override + public void run(CoprocessorServiceResponse resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + @Override + public void callMethod(MethodDescriptor method, RpcController controller, Message request, + Message responsePrototype, RpcCallback done) { + callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() + .whenComplete(((r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + })); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 0271a50198a..a87f1959ac9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -34,10 +34,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; +import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.client.replication.TableCFs; @@ -239,7 +244,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.*; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; @@ -2838,4 +2843,49 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) .call(); } + + @Override + public CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable) { + MasterCoprocessorRpcChannelImpl channel = + new MasterCoprocessorRpcChannelImpl(this. newMasterCaller()); + S stub = stubMaker.apply(channel); + CompletableFuture future = new CompletableFuture<>(); + ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); + callable.call(stub, controller, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + }); + return future; + } + + @Override + public CompletableFuture coprocessorService(Function stubMaker, + CoprocessorCallable callable, ServerName serverName) { + RegionServerCoprocessorRpcChannelImpl channel = + new RegionServerCoprocessorRpcChannelImpl(this. newServerCaller().serverName( + serverName)); + S stub = stubMaker.apply(channel); + CompletableFuture future = new CompletableFuture<>(); + ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); + callable.call(stub, controller, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + }); + return future; + } + + private ServerRequestCallerBuilder newServerCaller() { + return this.connection.callerFactory. serverRequest() + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java new file mode 100644 index 00000000000..610eb603ee9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; + +/** + * The implementation of a region server based coprocessor rpc channel. + */ +@InterfaceAudience.Private +public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel { + + ServerRequestCallerBuilder callerBuilder; + + RegionServerCoprocessorRpcChannelImpl(ServerRequestCallerBuilder callerBuilder) { + this.callerBuilder = callerBuilder; + } + + private CompletableFuture rpcCall(MethodDescriptor method, Message request, + Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + CoprocessorServiceRequest csr = + CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request); + stub.execRegionServerService( + controller, + csr, + new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback() { + + @Override + public void run(CoprocessorServiceResponse resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + @Override + public void callMethod(MethodDescriptor method, RpcController controller, Message request, + Message responsePrototype, RpcCallback done) { + callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call() + .whenComplete(((r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + })); + } +} \ No newline at end of file diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java new file mode 100644 index 00000000000..16fb03c378d --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TestAsyncAdminBase; +import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase { + + private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); + private static final String DUMMY_VALUE = "val"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(2); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @Test + public void testMasterCoprocessorService() throws Exception { + TestProtos.EchoRequestProto request = + TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); + TestProtos.EchoResponseProto response = + admin + . coprocessorService( + TestRpcServiceProtos.TestProtobufRpcProto::newStub, + (s, c, done) -> s.echo(c, request, done)).get(); + assertEquals("hello", response.getMessage()); + } + + @Test + public void testMasterCoprocessorError() throws Exception { + TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance(); + try { + admin + . coprocessorService( + TestRpcServiceProtos.TestProtobufRpcProto::newStub, + (s, c, done) -> s.error(c, emptyRequest, done)).get(); + fail("Should have thrown an exception"); + } catch (Exception e) { + } + } + + @Test + public void testRegionServerCoprocessorService() throws Exception { + final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + DummyRegionServerEndpointProtos.DummyRequest request = + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); + DummyRegionServerEndpointProtos.DummyResponse response = + admin + . coprocessorService( + DummyRegionServerEndpointProtos.DummyService::newStub, + (s, c, done) -> s.dummyCall(c, request, done), serverName).get(); + assertEquals(DUMMY_VALUE, response.getValue()); + } + + @Test + public void testRegionServerCoprocessorServiceError() throws Exception { + final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + DummyRegionServerEndpointProtos.DummyRequest request = + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(); + try { + admin + . coprocessorService( + DummyRegionServerEndpointProtos.DummyService::newStub, + (s, c, done) -> s.dummyThrow(c, request, done), serverName).get(); + fail("Should have thrown an exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof RetriesExhaustedException); + assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim())); + } + } + + static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService { + + public DummyRegionServerEndpoint() { + } + + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + + @Override + public void dummyThrow(RpcController controller, + DummyRequest request, + RpcCallback done) { + CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW); + } + } +}