HBASE-18342 Add coprocessor service support for async admin

This commit is contained in:
Guanghao Zhang 2017-07-10 09:25:47 +08:00
parent 9e0f450c0c
commit 81ffd6a13e
8 changed files with 596 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.ClusterStatus;
@ -36,12 +37,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.RpcChannel;
/**
* The asynchronous administrative API for HBase.
* <p>
@ -1060,4 +1064,49 @@ public interface AsyncAdmin {
* {@link CompletableFuture}
*/
CompletableFuture<Integer> runCatalogJanitor();
/**
* Execute the given coprocessor call on the master.
* <p>
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
* one line lambda expression, like:
*
* <pre>
* <code>
* channel -> xxxService.newStub(channel)
* </code>
* </pre>
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link CoprocessorCallable} for more details.
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
* @see CoprocessorCallable
*/
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable);
/**
* Execute the given coprocessor call on the given region server.
* <p>
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
* one line lambda expression, like:
*
* <pre>
* <code>
* channel -> xxxService.newStub(channel)
* </code>
* </pre>
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link CoprocessorCallable} for more details.
* @param serverName the given region server
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
* @see CoprocessorCallable
*/
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, ServerName serverName);
}

View File

@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.procedure2.LockInfo;
@ -45,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.RpcChannel;
/**
* The implementation of AsyncAdmin.
*/
@ -617,4 +621,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<Integer> runCatalogJanitor() {
return wrap(rawAdmin.runCatalogJanitor());
}
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable) {
return wrap(rawAdmin.coprocessorService(stubMaker, callable));
}
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, ServerName serverName) {
return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName));
}
}

View File

@ -378,7 +378,7 @@ class AsyncRpcRetryingCallerFactory {
return new MasterRequestCallerBuilder<>();
}
public class AdminRequestCallerBuilder<T> extends BuilderBase{
public class AdminRequestCallerBuilder<T> extends BuilderBase {
// TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
private AsyncAdminRequestRetryingCaller.Callable<T> callable;
@ -438,4 +438,65 @@ class AsyncRpcRetryingCallerFactory {
public <T> AdminRequestCallerBuilder<T> adminRequest(){
return new AdminRequestCallerBuilder<>();
}
public class ServerRequestCallerBuilder<T> extends BuilderBase {
private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
private long operationTimeoutNs = -1L;
private long rpcTimeoutNs = -1L;
private ServerName serverName;
public ServerRequestCallerBuilder<T> action(
AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(operationTimeout);
return this;
}
public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
this.pauseNs = unit.toNanos(pause);
return this;
}
public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
this.serverName = serverName;
return this;
}
public AsyncServerRequestRpcRetryingCaller<T> build() {
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable,
"action is null"));
}
public CompletableFuture<T> call() {
return build().call();
}
}
public <T> ServerRequestCallerBuilder<T> serverRequest() {
return new ServerRequestCallerBuilder<>();
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
* Retry caller for a request call to region server.
* Now only used for coprocessor call to region server.
*/
@InterfaceAudience.Private
public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
@FunctionalInterface
public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, ClientService.Interface stub);
}
private final Callable<T> callable;
private ServerName serverName;
public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
@Override
protected void doCall() {
ClientService.Interface stub;
try {
stub = this.conn.getRegionServerStub(serverName);
} catch (IOException e) {
onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> {
});
return;
}
resetCallTimeout();
callable.call(controller, stub).whenComplete((result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});
return;
}
future.complete(result);
});
}
CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/**
* The implementation of a master based coprocessor rpc channel.
*/
@InterfaceAudience.Private
class MasterCoprocessorRpcChannelImpl implements RpcChannel {
MasterRequestCallerBuilder<Message> callerBuilder;
MasterCoprocessorRpcChannelImpl(MasterRequestCallerBuilder<Message> callerBuilder) {
this.callerBuilder = callerBuilder;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
CoprocessorServiceRequest csr =
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
stub.execMasterService(
controller,
csr,
new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
.whenComplete(((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
}
}

View File

@ -34,10 +34,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
@ -239,7 +244,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@ -2838,4 +2843,49 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
(s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
.call();
}
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable) {
MasterCoprocessorRpcChannelImpl channel =
new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
callable.call(stub, controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
return future;
}
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, ServerName serverName) {
RegionServerCoprocessorRpcChannelImpl channel =
new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
serverName));
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
callable.call(stub, controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
return future;
}
private <T> ServerRequestCallerBuilder<T> newServerCaller() {
return this.connection.callerFactory.<T> serverRequest()
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
/**
* The implementation of a region server based coprocessor rpc channel.
*/
@InterfaceAudience.Private
public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
ServerRequestCallerBuilder<Message> callerBuilder;
RegionServerCoprocessorRpcChannelImpl(ServerRequestCallerBuilder<Message> callerBuilder) {
this.callerBuilder = callerBuilder;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
CoprocessorServiceRequest csr =
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
stub.execRegionServerService(
controller,
csr,
new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
.whenComplete(((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
}));
}
}

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
private static final String DUMMY_VALUE = "val";
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
DummyRegionServerEndpoint.class.getName());
TEST_UTIL.startMiniCluster(2);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@Test
public void testMasterCoprocessorService() throws Exception {
TestProtos.EchoRequestProto request =
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
TestProtos.EchoResponseProto response =
admin
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService(
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
(s, c, done) -> s.echo(c, request, done)).get();
assertEquals("hello", response.getMessage());
}
@Test
public void testMasterCoprocessorError() throws Exception {
TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
try {
admin
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService(
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
(s, c, done) -> s.error(c, emptyRequest, done)).get();
fail("Should have thrown an exception");
} catch (Exception e) {
}
}
@Test
public void testRegionServerCoprocessorService() throws Exception {
final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
DummyRegionServerEndpointProtos.DummyRequest request =
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
DummyRegionServerEndpointProtos.DummyResponse response =
admin
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyCall(c, request, done), serverName).get();
assertEquals(DUMMY_VALUE, response.getValue());
}
@Test
public void testRegionServerCoprocessorServiceError() throws Exception {
final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
DummyRegionServerEndpointProtos.DummyRequest request =
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
try {
admin
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
DummyRegionServerEndpointProtos.DummyService::newStub,
(s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
fail("Should have thrown an exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof RetriesExhaustedException);
assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
}
}
static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
public DummyRegionServerEndpoint() {
}
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
}
@Override
public void dummyCall(RpcController controller, DummyRequest request,
RpcCallback<DummyResponse> callback) {
callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
}
@Override
public void dummyThrow(RpcController controller,
DummyRequest request,
RpcCallback<DummyResponse> done) {
CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
}
}
}