HBASE-18342 Add coprocessor service support for async admin
This commit is contained in:
parent
246d42297b
commit
06a0bfc3ba
|
@ -23,6 +23,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
|
@ -36,12 +37,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.procedure2.LockInfo;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaFilter;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaSettings;
|
||||
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
|
||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
||||
/**
|
||||
* The asynchronous administrative API for HBase.
|
||||
* <p>
|
||||
|
@ -1060,4 +1064,49 @@ public interface AsyncAdmin {
|
|||
* {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Integer> runCatalogJanitor();
|
||||
|
||||
/**
|
||||
* Execute the given coprocessor call on the master.
|
||||
* <p>
|
||||
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
|
||||
* one line lambda expression, like:
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* channel -> xxxService.newStub(channel)
|
||||
* </code>
|
||||
* </pre>
|
||||
* @param stubMaker a delegation to the actual {@code newStub} call.
|
||||
* @param callable a delegation to the actual protobuf rpc call. See the comment of
|
||||
* {@link CoprocessorCallable} for more details.
|
||||
* @param <S> the type of the asynchronous stub
|
||||
* @param <R> the type of the return value
|
||||
* @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
|
||||
* @see CoprocessorCallable
|
||||
*/
|
||||
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable);
|
||||
|
||||
/**
|
||||
* Execute the given coprocessor call on the given region server.
|
||||
* <p>
|
||||
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
|
||||
* one line lambda expression, like:
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* channel -> xxxService.newStub(channel)
|
||||
* </code>
|
||||
* </pre>
|
||||
* @param stubMaker a delegation to the actual {@code newStub} call.
|
||||
* @param callable a delegation to the actual protobuf rpc call. See the comment of
|
||||
* {@link CoprocessorCallable} for more details.
|
||||
* @param serverName the given region server
|
||||
* @param <S> the type of the asynchronous stub
|
||||
* @param <R> the type of the return value
|
||||
* @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
|
||||
* @see CoprocessorCallable
|
||||
*/
|
||||
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable, ServerName serverName);
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
|
||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||
import org.apache.hadoop.hbase.procedure2.LockInfo;
|
||||
|
@ -45,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncAdmin.
|
||||
*/
|
||||
|
@ -617,4 +621,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<Integer> runCatalogJanitor() {
|
||||
return wrap(rawAdmin.runCatalogJanitor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable) {
|
||||
return wrap(rawAdmin.coprocessorService(stubMaker, callable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable, ServerName serverName) {
|
||||
return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -378,7 +378,7 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return new MasterRequestCallerBuilder<>();
|
||||
}
|
||||
|
||||
public class AdminRequestCallerBuilder<T> extends BuilderBase{
|
||||
public class AdminRequestCallerBuilder<T> extends BuilderBase {
|
||||
// TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
|
||||
|
||||
private AsyncAdminRequestRetryingCaller.Callable<T> callable;
|
||||
|
@ -438,4 +438,65 @@ class AsyncRpcRetryingCallerFactory {
|
|||
public <T> AdminRequestCallerBuilder<T> adminRequest(){
|
||||
return new AdminRequestCallerBuilder<>();
|
||||
}
|
||||
|
||||
public class ServerRequestCallerBuilder<T> extends BuilderBase {
|
||||
|
||||
private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
|
||||
|
||||
private long operationTimeoutNs = -1L;
|
||||
|
||||
private long rpcTimeoutNs = -1L;
|
||||
|
||||
private ServerName serverName;
|
||||
|
||||
public ServerRequestCallerBuilder<T> action(
|
||||
AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
|
||||
this.callable = callable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
|
||||
this.operationTimeoutNs = unit.toNanos(operationTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
|
||||
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
|
||||
this.pauseNs = unit.toNanos(pause);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
|
||||
this.startLogErrorsCnt = startLogErrorsCnt;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
|
||||
this.serverName = serverName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AsyncServerRequestRpcRetryingCaller<T> build() {
|
||||
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
|
||||
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable,
|
||||
"action is null"));
|
||||
}
|
||||
|
||||
public CompletableFuture<T> call() {
|
||||
return build().call();
|
||||
}
|
||||
}
|
||||
|
||||
public <T> ServerRequestCallerBuilder<T> serverRequest() {
|
||||
return new ServerRequestCallerBuilder<>();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import io.netty.util.HashedWheelTimer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
|
||||
/**
|
||||
* Retry caller for a request call to region server.
|
||||
* Now only used for coprocessor call to region server.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
|
||||
|
||||
@FunctionalInterface
|
||||
public interface Callable<T> {
|
||||
CompletableFuture<T> call(HBaseRpcController controller, ClientService.Interface stub);
|
||||
}
|
||||
|
||||
private final Callable<T> callable;
|
||||
private ServerName serverName;
|
||||
|
||||
public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
|
||||
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
|
||||
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
|
||||
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
|
||||
startLogErrorsCnt);
|
||||
this.serverName = serverName;
|
||||
this.callable = callable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doCall() {
|
||||
ClientService.Interface stub;
|
||||
try {
|
||||
stub = this.conn.getRegionServerStub(serverName);
|
||||
} catch (IOException e) {
|
||||
onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> {
|
||||
});
|
||||
return;
|
||||
}
|
||||
resetCallTimeout();
|
||||
callable.call(controller, stub).whenComplete((result, error) -> {
|
||||
if (error != null) {
|
||||
onError(error, () -> "Call to admin stub failed", err -> {
|
||||
});
|
||||
return;
|
||||
}
|
||||
future.complete(result);
|
||||
});
|
||||
}
|
||||
|
||||
CompletableFuture<T> call() {
|
||||
doCall();
|
||||
return future;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
|
||||
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import com.google.protobuf.RpcController;
|
||||
|
||||
/**
|
||||
* The implementation of a master based coprocessor rpc channel.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MasterCoprocessorRpcChannelImpl implements RpcChannel {
|
||||
|
||||
MasterRequestCallerBuilder<Message> callerBuilder;
|
||||
|
||||
MasterCoprocessorRpcChannelImpl(MasterRequestCallerBuilder<Message> callerBuilder) {
|
||||
this.callerBuilder = callerBuilder;
|
||||
}
|
||||
|
||||
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
|
||||
Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) {
|
||||
CompletableFuture<Message> future = new CompletableFuture<>();
|
||||
CoprocessorServiceRequest csr =
|
||||
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
|
||||
stub.execMasterService(
|
||||
controller,
|
||||
csr,
|
||||
new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
|
||||
|
||||
@Override
|
||||
public void run(CoprocessorServiceResponse resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
|
||||
Message responsePrototype, RpcCallback<Message> done) {
|
||||
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
|
||||
.whenComplete(((r, e) -> {
|
||||
if (e != null) {
|
||||
((ClientCoprocessorRpcController) controller).setFailed(e);
|
||||
}
|
||||
done.run(r);
|
||||
}));
|
||||
}
|
||||
}
|
|
@ -34,10 +34,13 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
|
@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.UnknownRegionException;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||
|
@ -239,7 +244,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
|
||||
|
@ -2838,4 +2843,49 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
(s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable) {
|
||||
MasterCoprocessorRpcChannelImpl channel =
|
||||
new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
|
||||
S stub = stubMaker.apply(channel);
|
||||
CompletableFuture<R> future = new CompletableFuture<>();
|
||||
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
|
||||
callable.call(stub, controller, resp -> {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
future.complete(resp);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||
CoprocessorCallable<S, R> callable, ServerName serverName) {
|
||||
RegionServerCoprocessorRpcChannelImpl channel =
|
||||
new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
|
||||
serverName));
|
||||
S stub = stubMaker.apply(channel);
|
||||
CompletableFuture<R> future = new CompletableFuture<>();
|
||||
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
|
||||
callable.call(stub, controller, resp -> {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
future.complete(resp);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
private <T> ServerRequestCallerBuilder<T> newServerCaller() {
|
||||
return this.connection.callerFactory.<T> serverRequest()
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
|
||||
.startLogErrorsCnt(startLogErrorsCnt);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import com.google.protobuf.RpcController;
|
||||
|
||||
/**
|
||||
* The implementation of a region server based coprocessor rpc channel.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
|
||||
|
||||
ServerRequestCallerBuilder<Message> callerBuilder;
|
||||
|
||||
RegionServerCoprocessorRpcChannelImpl(ServerRequestCallerBuilder<Message> callerBuilder) {
|
||||
this.callerBuilder = callerBuilder;
|
||||
}
|
||||
|
||||
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
|
||||
Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) {
|
||||
CompletableFuture<Message> future = new CompletableFuture<>();
|
||||
CoprocessorServiceRequest csr =
|
||||
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
|
||||
stub.execRegionServerService(
|
||||
controller,
|
||||
csr,
|
||||
new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
|
||||
|
||||
@Override
|
||||
public void run(CoprocessorServiceResponse resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
|
||||
Message responsePrototype, RpcCallback<Message> done) {
|
||||
callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
|
||||
.whenComplete(((r, e) -> {
|
||||
if (e != null) {
|
||||
((ClientCoprocessorRpcController) controller).setFailed(e);
|
||||
}
|
||||
done.run(r);
|
||||
}));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
|
||||
import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
|
||||
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
|
||||
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ ClientTests.class, MediumTests.class })
|
||||
public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
|
||||
|
||||
private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
|
||||
private static final String DUMMY_VALUE = "val";
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
ProtobufCoprocessorService.class.getName());
|
||||
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
|
||||
DummyRegionServerEndpoint.class.getName());
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMasterCoprocessorService() throws Exception {
|
||||
TestProtos.EchoRequestProto request =
|
||||
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
TestProtos.EchoResponseProto response =
|
||||
admin
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
(s, c, done) -> s.echo(c, request, done)).get();
|
||||
assertEquals("hello", response.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMasterCoprocessorError() throws Exception {
|
||||
TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||
try {
|
||||
admin
|
||||
.<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService(
|
||||
TestRpcServiceProtos.TestProtobufRpcProto::newStub,
|
||||
(s, c, done) -> s.error(c, emptyRequest, done)).get();
|
||||
fail("Should have thrown an exception");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionServerCoprocessorService() throws Exception {
|
||||
final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
|
||||
DummyRegionServerEndpointProtos.DummyRequest request =
|
||||
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
|
||||
DummyRegionServerEndpointProtos.DummyResponse response =
|
||||
admin
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
DummyRegionServerEndpointProtos.DummyService::newStub,
|
||||
(s, c, done) -> s.dummyCall(c, request, done), serverName).get();
|
||||
assertEquals(DUMMY_VALUE, response.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionServerCoprocessorServiceError() throws Exception {
|
||||
final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
|
||||
DummyRegionServerEndpointProtos.DummyRequest request =
|
||||
DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
|
||||
try {
|
||||
admin
|
||||
.<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
|
||||
DummyRegionServerEndpointProtos.DummyService::newStub,
|
||||
(s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
|
||||
fail("Should have thrown an exception");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e.getCause() instanceof RetriesExhaustedException);
|
||||
assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
|
||||
}
|
||||
}
|
||||
|
||||
static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
|
||||
|
||||
public DummyRegionServerEndpoint() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Service getService() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dummyCall(RpcController controller, DummyRequest request,
|
||||
RpcCallback<DummyResponse> callback) {
|
||||
callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dummyThrow(RpcController controller,
|
||||
DummyRequest request,
|
||||
RpcCallback<DummyResponse> done) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue