HBASE-15798 Add Async RpcChannels to all RpcClients

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jurriaan Mous 2016-05-08 10:20:15 +02:00 committed by stack
parent 3b74b6f329
commit a11091c49c
11 changed files with 483 additions and 227 deletions

View File

@ -270,6 +270,27 @@ public abstract class AbstractRpcClient implements RpcClient {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}
/**
* Configure a payload carrying controller
* @param controller to configure
* @param channelOperationTimeout timeout for operation
* @return configured payload controller
*/
static PayloadCarryingRpcController configurePayloadCarryingRpcController(
RpcController controller, int channelOperationTimeout) {
PayloadCarryingRpcController pcrc;
if (controller != null && controller instanceof PayloadCarryingRpcController) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
return pcrc;
}
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
@ -321,16 +342,9 @@ public abstract class AbstractRpcClient implements RpcClient {
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
PayloadCarryingRpcController pcrc;
if (controller != null && controller instanceof PayloadCarryingRpcController) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
controller,
channelOperationTimeout);
return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
}

View File

@ -51,14 +51,15 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
final Message responseDefaultType;
private final MessageConverter<M,T> messageConverter;
final long startTime;
final long rpcTimeout;
private final IOExceptionConverter exceptionConverter;
final long rpcTimeout;
// For only the request
private final CellScanner cellScanner;
private final int priority;
final MetricsConnection clientMetrics;
final MetricsConnection.CallStats callStats;
/**
@ -71,13 +72,15 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
* @param cellScanner cellScanner containing cells to send as request
* @param responseDefaultType the default response type
* @param messageConverter converts the messages to what is the expected output
* @param exceptionConverter converts exceptions to expected format. Can be null
* @param rpcTimeout timeout for this call in ms
* @param priority for this request
* @param metrics MetricsConnection to which the metrics are stored for this request
*/
public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
MetricsConnection.CallStats callStats) {
MetricsConnection metrics) {
super(channel.getEventExecutor());
this.channel = channel;
@ -90,13 +93,15 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
this.messageConverter = messageConverter;
this.exceptionConverter = exceptionConverter;
this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = rpcTimeout;
this.priority = priority;
this.cellScanner = cellScanner;
this.callStats = callStats;
this.callStats = MetricsConnection.newCallStats();
callStats.setStartTime(EnvironmentEdgeManager.currentTime());
this.clientMetrics = metrics;
}
/**
@ -105,7 +110,7 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
* @return start time for the call
*/
public long getStartTime() {
return this.startTime;
return this.callStats.getStartTime();
}
@Override
@ -122,9 +127,14 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
* @param cellBlockScanner to set
*/
public void setSuccess(M value, CellScanner cellBlockScanner) {
callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - callStats.getStartTime());
if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
LOG.trace("Call: " + method.getName() + ", callTime: " + callStats.getCallTimeMs() + "ms");
}
if (clientMetrics != null) {
clientMetrics.updateRpc(method, param, callStats);
}
try {

View File

@ -27,7 +27,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
/**
* Interface for Async Rpc Channels
@ -45,15 +44,13 @@ public interface AsyncRpcChannel {
* @param exceptionConverter for converting exceptions
* @param rpcTimeout timeout for request
* @param priority for request
* @param callStats collects stats of the call
* @return Promise for the response Message
*/
<R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request, final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats);
R responsePrototype, MessageConverter<R, O> messageConverter,
IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);
/**

View File

@ -30,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@ -297,14 +295,16 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
* @param priority for request
* @return Promise for the response Message
*/
@Override
public <R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) {
exceptionConverter, long rpcTimeout, int priority) {
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
rpcTimeout, priority, callStats);
rpcTimeout, priority, client.metrics);
synchronized (pendingCalls) {
if (closed) {
call.setFailure(new ConnectException());

View File

@ -27,6 +27,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.ResponseFutureListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
@ -103,7 +103,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
@VisibleForTesting
static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
getGlobalEventLoopGroup(Configuration conf) {
if (GLOBAL_EVENT_LOOP_GROUP == null) {
GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
@ -241,8 +241,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority(),
callStats);
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
pcrc.getPriority());
pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
@ -289,19 +289,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
final AsyncRpcChannel connection;
try {
connection = createRpcChannel(md.getService().getName(), addr, ticket);
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
ResponseFutureListener<Message> listener =
new ResponseFutureListener<Message>() {
@Override
public void operationComplete(Future<Message> future) throws Exception {
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
if (!future.isSuccess()) {
Throwable cause = future.cause();
if (cause instanceof IOException) {
@ -325,10 +317,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
};
cs.setStartTime(EnvironmentEdgeManager.currentTime());
connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null,
pcrc.getCallTimeout(), pcrc.getPriority(), cs)
pcrc.getCallTimeout(), pcrc.getPriority())
.addListener(listener);
} catch (StoppedRpcClientException|FailedServerException e) {
pcrc.setFailed(e);
@ -360,6 +351,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
@Override
public EventLoop getEventExecutor() {
return this.bootstrap.group().next();
}
/**
* Create a cell scanner
*
@ -382,10 +378,17 @@ public class AsyncRpcClient extends AbstractRpcClient {
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
}
@Override
public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
throws StoppedRpcClientException, FailedServerException {
return this.createRpcChannel(serviceName,
new InetSocketAddress(sn.getHostname(), sn.getPort()), user);
}
/**
* Creates an RPC client
*
* @param serviceName name of servicce
* @param serviceName name of service
* @param location to connect to
* @param ticket for current user
* @return new RpcChannel
@ -452,6 +455,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
/**
* Remove connection from pool
* @param connection to remove
*/
public void removeConnection(AsyncRpcChannel connection) {
int connectionHashCode = connection.hashCode();
@ -469,17 +473,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
/**
* Creates a "channel" that can be used by a protobuf service. Useful setting up
* protobuf stubs.
*
* @param sn server name describing location of server
* @param user which is to use the connection
* @param rpcTimeout default rpc operation timeout
*
* @return A rpc channel that goes via this rpc client instance.
*/
public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
@Override
public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
return new RpcChannelImplementation(this, sn, user, rpcTimeout);
}
@ -507,21 +502,20 @@ public class AsyncRpcClient extends AbstractRpcClient {
@Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType, RpcCallback<Message> done) {
PayloadCarryingRpcController pcrc;
if (controller != null) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
PayloadCarryingRpcController pcrc =
configurePayloadCarryingRpcController(controller, channelOperationTimeout);
this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
}
}
/**
* Get a new timeout on this RPC client
* @param task to run at timeout
* @param delay for the timeout
* @param unit time unit for the timeout
* @return Timeout
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return WHEEL_TIMER.newTimeout(task, delay, unit);
}

View File

@ -41,7 +41,7 @@ public interface MessageConverter<M,O> {
MessageConverter<Message,Message> NO_CONVERTER = new MessageConverter<Message, Message>() {
@Override
public Message convert(Message msg, CellScanner cellScanner) throws IOException {
return null;
return msg;
}
};
}

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcChannel;
import io.netty.util.concurrent.EventExecutor;
import java.io.Closeable;
import java.io.IOException;
@ -68,6 +69,30 @@ import org.apache.hadoop.hbase.security.User;
BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
throws IOException;
/**
* Create or fetch AsyncRpcChannel
* @param serviceName to connect to
* @param sn ServerName of the channel to create
* @param user for the service
* @return An async RPC channel fitting given parameters
* @throws FailedServerException if server failed
* @throws StoppedRpcClientException if the RPC client has stopped
*/
AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
throws StoppedRpcClientException, FailedServerException;
/**
* Creates a "channel" that can be used by a protobuf service. Useful setting up
* protobuf stubs.
*
* @param sn server name describing location of server
* @param user which is to use the connection
* @param rpcTimeout default rpc operation timeout
*
* @return A rpc channel that goes via this rpc client instance.
*/
RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout);
/**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,
@ -91,4 +116,10 @@ import org.apache.hadoop.hbase.security.User;
* supports cell blocks.
*/
boolean hasCellBlockSupport();
/**
* Get an event loop to operate on
* @return EventLoop
*/
EventExecutor getEventExecutor();
}

View File

@ -22,6 +22,9 @@ import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import io.netty.util.concurrent.EventExecutor;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -51,7 +54,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
@ -118,9 +121,7 @@ public class RpcClientImpl extends AbstractRpcClient {
protected final SocketFactory socketFactory; // how to create sockets
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>>();
TokenSelector<? extends TokenIdentifier>> tokenHandlers = new HashMap<>();
static {
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
new AuthenticationTokenSelector());
@ -1217,7 +1218,13 @@ public class RpcClientImpl extends AbstractRpcClient {
}
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@Override
public EventExecutor getEventExecutor() {
return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next();
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
@ -1226,7 +1233,7 @@ public class RpcClientImpl extends AbstractRpcClient {
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException if the call is interupted
* @throws InterruptedException if the call is interrupted
* @throws IOException if something fails on the connection
*/
@Override
@ -1237,10 +1244,35 @@ public class RpcClientImpl extends AbstractRpcClient {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
}
Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats);
return new Pair<>(call.response, call.cells);
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A Call
* @throws InterruptedException if the call is interrupted
* @throws IOException if something fails on the connection
*/
private <R extends Message> Call call(MethodDescriptor method, Message request,
R responsePrototype, PayloadCarryingRpcController pcrc, User ticket,
InetSocketAddress addr, MetricsConnection.CallStats callStats)
throws IOException, InterruptedException {
CellScanner cells = pcrc.cellScanner();
final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
pcrc.getCallTimeout(), MetricsConnection.newCallStats());
final Call call = new Call(callIdCnt.getAndIncrement(), method, request, cells,
responsePrototype, pcrc.getCallTimeout(), callStats);
final Connection connection = getConnection(ticket, call, addr);
@ -1256,7 +1288,7 @@ public class RpcClientImpl extends AbstractRpcClient {
if (pcrc.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.callComplete();
return new Pair<>(call.response, call.cells);
return call;
}
} else {
cts = null;
@ -1299,9 +1331,19 @@ public class RpcClientImpl extends AbstractRpcClient {
throw wrapException(addr, call.error);
}
return new Pair<>(call.response, call.cells);
return call;
}
@Override
public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName,
ServerName sn, User user) throws StoppedRpcClientException, FailedServerException {
return new AsyncRpcChannel(sn, user);
}
@Override
public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
return new RpcChannelImplementation(sn, user, rpcTimeout);
}
/**
* Interrupt the connections to the given ip:port server. This should be called if the server
@ -1349,4 +1391,143 @@ public class RpcClientImpl extends AbstractRpcClient {
return connection;
}
/**
* Simulated async call
*/
private class RpcChannelImplementation implements RpcChannel {
private final InetSocketAddress isa;
private final User ticket;
private final int channelOperationTimeout;
private final EventExecutor executor;
/**
* @param channelOperationTimeout - the default timeout when no timeout is given
*/
protected RpcChannelImplementation(
final ServerName sn, final User ticket, int channelOperationTimeout) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.ticket = ticket;
this.channelOperationTimeout = channelOperationTimeout;
this.executor = RpcClientImpl.this.getEventExecutor();
}
@Override
public void callMethod(final MethodDescriptor method, RpcController controller,
final Message request, final Message responsePrototype, final RpcCallback<Message> done) {
final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
controller,
channelOperationTimeout);
executor.execute(new Runnable() {
@Override
public void run() {
try {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(method, request, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
done.run(call.response);
} catch (IOException e) {
pcrc.setFailed(e);
} catch (InterruptedException e) {
pcrc.startCancel();
}
}
});
}
}
/**
* Wraps the call in an async channel.
*/
private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel {
private final EventExecutor executor;
private final InetSocketAddress isa;
private final User ticket;
/**
* Constructor
* @param sn servername to connect to
* @param user to connect with
*/
public AsyncRpcChannel(ServerName sn, User user) {
this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
this.executor = RpcClientImpl.this.getEventExecutor();
this.ticket = user;
}
@Override
@SuppressWarnings("unchecked")
public <R extends Message, O> Future<O> callMethod(final MethodDescriptor method,
final Message request, CellScanner cellScanner, final R responsePrototype,
final MessageConverter<R, O> messageConverter,
final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) {
final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner);
pcrc.setPriority(priority);
pcrc.setCallTimeout((int) rpcTimeout);
final Promise<O> promise = new Promise<>(executor);
executor.execute(new Runnable() {
@Override
public void run() {
try {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(method, request, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
promise.setSuccess(
messageConverter.convert((R) call.response, call.cells)
);
} catch (InterruptedException e) {
promise.cancel(true);
} catch (IOException e) {
if(exceptionConverter != null) {
e = exceptionConverter.convert(e);
}
promise.setFailure(e);
}
}
});
return promise;
}
@Override
public EventExecutor getEventExecutor() {
return this.executor;
}
@Override
public void close(Throwable cause) {
this.executor.shutdownGracefully();
}
@Override
public boolean isAlive() {
return !this.executor.isShuttingDown() && !this.executor.isShutdown();
}
@Override
public InetSocketAddress getAddress() {
return isa;
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
@ -25,6 +26,15 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@ -32,7 +42,9 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -40,9 +52,12 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@ -60,15 +75,6 @@ import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Some basic ipc tests.
*/
@ -76,8 +82,10 @@ public abstract class AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
static final Configuration CONF = HBaseConfiguration.create();
@ -114,7 +122,7 @@ public abstract class AbstractTestIPC {
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
list = new ArrayList<>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
@ -168,9 +176,8 @@ public abstract class AbstractTestIPC {
@Test
public void testNoCodec() throws InterruptedException, IOException {
Configuration conf = HBaseConfiguration.create();
AbstractRpcClient client = createRpcClientNoCodec(conf);
TestRpcServer rpcServer = new TestRpcServer();
try {
try (AbstractRpcClient client = createRpcClientNoCodec(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
@ -186,7 +193,6 @@ public abstract class AbstractTestIPC {
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
} finally {
client.close();
rpcServer.stop();
}
}
@ -207,14 +213,13 @@ public abstract class AbstractTestIPC {
NoSuchMethodException, ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
List<Cell> cells = new ArrayList<Cell>();
List<Cell> cells = new ArrayList<>();
int count = 3;
for (int i = 0; i < count; i++) {
cells.add(CELL);
}
AbstractRpcClient client = createRpcClient(conf);
TestRpcServer rpcServer = new TestRpcServer();
try {
try (AbstractRpcClient client = createRpcClient(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@ -234,7 +239,6 @@ public abstract class AbstractTestIPC {
}
assertEquals(count, index);
} finally {
client.close();
rpcServer.stop();
}
}
@ -246,8 +250,7 @@ public abstract class AbstractTestIPC {
public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@ -262,7 +265,6 @@ public abstract class AbstractTestIPC {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
client.close();
rpcServer.stop();
}
}
@ -332,7 +334,7 @@ public abstract class AbstractTestIPC {
*/
static class TestRpcServer1 extends RpcServer {
private static BlockingInterface SERVICE1 =
private static final BlockingInterface SERVICE1 =
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
@ -378,26 +380,22 @@ public abstract class AbstractTestIPC {
final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
final AbstractRpcClient client =
new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
try {
try (AbstractRpcClient client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT,
localAddr, null)) {
rpcServer.start();
final InetSocketAddress isa = rpcServer.getListenerAddress();
if (isa == null) {
throw new IOException("Listener channel is closed");
}
final BlockingRpcChannel channel =
client.createBlockingRpcChannel(
final BlockingRpcChannel channel = client.createBlockingRpcChannel(
ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
BlockingInterface stub = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
final EchoRequestProto echoRequest =
EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
} finally {
client.close();
rpcServer.stop();
}
}
@ -416,4 +414,141 @@ public abstract class AbstractTestIPC {
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
.getCause() instanceof CallTimeoutException);
}
@Test
public void testAsyncProtobufConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
try (RpcClient client = createRpcClient(CONF)) {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel = client.createProtobufRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
channel
.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(),
new com.google.protobuf.RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
rpcServer.stop();
}
}
@Test
public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel = client.createProtobufRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.notifyOnFail(new com.google.protobuf.RpcCallback<IOException>() {
@Override
public void run(IOException e) {
done.set(true);
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
}
});
channel.callMethod(md, controller, param, md.getOutputType().toProto(),
new com.google.protobuf.RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
fail("Expected an exception to have been thrown!");
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
rpcServer.stop();
}
}
@Test
public void testAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
try (RpcClient client = createRpcClient(CONF)) {
rpcServer.start();
Message msg = setupAsyncConnection(rpcServer, client);
assertNotNull(msg);
} finally {
rpcServer.stop();
}
}
@Test
public void testRTEDuringAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
rpcServer.start();
setupAsyncConnection(rpcServer, client);
fail("Expected an exception to have been thrown!");
} catch (ExecutionException e) {
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
rpcServer.stop();
}
}
private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client)
throws IOException, InterruptedException, ExecutionException,
java.util.concurrent.TimeoutException {
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
ServerName serverName =
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis());
AsyncRpcChannel channel =
client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent());
final Future<Message> f = channel
.callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER,
null, 1000, HConstants.NORMAL_QOS);
return f.get(1, TimeUnit.SECONDS);
}
}

View File

@ -17,8 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
@ -32,7 +32,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,12 +39,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
@ -60,30 +56,20 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
@RunWith(Parameterized.class)
@Category({ RPCTests.class, SmallTests.class })
public class TestAsyncIPC extends AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Parameters
public static Collection<Object[]> parameters() {
List<Object[]> paramList = new ArrayList<Object[]>();
List<Object[]> paramList = new ArrayList<>();
paramList.add(new Object[] { false, false });
paramList.add(new Object[] { false, true });
paramList.add(new Object[] { true, false });
@ -150,95 +136,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
});
}
@Test
public void testAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = createRpcClient(CONF);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel =
client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
.toProto(), new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testRTEDuringAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel =
client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.notifyOnFail(new RpcCallback<IOException>() {
@Override
public void run(IOException e) {
done.set(true);
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
}
});
channel.callMethod(md, controller, param, md.getOutputType().toProto(),
new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
fail("Expected an exception to have been thrown!");
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
public static void main(String[] args) throws IOException, SecurityException,
NoSuchMethodException, InterruptedException {
if (args.length != 2) {
@ -253,7 +150,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
AsyncRpcClient client = new AsyncRpcClient(conf);
KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) {
@ -261,7 +157,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
}
RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
rm.add(p);
try {
try (AsyncRpcClient client = new AsyncRpcClient(conf)) {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
@ -270,7 +166,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
List<CellScannable> cells = new ArrayList<>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder =
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
@ -300,7 +196,6 @@ public class TestAsyncIPC extends AbstractTestIPC {
LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
+ (System.currentTimeMillis() - startTime) + "ms");
} finally {
client.close();
rpcServer.stop();
}
}

View File

@ -22,6 +22,8 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@ -59,9 +61,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
@Category({ RPCTests.class, SmallTests.class })
public class TestIPC extends AbstractTestIPC {
@ -129,7 +128,7 @@ public class TestIPC extends AbstractTestIPC {
throw new IOException("Listener channel is closed");
}
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
List<CellScannable> cells = new ArrayList<>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder =
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,