HBASE-13097 Use same EventLoopGroup for different AsyncRpcClients if possible

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
zhangduo 2015-02-27 15:03:46 +08:00 committed by stack
parent f670649f0e
commit d1619bceb3
5 changed files with 752 additions and 555 deletions

View File

@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -63,11 +64,12 @@ import com.google.protobuf.RpcController;
/** /**
* Netty client for the requests and responses * Netty client for the requests and responses
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncRpcClient extends AbstractRpcClient { public class AsyncRpcClient extends AbstractRpcClient {
public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
public static final HashedWheelTimer WHEEL_TIMER = public static final HashedWheelTimer WHEEL_TIMER =
new HashedWheelTimer(100, TimeUnit.MILLISECONDS); new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
@ -82,12 +84,54 @@ public class AsyncRpcClient extends AbstractRpcClient {
protected final AtomicInteger callIdCnt = new AtomicInteger(); protected final AtomicInteger callIdCnt = new AtomicInteger();
private final EventLoopGroup eventLoopGroup;
private final PoolMap<Integer, AsyncRpcChannel> connections; private final PoolMap<Integer, AsyncRpcChannel> connections;
final FailedServers failedServers; final FailedServers failedServers;
private final Bootstrap bootstrap; @VisibleForTesting
final Bootstrap bootstrap;
private final boolean useGlobalEventLoopGroup;
@VisibleForTesting
static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
getGlobalEventLoopGroup(Configuration conf) {
if (GLOBAL_EVENT_LOOP_GROUP == null) {
GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Create global event loop group "
+ GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
}
}
return GLOBAL_EVENT_LOOP_GROUP;
}
private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
Configuration conf) {
// Max amount of threads to use. 0 lets Netty decide based on amount of cores
int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
// Config to enable native transport. Does not seem to be stable at time of implementation
// although it is not extensively tested.
boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
// Use the faster native epoll transport mechanism on linux if enabled
if (epollEnabled && JVM.isLinux()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
}
return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
}
return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
}
}
/** /**
* Constructor for tests * Constructor for tests
@ -106,23 +150,16 @@ public class AsyncRpcClient extends AbstractRpcClient {
LOG.debug("Starting async Hbase RPC client"); LOG.debug("Starting async Hbase RPC client");
} }
// Max amount of threads to use. 0 lets Netty decide based on amount of cores Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
if (useGlobalEventLoopGroup) {
// Config to enable native transport. Does not seem to be stable at time of implementation eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
// although it is not extensively tested.
boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
// Use the faster native epoll transport mechanism on linux if enabled
Class<? extends Channel> socketChannelClass;
if (epollEnabled && JVM.isLinux()) {
socketChannelClass = EpollSocketChannel.class;
this.eventLoopGroup =
new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel"));
} else { } else {
socketChannelClass = NioSocketChannel.class; eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
this.eventLoopGroup = }
new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); if (LOG.isDebugEnabled()) {
LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
+ eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
} }
this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
@ -133,7 +170,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
// Configure the default bootstrap. // Configure the default bootstrap.
this.bootstrap = new Bootstrap(); this.bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(socketChannelClass) bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
.channel(eventLoopGroupAndChannelClass.getSecond())
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.TCP_NODELAY, tcpNoDelay) .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
@ -176,6 +214,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr) throws IOException, InterruptedException { InetSocketAddress addr) throws IOException, InterruptedException {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
}
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType); Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
@ -236,6 +277,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
} }
} }
private boolean closed = false;
/** /**
* Close netty * Close netty
*/ */
@ -245,12 +288,18 @@ public class AsyncRpcClient extends AbstractRpcClient {
} }
synchronized (connections) { synchronized (connections) {
if (closed) {
return;
}
closed = true;
for (AsyncRpcChannel conn : connections.values()) { for (AsyncRpcChannel conn : connections.values()) {
conn.close(null); conn.close(null);
} }
} }
// do not close global EventLoopGroup.
eventLoopGroup.shutdownGracefully(); if (!useGlobalEventLoopGroup) {
bootstrap.group().shutdownGracefully();
}
} }
/** /**
@ -287,10 +336,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
*/ */
private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
User ticket) throws StoppedRpcClientException, FailedServerException { User ticket) throws StoppedRpcClientException, FailedServerException {
if (this.eventLoopGroup.isShuttingDown() || this.eventLoopGroup.isShutdown()) {
throw new StoppedRpcClientException();
}
// Check if server is failed // Check if server is failed
if (this.failedServers.isFailedServer(location)) { if (this.failedServers.isFailedServer(location)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -305,6 +350,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
AsyncRpcChannel rpcChannel; AsyncRpcChannel rpcChannel;
synchronized (connections) { synchronized (connections) {
if (closed) {
throw new StoppedRpcClientException();
}
rpcChannel = connections.get(hashCode); rpcChannel = connections.get(hashCode);
if (rpcChannel == null) { if (rpcChannel == null) {
rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);

View File

@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Some basic ipc tests.
*/
public abstract class AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
static final Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
// follow.
static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto
.newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
throws ServiceException {
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
// If cells, scan them to check we are able to iterate what we were given and since
// this is
// an echo, just put them back on the controller creating a new block. Tests our
// block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while (cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
/**
* Instance of server. We actually don't do anything speical in here so could just use
* HBaseRpcServer directly.
*/
static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
super(null, "testRpcServer", Lists
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
"localhost", 0), CONF, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
}
}
protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
/**
* Ensure we do not HAVE TO HAVE a codec.
* @throws InterruptedException
* @throws IOException
*/
@Test
public void testNoCodec() throws InterruptedException, IOException {
Configuration conf = HBaseConfiguration.create();
AbstractRpcClient client = createRpcClientNoCodec(conf);
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
} finally {
client.close();
rpcServer.stop();
}
}
protected abstract AbstractRpcClient createRpcClient(Configuration conf);
/**
* It is hard to verify the compression is actually happening under the wraps. Hope that if
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
* confirm that compression is happening down in the client and server).
* @throws IOException
* @throws InterruptedException
* @throws SecurityException
* @throws NoSuchMethodException
*/
@Test
public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
NoSuchMethodException, ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
List<Cell> cells = new ArrayList<Cell>();
int count = 3;
for (int i = 0; i < count; i++) {
cells.add(CELL);
}
AbstractRpcClient client = createRpcClient(conf);
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
int index = 0;
while (r.getSecond().advance()) {
assertTrue(CELL.equals(r.getSecond().current()));
index++;
}
assertEquals(count, index);
} finally {
client.close();
rpcServer.stop();
}
}
protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
throws IOException;
@Test
public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
client.call(null, md, param, null, User.getCurrent(), address);
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
client.close();
rpcServer.stop();
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
AbstractRpcClient client = createRpcClient(CONF);
try {
rpcServer.start();
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
client.call(
new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
rpcServer.stop();
verify(scheduler).stop();
}
}
}

View File

@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
@RunWith(Parameterized.class)
@Category({ RPCTests.class, SmallTests.class })
public class TestAsyncIPC extends AbstractTestIPC {
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Parameters
public static Collection<Object[]> parameters() {
List<Object[]> paramList = new ArrayList<Object[]>();
paramList.add(new Object[] { false, false });
paramList.add(new Object[] { false, true });
paramList.add(new Object[] { true, false });
paramList.add(new Object[] { true, true });
return paramList;
}
private final boolean useNativeTransport;
private final boolean useGlobalEventLoopGroup;
public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
this.useNativeTransport = useNativeTransport;
this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
}
private void setConf(Configuration conf) {
conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useGlobalEventLoopGroup);
if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
if (useNativeTransport
&& !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
|| (!useNativeTransport
&& !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
}
}
}
@Override
protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
@Override
Codec getCodec() {
return super.getCodec();
}
};
}
@Override
protected AsyncRpcClient createRpcClient(Configuration conf) {
setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
}
@Override
protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
setConf(conf);
return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
promise.setFailure(new RuntimeException("Injected fault"));
}
});
}
});
}
@Test
public void testAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = createRpcClient(CONF);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel =
client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
.toProto(), new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testRTEDuringAsyncConnectionSetup() throws Exception {
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel =
client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.notifyOnFail(new RpcCallback<IOException>() {
@Override
public void run(IOException e) {
done.set(true);
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
}
});
channel.callMethod(md, controller, param, md.getOutputType().toProto(),
new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
fail("Expected an exception to have been thrown!");
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
public static void main(String[] args) throws IOException, SecurityException,
NoSuchMethodException, InterruptedException {
if (args.length != 2) {
System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
return;
}
// ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
// ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
int cycles = Integer.parseInt(args[0]);
int cellcount = Integer.parseInt(args[1]);
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) {
p.add(kv);
}
RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
rm.add(p);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder =
RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
builder.setRegion(RegionSpecifier
.newBuilder()
.setType(RegionSpecifierType.REGION_NAME)
.setValue(
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
if (i % 100000 == 0) {
LOG.info("" + i);
// Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName());
}
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address);
/*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count);
*/
}
LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
+ (System.currentTimeMillis() - startTime) + "ms");
} finally {
client.close();
rpcServer.stop();
}
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RPCTests.class, SmallTests.class })
public class TestGlobalEventLoopGroup {
@Test
public void test() {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
assertSame(client.bootstrap.group(), client1.bootstrap.group());
client1.close();
assertFalse(client.bootstrap.group().isShuttingDown());
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
client2.close();
client.close();
}
}

