From a11091c49cdd67c0ad37e8a55e87f430e24da4f1 Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Sun, 8 May 2016 10:20:15 +0200 Subject: [PATCH] HBASE-15798 Add Async RpcChannels to all RpcClients Signed-off-by: stack --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 34 ++- .../apache/hadoop/hbase/ipc/AsyncCall.java | 26 ++- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 9 +- .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java | 8 +- .../hadoop/hbase/ipc/AsyncRpcClient.java | 66 +++--- .../hadoop/hbase/ipc/MessageConverter.java | 2 +- .../apache/hadoop/hbase/ipc/RpcClient.java | 33 ++- .../hadoop/hbase/ipc/RpcClientImpl.java | 201 ++++++++++++++++- .../hadoop/hbase/ipc/AbstractTestIPC.java | 203 +++++++++++++++--- .../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 121 +---------- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 7 +- 11 files changed, 483 insertions(+), 227 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index c091d1d3d0d..71c88756668 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -270,6 +270,27 @@ public abstract class AbstractRpcClient implements RpcClient { return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); } + /** + * Configure a payload carrying controller + * @param controller to configure + * @param channelOperationTimeout timeout for operation + * @return configured payload controller + */ + static PayloadCarryingRpcController configurePayloadCarryingRpcController( + RpcController controller, int channelOperationTimeout) { + PayloadCarryingRpcController pcrc; + if (controller != null && controller instanceof PayloadCarryingRpcController) { + pcrc = (PayloadCarryingRpcController) controller; + if (!pcrc.hasCallTimeout()) { + pcrc.setCallTimeout(channelOperationTimeout); + } + } else { + pcrc = new PayloadCarryingRpcController(); + pcrc.setCallTimeout(channelOperationTimeout); + } + return pcrc; + } + /** * Takes an Exception and the address we were trying to connect to and return an IOException with * the input exception as the cause. The new exception provides the stack trace of the place where @@ -321,16 +342,9 @@ public abstract class AbstractRpcClient implements RpcClient { @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { - PayloadCarryingRpcController pcrc; - if (controller != null && controller instanceof PayloadCarryingRpcController) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()) { - pcrc.setCallTimeout(channelOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(channelOperationTimeout); - } + PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( + controller, + channelOperationTimeout); return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index 3acf280ae78..89e6ca489ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -51,14 +51,15 @@ public class AsyncCall extends Promise { final Message responseDefaultType; private final MessageConverter messageConverter; - final long startTime; - final long rpcTimeout; private final IOExceptionConverter exceptionConverter; + final long rpcTimeout; + // For only the request private final CellScanner cellScanner; private final int priority; + final MetricsConnection clientMetrics; final MetricsConnection.CallStats callStats; /** @@ -71,13 +72,15 @@ public class AsyncCall extends Promise { * @param cellScanner cellScanner containing cells to send as request * @param responseDefaultType the default response type * @param messageConverter converts the messages to what is the expected output + * @param exceptionConverter converts exceptions to expected format. Can be null * @param rpcTimeout timeout for this call in ms * @param priority for this request + * @param metrics MetricsConnection to which the metrics are stored for this request */ public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority, - MetricsConnection.CallStats callStats) { + MetricsConnection metrics) { super(channel.getEventExecutor()); this.channel = channel; @@ -90,13 +93,15 @@ public class AsyncCall extends Promise { this.messageConverter = messageConverter; this.exceptionConverter = exceptionConverter; - this.startTime = EnvironmentEdgeManager.currentTime(); this.rpcTimeout = rpcTimeout; this.priority = priority; this.cellScanner = cellScanner; - this.callStats = callStats; + this.callStats = MetricsConnection.newCallStats(); + callStats.setStartTime(EnvironmentEdgeManager.currentTime()); + + this.clientMetrics = metrics; } /** @@ -105,7 +110,7 @@ public class AsyncCall extends Promise { * @return start time for the call */ public long getStartTime() { - return this.startTime; + return this.callStats.getStartTime(); } @Override @@ -122,9 +127,14 @@ public class AsyncCall extends Promise { * @param cellBlockScanner to set */ public void setSuccess(M value, CellScanner cellBlockScanner) { + callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - callStats.getStartTime()); + if (LOG.isTraceEnabled()) { - long callTime = EnvironmentEdgeManager.currentTime() - startTime; - LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms"); + LOG.trace("Call: " + method.getName() + ", callTime: " + callStats.getCallTimeMs() + "ms"); + } + + if (clientMetrics != null) { + clientMetrics.updateRpc(method, param, callStats); } try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index bd4be5a0714..8cc730ffbc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -27,7 +27,6 @@ import java.net.InetSocketAddress; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Future; -import org.apache.hadoop.hbase.client.MetricsConnection; /** * Interface for Async Rpc Channels @@ -45,15 +44,13 @@ public interface AsyncRpcChannel { * @param exceptionConverter for converting exceptions * @param rpcTimeout timeout for request * @param priority for request - * @param callStats collects stats of the call * @return Promise for the response Message */ - Future callMethod( final Descriptors.MethodDescriptor method, - final Message request,final CellScanner cellScanner, - R responsePrototype, MessageConverter messageConverter, IOExceptionConverter - exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats); + final Message request, final CellScanner cellScanner, + R responsePrototype, MessageConverter messageConverter, + IOExceptionConverter exceptionConverter, long rpcTimeout, int priority); /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java index 5af23547189..cd61b61db99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java @@ -30,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.GenericFutureListener; - import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Future; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; @@ -297,14 +295,16 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { * @param priority for request * @return Promise for the response Message */ + @Override public Future callMethod( final Descriptors.MethodDescriptor method, final Message request,final CellScanner cellScanner, R responsePrototype, MessageConverter messageConverter, IOExceptionConverter - exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) { + exceptionConverter, long rpcTimeout, int priority) { final AsyncCall call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(), method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter, - rpcTimeout, priority, callStats); + rpcTimeout, priority, client.metrics); + synchronized (pendingCalls) { if (closed) { call.setFailure(new ConnectException()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 2fdc1ecfa5d..c1ed7480bce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -27,6 +27,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -59,7 +60,6 @@ import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.ResponseFutureListener; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; @@ -103,7 +103,7 @@ public class AsyncRpcClient extends AbstractRpcClient { @VisibleForTesting static Pair> GLOBAL_EVENT_LOOP_GROUP; - private synchronized static Pair> + synchronized static Pair> getGlobalEventLoopGroup(Configuration conf) { if (GLOBAL_EVENT_LOOP_GROUP == null) { GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf); @@ -241,8 +241,8 @@ public class AsyncRpcClient extends AbstractRpcClient { final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); final Future promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, - getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority(), - callStats); + getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), + pcrc.getPriority()); pcrc.notifyOnCancel(new RpcCallback() { @Override @@ -289,19 +289,11 @@ public class AsyncRpcClient extends AbstractRpcClient { final AsyncRpcChannel connection; try { connection = createRpcChannel(md.getService().getName(), addr, ticket); - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); ResponseFutureListener listener = new ResponseFutureListener() { @Override public void operationComplete(Future future) throws Exception { - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(md, param, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); - } if (!future.isSuccess()) { Throwable cause = future.cause(); if (cause instanceof IOException) { @@ -325,10 +317,9 @@ public class AsyncRpcClient extends AbstractRpcClient { } } }; - cs.setStartTime(EnvironmentEdgeManager.currentTime()); connection.callMethod(md, param, pcrc.cellScanner(), returnType, getMessageConverterWithRpcController(pcrc), null, - pcrc.getCallTimeout(), pcrc.getPriority(), cs) + pcrc.getCallTimeout(), pcrc.getPriority()) .addListener(listener); } catch (StoppedRpcClientException|FailedServerException e) { pcrc.setFailed(e); @@ -360,6 +351,11 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + @Override + public EventLoop getEventExecutor() { + return this.bootstrap.group().next(); + } + /** * Create a cell scanner * @@ -382,10 +378,17 @@ public class AsyncRpcClient extends AbstractRpcClient { return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); } + @Override + public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) + throws StoppedRpcClientException, FailedServerException { + return this.createRpcChannel(serviceName, + new InetSocketAddress(sn.getHostname(), sn.getPort()), user); + } + /** * Creates an RPC client * - * @param serviceName name of servicce + * @param serviceName name of service * @param location to connect to * @param ticket for current user * @return new RpcChannel @@ -452,6 +455,7 @@ public class AsyncRpcClient extends AbstractRpcClient { /** * Remove connection from pool + * @param connection to remove */ public void removeConnection(AsyncRpcChannel connection) { int connectionHashCode = connection.hashCode(); @@ -469,17 +473,8 @@ public class AsyncRpcClient extends AbstractRpcClient { } } - /** - * Creates a "channel" that can be used by a protobuf service. Useful setting up - * protobuf stubs. - * - * @param sn server name describing location of server - * @param user which is to use the connection - * @param rpcTimeout default rpc operation timeout - * - * @return A rpc channel that goes via this rpc client instance. - */ - public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) { + @Override + public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) { return new RpcChannelImplementation(this, sn, user, rpcTimeout); } @@ -507,21 +502,20 @@ public class AsyncRpcClient extends AbstractRpcClient { @Override public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType, RpcCallback done) { - PayloadCarryingRpcController pcrc; - if (controller != null) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()) { - pcrc.setCallTimeout(channelOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(channelOperationTimeout); - } + PayloadCarryingRpcController pcrc = + configurePayloadCarryingRpcController(controller, channelOperationTimeout); this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); } } + /** + * Get a new timeout on this RPC client + * @param task to run at timeout + * @param delay for the timeout + * @param unit time unit for the timeout + * @return Timeout + */ Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { return WHEEL_TIMER.newTimeout(task, delay, unit); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java index 527ac95663a..a85225ad31b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java @@ -41,7 +41,7 @@ public interface MessageConverter { MessageConverter NO_CONVERTER = new MessageConverter() { @Override public Message convert(Message msg, CellScanner cellScanner) throws IOException { - return null; + return msg; } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index f77f1ec9243..9d05c210b2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.BlockingRpcChannel; - +import com.google.protobuf.RpcChannel; +import io.netty.util.concurrent.EventExecutor; import java.io.Closeable; import java.io.IOException; @@ -68,6 +69,30 @@ import org.apache.hadoop.hbase.security.User; BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) throws IOException; + /** + * Create or fetch AsyncRpcChannel + * @param serviceName to connect to + * @param sn ServerName of the channel to create + * @param user for the service + * @return An async RPC channel fitting given parameters + * @throws FailedServerException if server failed + * @throws StoppedRpcClientException if the RPC client has stopped + */ + AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user) + throws StoppedRpcClientException, FailedServerException; + + /** + * Creates a "channel" that can be used by a protobuf service. Useful setting up + * protobuf stubs. + * + * @param sn server name describing location of server + * @param user which is to use the connection + * @param rpcTimeout default rpc operation timeout + * + * @return A rpc channel that goes via this rpc client instance. + */ + RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout); + /** * Interrupt the connections to the given server. This should be called if the server * is known as actually dead. This will not prevent current operation to be retried, and, @@ -91,4 +116,10 @@ import org.apache.hadoop.hbase.security.User; * supports cell blocks. */ boolean hasCellBlockSupport(); + + /** + * Get an event loop to operate on + * @return EventLoop + */ + EventExecutor getEventExecutor(); } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 7b2500c1404..a5d2482415d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -22,6 +22,9 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import io.netty.util.concurrent.EventExecutor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -51,7 +54,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import javax.net.SocketFactory; import javax.security.sasl.SaslException; @@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; @@ -118,9 +121,7 @@ public class RpcClientImpl extends AbstractRpcClient { protected final SocketFactory socketFactory; // how to create sockets protected final static Map> tokenHandlers = - new HashMap>(); + TokenSelector> tokenHandlers = new HashMap<>(); static { tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); @@ -1217,7 +1218,13 @@ public class RpcClientImpl extends AbstractRpcClient { } } - /** Make a call, passing param, to the IPC server running at + @Override + public EventExecutor getEventExecutor() { + return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next(); + } + + /** + * Make a call, passing param, to the IPC server running at * address which is servicing the protocol protocol, * with the ticket credentials, returning the value. * Throws exceptions if there are network problems or if the remote code @@ -1226,7 +1233,7 @@ public class RpcClientImpl extends AbstractRpcClient { * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a * new Connection each time. * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException if the call is interupted + * @throws InterruptedException if the call is interrupted * @throws IOException if something fails on the connection */ @Override @@ -1237,10 +1244,35 @@ public class RpcClientImpl extends AbstractRpcClient { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); } + + Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats); + + return new Pair<>(call.response, call.cells); + } + + + /** + * Make a call, passing param, to the IPC server running at + * address which is servicing the protocol protocol, + * with the ticket credentials, returning the value. + * Throws exceptions if there are network problems or if the remote code + * threw an exception. + * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. + * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a + * new Connection each time. + * @return A Call + * @throws InterruptedException if the call is interrupted + * @throws IOException if something fails on the connection + */ + private Call call(MethodDescriptor method, Message request, + R responsePrototype, PayloadCarryingRpcController pcrc, User ticket, + InetSocketAddress addr, MetricsConnection.CallStats callStats) + throws IOException, InterruptedException { + CellScanner cells = pcrc.cellScanner(); - final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType, - pcrc.getCallTimeout(), MetricsConnection.newCallStats()); + final Call call = new Call(callIdCnt.getAndIncrement(), method, request, cells, + responsePrototype, pcrc.getCallTimeout(), callStats); final Connection connection = getConnection(ticket, call, addr); @@ -1256,7 +1288,7 @@ public class RpcClientImpl extends AbstractRpcClient { if (pcrc.isCanceled()) { // To finish if the call was cancelled before we set the notification (race condition) call.callComplete(); - return new Pair<>(call.response, call.cells); + return call; } } else { cts = null; @@ -1299,9 +1331,19 @@ public class RpcClientImpl extends AbstractRpcClient { throw wrapException(addr, call.error); } - return new Pair<>(call.response, call.cells); + return call; } + @Override + public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName, + ServerName sn, User user) throws StoppedRpcClientException, FailedServerException { + return new AsyncRpcChannel(sn, user); + } + + @Override + public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) { + return new RpcChannelImplementation(sn, user, rpcTimeout); + } /** * Interrupt the connections to the given ip:port server. This should be called if the server @@ -1349,4 +1391,143 @@ public class RpcClientImpl extends AbstractRpcClient { return connection; } + + /** + * Simulated async call + */ + private class RpcChannelImplementation implements RpcChannel { + private final InetSocketAddress isa; + private final User ticket; + private final int channelOperationTimeout; + private final EventExecutor executor; + + /** + * @param channelOperationTimeout - the default timeout when no timeout is given + */ + protected RpcChannelImplementation( + final ServerName sn, final User ticket, int channelOperationTimeout) { + this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); + this.ticket = ticket; + this.channelOperationTimeout = channelOperationTimeout; + + this.executor = RpcClientImpl.this.getEventExecutor(); + } + + @Override + public void callMethod(final MethodDescriptor method, RpcController controller, + final Message request, final Message responsePrototype, final RpcCallback done) { + final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( + controller, + channelOperationTimeout); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs); + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(method, request, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + } + + done.run(call.response); + } catch (IOException e) { + pcrc.setFailed(e); + } catch (InterruptedException e) { + pcrc.startCancel(); + } + } + }); + } + } + + /** + * Wraps the call in an async channel. + */ + private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel { + private final EventExecutor executor; + private final InetSocketAddress isa; + + private final User ticket; + + /** + * Constructor + * @param sn servername to connect to + * @param user to connect with + */ + public AsyncRpcChannel(ServerName sn, User user) { + this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); + this.executor = RpcClientImpl.this.getEventExecutor(); + this.ticket = user; + } + + @Override + @SuppressWarnings("unchecked") + public Future callMethod(final MethodDescriptor method, + final Message request, CellScanner cellScanner, final R responsePrototype, + final MessageConverter messageConverter, + final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) { + final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner); + pcrc.setPriority(priority); + pcrc.setCallTimeout((int) rpcTimeout); + + final Promise promise = new Promise<>(executor); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs); + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(method, request, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + } + + promise.setSuccess( + messageConverter.convert((R) call.response, call.cells) + ); + } catch (InterruptedException e) { + promise.cancel(true); + } catch (IOException e) { + if(exceptionConverter != null) { + e = exceptionConverter.convert(e); + } + promise.setFailure(e); + } + } + }); + + return promise; + } + + @Override + public EventExecutor getEventExecutor() { + return this.executor; + } + + @Override + public void close(Throwable cause) { + this.executor.shutdownGracefully(); + } + + @Override + public boolean isAlive() { + return !this.executor.isShuttingDown() && !this.executor.isShutdown(); + } + + @Override + public InetSocketAddress getAddress() { + return isa; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 69c8fe246f4..ceb945b9fcb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; @@ -25,6 +26,15 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.net.ConnectException; import java.net.InetAddress; @@ -32,7 +42,9 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; - +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,9 +52,12 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; @@ -60,15 +75,6 @@ import org.apache.hadoop.util.StringUtils; import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Some basic ipc tests. */ @@ -76,8 +82,10 @@ public abstract class AbstractTestIPC { private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class); - private static byte[] CELL_BYTES = Bytes.toBytes("xyz"); - private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); + private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); static final Configuration CONF = HBaseConfiguration.create(); @@ -114,7 +122,7 @@ public abstract class AbstractTestIPC { CellScanner cellScanner = pcrc.cellScanner(); List list = null; if (cellScanner != null) { - list = new ArrayList(); + list = new ArrayList<>(); try { while (cellScanner.advance()) { list.add(cellScanner.current()); @@ -168,9 +176,8 @@ public abstract class AbstractTestIPC { @Test public void testNoCodec() throws InterruptedException, IOException { Configuration conf = HBaseConfiguration.create(); - AbstractRpcClient client = createRpcClientNoCodec(conf); TestRpcServer rpcServer = new TestRpcServer(); - try { + try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { rpcServer.start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); final String message = "hello"; @@ -186,7 +193,6 @@ public abstract class AbstractTestIPC { // Silly assertion that the message is in the returned pb. assertTrue(r.getFirst().toString().contains(message)); } finally { - client.close(); rpcServer.stop(); } } @@ -207,14 +213,13 @@ public abstract class AbstractTestIPC { NoSuchMethodException, ServiceException { Configuration conf = new Configuration(HBaseConfiguration.create()); conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); - List cells = new ArrayList(); + List cells = new ArrayList<>(); int count = 3; for (int i = 0; i < count; i++) { cells.add(CELL); } - AbstractRpcClient client = createRpcClient(conf); TestRpcServer rpcServer = new TestRpcServer(); - try { + try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -234,7 +239,6 @@ public abstract class AbstractTestIPC { } assertEquals(count, index); } finally { - client.close(); rpcServer.stop(); } } @@ -246,8 +250,7 @@ public abstract class AbstractTestIPC { public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); - AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); - try { + try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) { rpcServer.start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); @@ -262,7 +265,6 @@ public abstract class AbstractTestIPC { LOG.info("Caught expected exception: " + e.toString()); assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); } finally { - client.close(); rpcServer.stop(); } } @@ -332,7 +334,7 @@ public abstract class AbstractTestIPC { */ static class TestRpcServer1 extends RpcServer { - private static BlockingInterface SERVICE1 = + private static final BlockingInterface SERVICE1 = new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { @Override public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) @@ -378,26 +380,22 @@ public abstract class AbstractTestIPC { final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1); final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler); final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); - final AbstractRpcClient client = - new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null); - try { + try (AbstractRpcClient client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, + localAddr, null)) { rpcServer.start(); final InetSocketAddress isa = rpcServer.getListenerAddress(); if (isa == null) { throw new IOException("Listener channel is closed"); } - final BlockingRpcChannel channel = - client.createBlockingRpcChannel( - ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + final BlockingRpcChannel channel = client.createBlockingRpcChannel( + ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + BlockingInterface stub = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); final EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build(); final EchoResponseProto echoResponse = stub.echo(null, echoRequest); Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage()); } finally { - client.close(); rpcServer.stop(); } } @@ -416,4 +414,141 @@ public abstract class AbstractTestIPC { .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) .getCause() instanceof CallTimeoutException); } + + @Test + public void testAsyncProtobufConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + try (RpcClient client = createRpcClient(CONF)) { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = client.createProtobufRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + channel + .callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(), + new com.google.protobuf.RpcCallback() { + @Override + public void run(Message parameter) { + done.set(true); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = client.createProtobufRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.notifyOnFail(new com.google.protobuf.RpcCallback() { + @Override + public void run(IOException e) { + done.set(true); + LOG.info("Caught expected exception: " + e.toString()); + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } + }); + + channel.callMethod(md, controller, param, md.getOutputType().toProto(), + new com.google.protobuf.RpcCallback() { + @Override + public void run(Message parameter) { + done.set(true); + fail("Expected an exception to have been thrown!"); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testAsyncConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + try (RpcClient client = createRpcClient(CONF)) { + rpcServer.start(); + Message msg = setupAsyncConnection(rpcServer, client); + + assertNotNull(msg); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testRTEDuringAsyncConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) { + rpcServer.start(); + setupAsyncConnection(rpcServer, client); + + fail("Expected an exception to have been thrown!"); + } catch (ExecutionException e) { + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } finally { + rpcServer.stop(); + } + } + + private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client) + throws IOException, InterruptedException, ExecutionException, + java.util.concurrent.TimeoutException { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + ServerName serverName = + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()); + + AsyncRpcChannel channel = + client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent()); + + final Future f = channel + .callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER, + null, 1000, HConstants.NORMAL_QOS); + + return f.get(1, TimeUnit.SECONDS); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index b9d390aaaff..7efe1985469 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.MethodDescriptor; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -32,7 +32,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,12 +39,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; @@ -60,30 +56,20 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.util.StringUtils; -import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; - @RunWith(Parameterized.class) @Category({ RPCTests.class, SmallTests.class }) public class TestAsyncIPC extends AbstractTestIPC { private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - @Parameters public static Collection parameters() { - List paramList = new ArrayList(); + List paramList = new ArrayList<>(); paramList.add(new Object[] { false, false }); paramList.add(new Object[] { false, true }); paramList.add(new Object[] { true, false }); @@ -150,95 +136,6 @@ public class TestAsyncIPC extends AbstractTestIPC { }); } - @Test - public void testAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = createRpcClient(CONF); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = - client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType() - .toProto(), new RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - - @Test - public void testRTEDuringAsyncConnectionSetup() throws Exception { - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = - client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.notifyOnFail(new RpcCallback() { - @Override - public void run(IOException e) { - done.set(true); - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } - }); - - channel.callMethod(md, controller, param, md.getOutputType().toProto(), - new RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - fail("Expected an exception to have been thrown!"); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - public static void main(String[] args) throws IOException, SecurityException, NoSuchMethodException, InterruptedException { if (args.length != 2) { @@ -253,7 +150,6 @@ public class TestAsyncIPC extends AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - AsyncRpcClient client = new AsyncRpcClient(conf); KeyValue kv = BIG_CELL; Put p = new Put(CellUtil.cloneRow(kv)); for (int i = 0; i < cellcount; i++) { @@ -261,7 +157,7 @@ public class TestAsyncIPC extends AbstractTestIPC { } RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); rm.add(p); - try { + try (AsyncRpcClient client = new AsyncRpcClient(conf)) { rpcServer.start(); InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { @@ -270,17 +166,17 @@ public class TestAsyncIPC extends AbstractTestIPC { long startTime = System.currentTimeMillis(); User user = User.getCurrent(); for (int i = 0; i < cycles; i++) { - List cells = new ArrayList(); + List cells = new ArrayList<>(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); + RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), + MutationProto.newBuilder()); builder.setRegion(RegionSpecifier .newBuilder() .setType(RegionSpecifierType.REGION_NAME) .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); + ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); if (i % 100000 == 0) { LOG.info("" + i); // Uncomment this for a thread dump every so often. @@ -300,7 +196,6 @@ public class TestAsyncIPC extends AbstractTestIPC { LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + (System.currentTimeMillis() - startTime) + "ms"); } finally { - client.close(); rpcServer.stop(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 3fc12590a91..56de07d2cd3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -22,6 +22,8 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.MethodDescriptor; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; @@ -59,9 +61,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; - @Category({ RPCTests.class, SmallTests.class }) public class TestIPC extends AbstractTestIPC { @@ -129,7 +128,7 @@ public class TestIPC extends AbstractTestIPC { throw new IOException("Listener channel is closed"); } for (int i = 0; i < cycles; i++) { - List cells = new ArrayList(); + List cells = new ArrayList<>(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,