diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java new file mode 100644 index 00000000000..09de8715438 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -0,0 +1,454 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.AsyncRpcClient; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcClientImpl; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import com.google.protobuf.Descriptors.MethodDescriptor; + +@Category(IntegrationTests.class) +public class IntegrationTestRpcClient { + + private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class); + + private final Configuration conf; + + private int numIterations = 10; + + public IntegrationTestRpcClient() { + conf = HBaseConfiguration.create(); + } + + static class TestRpcServer extends RpcServer { + + TestRpcServer(Configuration conf) throws IOException { + this(new FifoRpcScheduler(conf, 1), conf); + } + + TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { + super(null, "testRpcServer", Lists + .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( + "localhost", 0), conf, scheduler); + } + + @Override + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + throws IOException { + return super.call(service, md, param, cellScanner, receiveTime, status); + } + } + + static final BlockingService SERVICE = + TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { + + @Override + public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public EchoResponseProto echo(RpcController controller, EchoRequestProto request) + throws ServiceException { + return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); + } + }); + + protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) { + return isSyncClient ? + new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) : + new AsyncRpcClient(conf) { + @Override + Codec getCodec() { + return null; + } + }; + } + + static String BIG_PAYLOAD; + + static { + StringBuilder builder = new StringBuilder(); + + while (builder.length() < 1024 * 1024) { // 2 MB + builder.append("big.payload."); + } + + BIG_PAYLOAD = builder.toString(); + } + + class Cluster { + Random random = new Random(); + ReadWriteLock lock = new ReentrantReadWriteLock(); + HashMap rpcServers = new HashMap<>(); + List serverList = new ArrayList<>(); + int maxServers; + int minServers; + + Cluster(int minServers, int maxServers) { + this.minServers = minServers; + this.maxServers = maxServers; + } + + TestRpcServer startServer() throws IOException { + lock.writeLock().lock(); + try { + if (rpcServers.size() >= maxServers) { + return null; + } + + TestRpcServer rpcServer = new TestRpcServer(conf); + rpcServer.start(); + rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + serverList.add(rpcServer); + LOG.info("Started server: " + rpcServer.getListenerAddress()); + return rpcServer; + } finally { + lock.writeLock().unlock(); + } + } + + void stopRandomServer() throws Exception { + lock.writeLock().lock(); + TestRpcServer rpcServer = null; + try { + if (rpcServers.size() <= minServers) { + return; + } + int size = rpcServers.size(); + int rand = random.nextInt(size); + rpcServer = serverList.remove(rand); + rpcServers.remove(rpcServer.getListenerAddress()); + + if (rpcServer != null) { + stopServer(rpcServer); + } + } finally { + lock.writeLock().unlock(); + } + } + + void stopServer(TestRpcServer rpcServer) throws InterruptedException { + InetSocketAddress address = rpcServer.getListenerAddress(); + LOG.info("Stopping server: " + address); + rpcServer.stop(); + rpcServer.join(); + LOG.info("Stopped server: " + address); + } + + void stopRunning() throws InterruptedException { + lock.writeLock().lock(); + try { + for (TestRpcServer rpcServer : serverList) { + stopServer(rpcServer); + } + + } finally { + lock.writeLock().unlock(); + } + } + + TestRpcServer getRandomServer() { + lock.readLock().lock(); + try { + int size = rpcServers.size(); + int rand = random.nextInt(size); + return serverList.get(rand); + } finally { + lock.readLock().unlock(); + } + } + } + + static class MiniChaosMonkey extends Thread { + AtomicBoolean running = new AtomicBoolean(true); + Random random = new Random(); + AtomicReference exception = new AtomicReference<>(null); + Cluster cluster; + + public MiniChaosMonkey(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public void run() { + while (running.get()) { + switch (random.nextInt() % 2) { + case 0: //start a server + try { + cluster.startServer(); + } catch (Exception e) { + LOG.warn(e); + exception.compareAndSet(null, e); + } + break; + + case 1: // stop a server + try { + cluster.stopRandomServer(); + } catch (Exception e) { + LOG.warn(e); + exception.compareAndSet(null, e); + } + default: + } + + Threads.sleep(100); + } + } + + void stopRunning() { + running.set(false); + } + + void rethrowException() throws Exception { + if (exception.get() != null) { + throw exception.get(); + } + } + } + + static class SimpleClient extends Thread { + AbstractRpcClient rpcClient; + AtomicBoolean running = new AtomicBoolean(true); + AtomicReference exception = new AtomicReference<>(null); + Cluster cluster; + String id; + long numCalls = 0; + Random random = new Random(); + + public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) { + this.cluster = cluster; + this.rpcClient = rpcClient; + this.id = id; + } + + @Override + public void run() { + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + + while (running.get()) { + boolean isBigPayload = random.nextBoolean(); + String message = isBigPayload ? BIG_PAYLOAD : id + numCalls; + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); + EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build(); + + TestRpcServer server = cluster.getRandomServer(); + try { + User user = User.getCurrent(); + ret = (EchoResponseProto) + rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress()); + } catch (Exception e) { + LOG.warn(e); + continue; // expected in case connection is closing or closed + } + + try { + assertNotNull(ret); + assertEquals(message, ret.getMessage()); + } catch (Throwable t) { + exception.compareAndSet(null, t); + } + + numCalls++; + } + } + + void stopRunning() { + running.set(false); + } + + void rethrowException() throws Throwable { + if (exception.get() != null) { + throw exception.get(); + } + } + } + + @Test (timeout = 900000) + public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable { + for (int i = 0; i < numIterations; i++) { + TimeoutThread.runWithTimeout(new Callable() { + @Override + public Void call() throws Exception { + try { + testRpcWithChaosMonkey(true); + } catch (Throwable e) { + if (e instanceof Exception) { + throw (Exception)e; + } else { + throw new Exception(e); + } + } + return null; + } + }, 90000); + } + } + + @Test (timeout = 900000) + @Ignore // TODO: test fails with async client + public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable { + for (int i = 0; i < numIterations; i++) { + TimeoutThread.runWithTimeout(new Callable() { + @Override + public Void call() throws Exception { + try { + testRpcWithChaosMonkey(false); + } catch (Throwable e) { + if (e instanceof Exception) { + throw (Exception)e; + } else { + throw new Exception(e); + } + } + return null; + } + }, 90000); + } + } + + static class TimeoutThread extends Thread { + long timeout; + public TimeoutThread(long timeout) { + this.timeout = timeout; + } + + @Override + public void run() { + try { + Thread.sleep(timeout); + Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP"); + System.exit(1); // a timeout happened + } catch (InterruptedException e) { + // this is what we want + } + } + + // runs in the same thread context but injects a timeout thread which will exit the JVM on + // timeout + static void runWithTimeout(Callable callable, long timeout) throws Exception { + TimeoutThread thread = new TimeoutThread(timeout); + thread.start(); + callable.call(); + thread.interrupt(); + } + } + + public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable { + LOG.info("Starting test"); + Cluster cluster = new Cluster(10, 100); + for (int i = 0; i < 10; i++) { + cluster.startServer(); + } + + ArrayList clients = new ArrayList<>(); + + // all threads should share the same rpc client + AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient); + + for (int i = 0; i < 30; i++) { + String clientId = "client_" + i + "_"; + LOG.info("Starting client: " + clientId); + SimpleClient client = new SimpleClient(cluster, rpcClient, clientId); + client.start(); + clients.add(client); + } + + LOG.info("Starting MiniChaosMonkey"); + MiniChaosMonkey cm = new MiniChaosMonkey(cluster); + cm.start(); + + Threads.sleep(30000); + + LOG.info("Stopping MiniChaosMonkey"); + cm.stopRunning(); + cm.join(); + cm.rethrowException(); + + LOG.info("Stopping clients"); + for (SimpleClient client : clients) { + LOG.info("Stopping client: " + client.id); + LOG.info(client.id + " numCalls:" + client.numCalls); + client.stopRunning(); + client.join(); + client.rethrowException(); + assertTrue(client.numCalls > 10); + } + + LOG.info("Stopping RpcClient"); + rpcClient.close(); + + LOG.info("Stopping Cluster"); + cluster.stopRunning(); + } +}