View File

@ -1,76 +1,48 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
@ -78,500 +50,55 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/** @Category({ RPCTests.class, SmallTests.class })
* Some basic ipc tests. public class TestIPC extends AbstractTestIPC {
*/
@Category({RPCTests.class, SmallTests.class})
public class TestIPC {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
public static final Log LOG = LogFactory.getLog(TestIPC.class); private static final Log LOG = LogFactory.getLog(TestIPC.class);
static byte [] CELL_BYTES = Bytes.toBytes("xyz"); @Override
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
static byte [] BIG_CELL_BYTES = new byte [10 * 1024]; return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
// follow.
private static final BlockingService SERVICE =
TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
@Override
public EmptyResponseProto ping(RpcController controller,
EmptyRequestProto request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public EmptyResponseProto error(RpcController controller,
EmptyRequestProto request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
throws ServiceException {
if (controller instanceof PayloadCarryingRpcController) {
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
// If cells, scan them to check we are able to iterate what we were given and since this is
// an echo, just put them back on the controller creating a new block. Tests our block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while(cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController)controller).setCellScanner(cellScanner);
}
return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
}
});
/**
* Instance of server. We actually don't do anything speical in here so could just use
* HBaseRpcServer directly.
*/
private static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service,
MethodDescriptor md, Message param, CellScanner cellScanner,
long receiveTime, MonitoredRPCHandler status) throws IOException {
return super.call(service, md, param, cellScanner, receiveTime, status);
}
}
/**
* Ensure we do not HAVE TO HAVE a codec.
* @throws InterruptedException
* @throws IOException
*/
@Test
public void testNoCodec() throws InterruptedException, IOException {
Configuration conf = HBaseConfiguration.create();
RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
@Override @Override
Codec getCodec() { Codec getCodec() {
return null; return null;
} }
}; };
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r = client.call(null, md, param,
md.getOutputType().toProto(), User.getCurrent(), address);
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
} finally {
client.close();
rpcServer.stop();
}
} }
/** @Override
* Ensure we do not HAVE TO HAVE a codec. protected RpcClientImpl createRpcClient(Configuration conf) {
* return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
* @throws InterruptedException
* @throws IOException
*/
@Test public void testNoCodecAsync() throws InterruptedException, IOException, ServiceException {
Configuration conf = HBaseConfiguration.create();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
@Override Codec getCodec() {
return null;
}
};
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
BlockingRpcChannel channel = client
.createBlockingRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
System.currentTimeMillis()), User.getCurrent(), 0);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
Message response =
channel.callBlockingMethod(md, controller, param, md.getOutputType().toProto());
assertTrue(controller.cellScanner() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(response.toString().contains(message));
} finally {
client.close();
rpcServer.stop();
}
} }
/** @Override
* It is hard to verify the compression is actually happening under the wraps. Hope that if protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually throws IOException {
* to confirm that compression is happening down in the client and server).
* @throws IOException
* @throws InterruptedException
* @throws SecurityException
* @throws NoSuchMethodException
*/
@Test
public void testCompressCellBlock()
throws IOException, InterruptedException, SecurityException, NoSuchMethodException,
ServiceException {
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
doSimpleTest(new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT));
// Another test for the async client
doAsyncSimpleTest(new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null));
}
private void doSimpleTest(final RpcClientImpl client)
throws InterruptedException, IOException {
TestRpcServer rpcServer = new TestRpcServer();
List<Cell> cells = new ArrayList<Cell>();
int count = 3;
for (int i = 0; i < count; i++) cells.add(CELL);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
Pair<Message, CellScanner> r = client
.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
int index = 0;
while (r.getSecond().advance()) {
assertTrue(CELL.equals(r.getSecond().current()));
index++;
}
assertEquals(count, index);
} finally {
client.close();
rpcServer.stop();
}
}
private void doAsyncSimpleTest(final AsyncRpcClient client)
throws InterruptedException, IOException, ServiceException {
TestRpcServer rpcServer = new TestRpcServer();
List<Cell> cells = new ArrayList<Cell>();
int count = 3;
for (int i = 0; i < count; i++)
cells.add(CELL);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
BlockingRpcChannel channel = client.createBlockingRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
channel.callBlockingMethod(md, pcrc, param, md.getOutputType().toProto());
CellScanner cellScanner = pcrc.cellScanner();
int index = 0;
while (cellScanner.advance()) {
assertTrue(CELL.equals(cellScanner.current()));
index++;
}
assertEquals(count, index);
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf)); SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
Mockito.doAnswer(new Answer<Socket>() { Mockito.doAnswer(new Answer<Socket>() {
@Override @Override
public Socket answer(InvocationOnMock invocation) throws Throwable { public Socket answer(InvocationOnMock invocation) throws Throwable {
Socket s = spy((Socket)invocation.callRealMethod()); Socket s = spy((Socket) invocation.callRealMethod());
doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt()); doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
return s; return s;
} }
}).when(spyFactory).createSocket(); }).when(spyFactory).createSocket();
TestRpcServer rpcServer = new TestRpcServer(); return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
client.call(null, md, param, null, User.getCurrent(), address);
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
client.close();
rpcServer.stop();
}
} }
@Test public static void main(String[] args) throws IOException, SecurityException,
public void testRTEDuringAsyncBlockingConnectionSetup() throws Exception { NoSuchMethodException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
promise.setFailure(new RuntimeException("Injected fault"));
}
});
}
});
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
BlockingRpcChannel channel = client.createBlockingRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
channel.callBlockingMethod(md, new PayloadCarryingRpcController(), param,
md.getOutputType().toProto());
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testRTEDuringAsyncConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
promise.setFailure(new RuntimeException("Injected fault"));
}
});
}
});
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel = client.createRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.notifyOnFail(new RpcCallback<IOException>() {
@Override
public void run(IOException e) {
done.set(true);
LOG.info("Caught expected exception: " + e.toString());
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
}
});
channel.callMethod(md, controller, param,
md.getOutputType().toProto(), new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
fail("Expected an exception to have been thrown!");
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
@Test
public void testAsyncConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcChannel channel = client.createRpcChannel(
ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 0);
final AtomicBoolean done = new AtomicBoolean(false);
channel.callMethod(md, new PayloadCarryingRpcController(), param,
md.getOutputType().toProto(), new RpcCallback<Message>() {
@Override
public void run(Message parameter) {
done.set(true);
}
});
TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return done.get();
}
});
} finally {
client.close();
rpcServer.stop();
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
try {
rpcServer.start();
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
client.call(
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))),
md, param, md.getOutputType().toProto(), User.getCurrent(),
rpcServer.getListenerAddress());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
rpcServer.stop();
verify(scheduler).stop();
}
}
/**
* Tests that the rpc scheduler is called when requests arrive.
*/
@Test
public void testRpcSchedulerAsync()
throws IOException, InterruptedException, ServiceException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
AbstractRpcClient client = new AsyncRpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT, null);
try {
rpcServer.start();
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
ServerName serverName = ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
for (int i = 0; i < 10; i++) {
BlockingRpcChannel channel = client.createBlockingRpcChannel(
serverName, User.getCurrent(), 0);
channel.callBlockingMethod(md,
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))),
param, md.getOutputType().toProto());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
rpcServer.stop();
verify(scheduler).stop();
}
}
public static void main(String[] args)
throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
if (args.length != 2) { if (args.length != 2) {
System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>"); System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
return; return;
@ -585,12 +112,12 @@ public class TestIPC {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL); KeyValue kv = BIG_CELL;
Put p = new Put(kv.getRow()); Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) { for (int i = 0; i < cellcount; i++) {
p.add(kv); p.add(kv);
} }
RowMutations rm = new RowMutations(kv.getRow()); RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
rm.add(p); rm.add(p);
try { try {
rpcServer.start(); rpcServer.start();
@ -600,35 +127,36 @@ public class TestIPC {
for (int i = 0; i < cycles; i++) { for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>(); List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction( ClientProtos.RegionAction.Builder builder =
HConstants.EMPTY_BYTE_ARRAY, rm, cells, RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
RegionAction.newBuilder(), RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
ClientProtos.Action.newBuilder(), MutationProto.newBuilder());
MutationProto.newBuilder()); builder.setRegion(RegionSpecifier
builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME). .newBuilder()
setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); .setType(RegionSpecifierType.REGION_NAME)
.setValue(
ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
if (i % 100000 == 0) { if (i % 100000 == 0) {
LOG.info("" + i); LOG.info("" + i);
// Uncomment this for a thread dump every so often. // Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out), // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName()); // "Thread dump " + Thread.currentThread().getName());
} }
PayloadCarryingRpcController pcrc = PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
Pair<Message, CellScanner> response = // Pair<Message, CellScanner> response =
client.call(pcrc, md, builder.build(), param, user, address); client.call(pcrc, md, builder.build(), param, user, address);
/* /*
int count = 0; * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
while (p.getSecond().advance()) { * count);
count++; */
}
assertEquals(cells.size(), count);*/
} }
LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
(System.currentTimeMillis() - startTime) + "ms"); + (System.currentTimeMillis() - startTime) + "ms");
} finally { } finally {
client.close(); client.close();
rpcServer.stop(); rpcServer.stop();
} }
} }
} }