From 69b195d619fdc4b00c912e61879e689dd33d89e7 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Mon, 29 Feb 2016 11:41:00 -0800 Subject: [PATCH] HADOOP-12813. Migrate TestRPC and related codes to rebase on ProtobufRpcEngine. Contributed by Kai Zheng. --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/ipc/RPCCallBenchmark.java | 4 +- .../ipc/TestMultipleProtocolServer.java | 14 +- .../apache/hadoop/ipc/TestProtoBufRpc.java | 139 +-- .../java/org/apache/hadoop/ipc/TestRPC.java | 1087 ++++++++--------- .../hadoop/ipc/TestRPCServerShutdown.java | 106 ++ .../org/apache/hadoop/ipc/TestRpcBase.java | 295 +++++ .../hadoop-common/src/test/proto/test.proto | 33 + .../src/test/proto/test_rpc_service.proto | 7 + .../server/nodemanager/TestNMAuditLogger.java | 40 +- .../resourcemanager/TestRMAuditLogger.java | 44 +- 11 files changed, 1039 insertions(+), 733 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCServerShutdown.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 03a844f8438..8ef0723a340 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1055,6 +1055,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12846. Credential Provider Recursive Dependencies. (Larry McCay via cnauroth) + HADOOP-12813. Migrate TestRPC and related codes to rebase on + ProtobufRpcEngine. (Kai Zheng via wheat9) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java index 6400e872387..eb7b9497092 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java @@ -34,8 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; -import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; @@ -54,7 +52,7 @@ * Benchmark for protobuf RPC. * Run with --help option for usage. */ -public class RPCCallBenchmark implements Tool { +public class RPCCallBenchmark extends TestRpcBase implements Tool { private Configuration conf; private AtomicLong callCount = new AtomicLong(0); private static ThreadMXBean threadBean = diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index 29a293f2165..8b419e36d4a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -23,8 +23,6 @@ import org.junit.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; -import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.net.NetUtils; import org.junit.Before; @@ -32,8 +30,7 @@ import org.junit.Test; import com.google.protobuf.BlockingService; -public class TestMultipleProtocolServer { - private static final String ADDRESS = "0.0.0.0"; +public class TestMultipleProtocolServer extends TestRpcBase { private static InetSocketAddress addr; private static RPC.Server server; @@ -64,13 +61,12 @@ interface Mixin extends VersionedProtocol{ public static final long versionID = 0L; void hello() throws IOException; } + interface Bar extends Mixin { public static final long versionID = 0L; int echo(int i) throws IOException; } - - class Foo0Impl implements Foo0 { @Override @@ -185,8 +181,7 @@ public void setUp() throws Exception { // Add Protobuf server // Create server side implementation - PBServerImpl pbServerImpl = - new PBServerImpl(); + PBServerImpl pbServerImpl = new PBServerImpl(); BlockingService service = TestProtobufRpcProto .newReflectiveBlockingService(pbServerImpl); server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, @@ -241,8 +236,7 @@ public void testNonExistingProtocol() throws IOException { FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); foo.ping(); } - - + /** * getProtocolVersion of an unimplemented version should return highest version * Similarly getProtocolSignature should work. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index d9c9d6c14bc..41ae910cbaf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -17,15 +17,9 @@ */ package org.apache.hadoop.ipc; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; - +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -36,76 +30,37 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto; -import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; -import org.junit.Assert; -import org.junit.Test; -import org.junit.Before; import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import java.net.URISyntaxException; -import com.google.protobuf.BlockingService; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test for testing protocol buffer based RPC mechanism. * This test depends on test.proto definition of types in src/test/proto * and protobuf service definition from src/test/test_rpc_service.proto */ -public class TestProtoBufRpc { - public final static String ADDRESS = "0.0.0.0"; - public final static int PORT = 0; - private static InetSocketAddress addr; - private static Configuration conf; +public class TestProtoBufRpc extends TestRpcBase { private static RPC.Server server; private final static int SLEEP_DURATION = 1000; - @ProtocolInfo(protocolName = "testProto", protocolVersion = 1) - public interface TestRpcService - extends TestProtobufRpcProto.BlockingInterface { - } - @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1) public interface TestRpcService2 extends TestProtobufRpc2Proto.BlockingInterface { } - public static class PBServerImpl implements TestRpcService { - - @Override - public EmptyResponseProto ping(RpcController unused, - EmptyRequestProto request) throws ServiceException { - // Ensure clientId is received - byte[] clientId = Server.getClientId(); - Assert.assertNotNull(Server.getClientId()); - Assert.assertEquals(16, clientId.length); - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()) - .build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, - EmptyRequestProto request) throws ServiceException { - throw new ServiceException("error", new RpcServerException("error")); - } - - @Override - public EmptyResponseProto error2(RpcController unused, - EmptyRequestProto request) throws ServiceException { - throw new ServiceException("error", new URISyntaxException("", - "testException")); - } - } - public static class PBServer2Impl implements TestRpcService2 { @Override @@ -133,12 +88,13 @@ public TestProtos.SleepResponseProto sleep(RpcController controller, } @Before - public void setUp() throws IOException { // Setup server for both protocols + public void setUp() throws IOException { // Setup server for both protocols conf = new Configuration(); conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024); conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true); // Set RPC engine to protobuf RPC engine RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class); // Create server side implementation PBServerImpl serverImpl = new PBServerImpl(); @@ -149,12 +105,12 @@ public void setUp() throws IOException { // Setup server for both protocols server = new RPC.Builder(conf).setProtocol(TestRpcService.class) .setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build(); addr = NetUtils.getConnectAddress(server); - + // now the second protocol PBServer2Impl server2Impl = new PBServer2Impl(); BlockingService service2 = TestProtobufRpc2Proto .newReflectiveBlockingService(server2Impl); - + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, service2); server.start(); @@ -166,31 +122,20 @@ public void tearDown() throws Exception { server.stop(); } - private static TestRpcService getClient() throws IOException { - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); - return RPC.getProxy(TestRpcService.class, 0, addr, conf); - } - - private static TestRpcService2 getClient2() throws IOException { - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(conf, TestRpcService2.class, - ProtobufRpcEngine.class); - return RPC.getProxy(TestRpcService2.class, 0, addr, - conf); + private TestRpcService2 getClient2() throws IOException { + return RPC.getProxy(TestRpcService2.class, 0, addr, conf); } @Test (timeout=5000) public void testProtoBufRpc() throws Exception { - TestRpcService client = getClient(); + TestRpcService client = getClient(addr, conf); testProtoBufRpc(client); } // separated test out so that other tests can call it. public static void testProtoBufRpc(TestRpcService client) throws Exception { // Test ping method - EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); - client.ping(null, emptyRequest); + client.ping(null, newEmptyRequest()); // Test echo method EchoRequestProto echoRequest = EchoRequestProto.newBuilder() @@ -200,7 +145,7 @@ public static void testProtoBufRpc(TestRpcService client) throws Exception { // Test error method - error should be thrown as RemoteException try { - client.error(null, emptyRequest); + client.error(null, newEmptyRequest()); Assert.fail("Expected exception is not thrown"); } catch (ServiceException e) { RemoteException re = (RemoteException)e.getCause(); @@ -217,13 +162,11 @@ public void testProtoBufRpc2() throws Exception { TestRpcService2 client = getClient2(); // Test ping method - EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); - client.ping2(null, emptyRequest); + client.ping2(null, newEmptyRequest()); // Test echo method - EchoRequestProto echoRequest = EchoRequestProto.newBuilder() - .setMessage("hello").build(); - EchoResponseProto echoResponse = client.echo2(null, echoRequest); + EchoResponseProto echoResponse = client.echo2(null, + newEchoRequest("hello")); Assert.assertEquals(echoResponse.getMessage(), "hello"); // Ensure RPC metrics are updated @@ -238,11 +181,10 @@ public void testProtoBufRpc2() throws Exception { @Test (timeout=5000) public void testProtoBufRandomException() throws Exception { - TestRpcService client = getClient(); - EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); + TestRpcService client = getClient(addr, conf); try { - client.error2(null, emptyRequest); + client.error2(null, newEmptyRequest()); } catch (ServiceException se) { Assert.assertTrue(se.getCause() instanceof RemoteException); RemoteException re = (RemoteException) se.getCause(); @@ -258,17 +200,14 @@ public void testProtoBufRandomException() throws Exception { public void testExtraLongRpc() throws Exception { TestRpcService2 client = getClient2(); final String shortString = StringUtils.repeat("X", 4); - EchoRequestProto echoRequest = EchoRequestProto.newBuilder() - .setMessage(shortString).build(); // short message goes through - EchoResponseProto echoResponse = client.echo2(null, echoRequest); + EchoResponseProto echoResponse = client.echo2(null, + newEchoRequest(shortString)); Assert.assertEquals(shortString, echoResponse.getMessage()); final String longString = StringUtils.repeat("X", 4096); - echoRequest = EchoRequestProto.newBuilder() - .setMessage(longString).build(); try { - echoResponse = client.echo2(null, echoRequest); + client.echo2(null, newEchoRequest(longString)); Assert.fail("expected extra-long RPC to fail"); } catch (ServiceException se) { // expected @@ -281,8 +220,7 @@ public void testLogSlowRPC() throws IOException, ServiceException { // make 10 K fast calls for (int x = 0; x < 10000; x++) { try { - EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); - client.ping2(null, emptyRequest); + client.ping2(null, newEmptyRequest()); } catch (Exception ex) { throw ex; } @@ -294,10 +232,7 @@ public void testLogSlowRPC() throws IOException, ServiceException { long before = rpcMetrics.getRpcSlowCalls(); // make a really slow call. Sleep sleeps for 1000ms - TestProtos.SleepRequestProto sleepRequest = - TestProtos.SleepRequestProto.newBuilder() - .setMilliSeconds(SLEEP_DURATION * 3).build(); - TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest); + client.sleep(null, newSleepRequest(SLEEP_DURATION * 3)); long after = rpcMetrics.getRpcSlowCalls(); // Ensure slow call is logged. @@ -312,8 +247,7 @@ public void testEnsureNoLogIfDisabled() throws IOException, ServiceException { // make 10 K fast calls for (int x = 0; x < 10000; x++) { - EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); - client.ping2(null, emptyRequest); + client.ping2(null, newEmptyRequest()); } // Ensure RPC metrics are updated @@ -322,10 +256,7 @@ public void testEnsureNoLogIfDisabled() throws IOException, ServiceException { long before = rpcMetrics.getRpcSlowCalls(); // make a really slow call. Sleep sleeps for 1000ms - TestProtos.SleepRequestProto sleepRequest = - TestProtos.SleepRequestProto.newBuilder() - .setMilliSeconds(SLEEP_DURATION).build(); - TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest); + client.sleep(null, newSleepRequest(SLEEP_DURATION)); long after = rpcMetrics.getRpcSlowCalls(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 12115f6266d..a8de7fe5734 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,55 +18,20 @@ package org.apache.hadoop.ipc; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; -import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.Closeable; -import java.io.InterruptedIOException; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; - +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; +import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -82,36 +47,61 @@ import org.apache.hadoop.test.MockitoUtil; import org.junit.Before; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; -import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; +import javax.net.SocketFactory; +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; /** Unit tests for RPC. */ @SuppressWarnings("deprecation") -public class TestRPC { - private static final String ADDRESS = "0.0.0.0"; +public class TestRPC extends TestRpcBase { + + public static final Log LOG = LogFactory.getLog(TestRPC.class); - public static final Log LOG = - LogFactory.getLog(TestRPC.class); - - private static Configuration conf; - @Before - public void setupConf() { - conf = new Configuration(); - conf.setClass("rpc.engine." + StoppedProtocol.class.getName(), - StoppedRpcEngine.class, RpcEngine.class); - UserGroupInformation.setConfiguration(conf); + public void setup() { + setupConf(); } int datasize = 1024*100; int numThreads = 50; public interface TestProtocol extends VersionedProtocol { - public static final long versionID = 1L; - + long versionID = 1L; + void ping() throws IOException; - void slowPing(boolean shouldSlow) throws IOException; void sleep(long delay) throws IOException, InterruptedException; String echo(String value) throws IOException; String[] echo(String[] value) throws IOException; @@ -119,11 +109,6 @@ public interface TestProtocol extends VersionedProtocol { int add(int v1, int v2) throws IOException; int add(int[] values) throws IOException; int error() throws IOException; - void testServerGet() throws IOException; - int[] exchange(int[] values) throws IOException; - - DescriptorProtos.EnumDescriptorProto exchangeProto( - DescriptorProtos.EnumDescriptorProto arg); } public static class TestImpl implements TestProtocol { @@ -133,36 +118,21 @@ public static class TestImpl implements TestProtocol { public long getProtocolVersion(String protocol, long clientVersion) { return TestProtocol.versionID; } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int hashcode) { - return new ProtocolSignature(TestProtocol.versionID, null); - } - - @Override - public void ping() {} @Override - public synchronized void slowPing(boolean shouldSlow) { - if (shouldSlow) { - while (fastPingCounter < 2) { - try { - wait(); // slow response until two fast pings happened - } catch (InterruptedException ignored) {} - } - fastPingCounter -= 2; - } else { - fastPingCounter++; - notify(); - } + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int hashcode) { + return new ProtocolSignature(TestProtocol.versionID, null); } + + @Override + public void ping() {} @Override public void sleep(long delay) throws InterruptedException { Thread.sleep(delay); } - + @Override public String echo(String value) throws IOException { return value; } @@ -191,26 +161,6 @@ public int add(int[] values) { public int error() throws IOException { throw new IOException("bobo"); } - - @Override - public void testServerGet() throws IOException { - if (!(Server.get() instanceof RPC.Server)) { - throw new IOException("Server.get() failed"); - } - } - - @Override - public int[] exchange(int[] values) { - for (int i = 0; i < values.length; i++) { - values[i] = i; - } - return values; - } - - @Override - public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) { - return arg; - } } // @@ -218,9 +168,9 @@ public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) { // static class Transactions implements Runnable { int datasize; - TestProtocol proxy; + TestRpcService proxy; - Transactions(TestProtocol proxy, int datasize) { + Transactions(TestRpcService proxy, int datasize) { this.proxy = proxy; this.datasize = datasize; } @@ -228,19 +178,33 @@ static class Transactions implements Runnable { // do two RPC that transfers data. @Override public void run() { - int[] indata = new int[datasize]; - int[] outdata = null; + Integer[] indata = new Integer[datasize]; + Arrays.fill(indata, 123); + TestProtos.ExchangeRequestProto exchangeRequest = + TestProtos.ExchangeRequestProto.newBuilder().addAllValues( + Arrays.asList(indata)).build(); + Integer[] outdata = null; + TestProtos.ExchangeResponseProto exchangeResponse; + + TestProtos.AddRequestProto addRequest = + TestProtos.AddRequestProto.newBuilder().setParam1(1) + .setParam2(2).build(); + TestProtos.AddResponseProto addResponse; + int val = 0; try { - outdata = proxy.exchange(indata); - val = proxy.add(1,2); - } catch (IOException e) { + exchangeResponse = proxy.exchange(null, exchangeRequest); + outdata = new Integer[exchangeResponse.getValuesCount()]; + outdata = exchangeResponse.getValuesList().toArray(outdata); + addResponse = proxy.add(null, addRequest); + val = addResponse.getResult(); + } catch (ServiceException e) { assertTrue("Exception from RPC exchange() " + e, false); } assertEquals(indata.length, outdata.length); assertEquals(3, val); for (int i = 0; i < outdata.length; i++) { - assertEquals(outdata[i], i); + assertEquals(outdata[i].intValue(), i); } } } @@ -249,10 +213,10 @@ public void run() { // A class that does an RPC but does not read its response. // static class SlowRPC implements Runnable { - private TestProtocol proxy; + private TestRpcService proxy; private volatile boolean done; - - SlowRPC(TestProtocol proxy) { + + SlowRPC(TestRpcService proxy) { this.proxy = proxy; done = false; } @@ -264,54 +228,62 @@ boolean isDone() { @Override public void run() { try { - proxy.slowPing(true); // this would hang until two fast pings happened + // this would hang until two fast pings happened + ping(true); done = true; - } catch (IOException e) { + } catch (ServiceException e) { assertTrue("SlowRPC ping exception " + e, false); } } + + void ping(boolean shouldSlow) throws ServiceException { + // this would hang until two fast pings happened + proxy.slowPing(null, newSlowPingRequest(shouldSlow)); + } } - + /** * A basic interface for testing client-side RPC resource cleanup. */ - private static interface StoppedProtocol { + private interface StoppedProtocol { long versionID = 0; - public void stop(); + void stop(); } - + /** * A class used for testing cleanup of client side RPC resources. */ private static class StoppedRpcEngine implements RpcEngine { @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy - ) throws IOException { + public ProtocolProxy getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException { return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); + rpcTimeout, connectionRetryPolicy, null); } @SuppressWarnings("unchecked") @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, - RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth - ) throws IOException { + public ProtocolProxy getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, SocketFactory factory, + int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) throws IOException { T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), - new Class[] { protocol }, new StoppedInvocationHandler()); + new Class[] { protocol }, new StoppedInvocationHandler()); return new ProtocolProxy(protocol, proxy, false); } @Override - public org.apache.hadoop.ipc.RPC.Server getServer(Class protocol, - Object instance, String bindAddress, int port, int numHandlers, - int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager secretManager, + public org.apache.hadoop.ipc.RPC.Server getServer( + Class protocol, Object instance, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, + SecretManager secretManager, String portRangeConfig) throws IOException { return null; } @@ -330,278 +302,265 @@ public ProtocolProxy getProtocolMetaInfoProxy( */ private static class StoppedInvocationHandler implements InvocationHandler, Closeable { - + private int closeCalled = 0; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return null; + return null; } @Override public void close() throws IOException { closeCalled++; } - + public int getCloseCalled() { return closeCalled; } - + } - + @Test public void testConfRpc() throws IOException { - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + Server server = newServerBuilder(conf) .setNumHandlers(1).setVerbose(false).build(); + // Just one handler int confQ = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); assertEquals(confQ, server.getMaxQueueSize()); int confReaders = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); assertEquals(confReaders, server.getNumReaders()); - server.stop(); - - server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + + server = newServerBuilder(conf) .setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200) - .setVerbose(false).build(); - + .setVerbose(false).build(); + assertEquals(3, server.getNumReaders()); assertEquals(200, server.getMaxQueueSize()); - server.stop(); } @Test - public void testProxyAddress() throws IOException { - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build(); - TestProtocol proxy = null; - - try { - server.start(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); + public void testProxyAddress() throws Exception { + Server server = null; + TestRpcService proxy = null; + try { + server = setupTestServer(conf, -1); // create a client - proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); - + proxy = getClient(addr, conf); assertEquals(addr, RPC.getServerAddress(proxy)); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, proxy); } } @Test - public void testSlowRpc() throws IOException { + public void testSlowRpc() throws IOException, ServiceException { + Server server; + TestRpcService proxy = null; + System.out.println("Testing Slow RPC"); // create a server with two handlers - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(2).setVerbose(false).build(); - - TestProtocol proxy = null; - + server = setupTestServer(conf, 2); + try { - server.start(); + // create a client + proxy = getClient(addr, conf); - InetSocketAddress addr = NetUtils.getConnectAddress(server); + SlowRPC slowrpc = new SlowRPC(proxy); + Thread thread = new Thread(slowrpc, "SlowRPC"); + thread.start(); // send a slow RPC, which won't return until two fast pings + assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone()); - // create a client - proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); + slowrpc.ping(false); // first fast ping - SlowRPC slowrpc = new SlowRPC(proxy); - Thread thread = new Thread(slowrpc, "SlowRPC"); - thread.start(); // send a slow RPC, which won't return until two fast pings - assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone()); + // verify that the first RPC is still stuck + assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone()); - proxy.slowPing(false); // first fast ping - - // verify that the first RPC is still stuck - assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone()); + slowrpc.ping(false); // second fast ping - proxy.slowPing(false); // second fast ping - - // Now the slow ping should be able to be executed - while (!slowrpc.isDone()) { - System.out.println("Waiting for slow RPC to get done."); - try { - Thread.sleep(1000); - } catch (InterruptedException e) {} - } - } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); + // Now the slow ping should be able to be executed + while (!slowrpc.isDone()) { + System.out.println("Waiting for slow RPC to get done."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} } + } finally { System.out.println("Down slow rpc testing"); + stop(server, proxy); } } - + @Test - public void testCalls() throws IOException { + public void testCalls() throws Exception { testCallsInternal(conf); } - - private void testCallsInternal(Configuration conf) throws IOException { - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build(); - TestProtocol proxy = null; + + private void testCallsInternal(Configuration myConf) throws Exception { + Server server; + TestRpcService proxy = null; + + server = setupTestServer(myConf, -1); try { - server.start(); + proxy = getClient(addr, myConf); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); - - proxy.ping(); + proxy.ping(null, newEmptyRequest()); - String stringResult = proxy.echo("foo"); - assertEquals(stringResult, "foo"); + TestProtos.EchoResponseProto echoResp = proxy.echo(null, + newEchoRequest("foo")); + assertEquals(echoResp.getMessage(), "foo"); - stringResult = proxy.echo((String)null); - assertEquals(stringResult, null); - - // Check rpcMetrics - MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); - assertCounter("RpcProcessingTimeNumOps", 3L, rb); - assertCounterGt("SentBytes", 0L, rb); - assertCounterGt("ReceivedBytes", 0L, rb); - - // Number of calls to echo method should be 2 - rb = getMetrics(server.rpcDetailedMetrics.name()); - assertCounter("EchoNumOps", 2L, rb); - - // Number of calls to ping method should be 1 - assertCounter("PingNumOps", 1L, rb); - - String[] stringResults = proxy.echo(new String[]{"foo","bar"}); - assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"})); + echoResp = proxy.echo(null, newEchoRequest("")); + assertEquals(echoResp.getMessage(), ""); - stringResults = proxy.echo((String[])null); - assertTrue(Arrays.equals(stringResults, null)); + // Check rpcMetrics + MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); + assertCounter("RpcProcessingTimeNumOps", 3L, rb); + assertCounterGt("SentBytes", 0L, rb); + assertCounterGt("ReceivedBytes", 0L, rb); - UTF8 utf8Result = (UTF8)proxy.echo(new UTF8("hello world")); - assertEquals(new UTF8("hello world"), utf8Result ); + // Number of calls to echo method should be 2 + rb = getMetrics(server.rpcDetailedMetrics.name()); + assertCounter("EchoNumOps", 2L, rb); - utf8Result = (UTF8)proxy.echo((UTF8)null); - assertEquals(null, utf8Result); + // Number of calls to ping method should be 1 + assertCounter("PingNumOps", 1L, rb); - int intResult = proxy.add(1, 2); - assertEquals(intResult, 3); + String[] strings = new String[] {"foo","bar"}; + TestProtos.EchoRequestProto2 echoRequest2 = + TestProtos.EchoRequestProto2.newBuilder().addAllMessage( + Arrays.asList(strings)).build(); + TestProtos.EchoResponseProto2 echoResponse2 = + proxy.echo2(null, echoRequest2); + assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(), + strings)); - intResult = proxy.add(new int[] {1, 2}); - assertEquals(intResult, 3); + echoRequest2 = TestProtos.EchoRequestProto2.newBuilder() + .addAllMessage(Collections.emptyList()).build(); + echoResponse2 = proxy.echo2(null, echoRequest2); + assertTrue(Arrays.equals(echoResponse2.getMessageList().toArray(), + new String[]{})); - // Test protobufs - EnumDescriptorProto sendProto = - EnumDescriptorProto.newBuilder().setName("test").build(); - EnumDescriptorProto retProto = proxy.exchangeProto(sendProto); - assertEquals(sendProto, retProto); - assertNotSame(sendProto, retProto); + TestProtos.AddRequestProto addRequest = + TestProtos.AddRequestProto.newBuilder().setParam1(1) + .setParam2(2).build(); + TestProtos.AddResponseProto addResponse = + proxy.add(null, addRequest); + assertEquals(addResponse.getResult(), 3); - boolean caught = false; - try { - proxy.error(); - } catch (IOException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Caught " + e); - } - caught = true; - } - assertTrue(caught); - rb = getMetrics(server.rpcDetailedMetrics.name()); - assertCounter("IOExceptionNumOps", 1L, rb); + Integer[] integers = new Integer[] {1, 2}; + TestProtos.AddRequestProto2 addRequest2 = + TestProtos.AddRequestProto2.newBuilder().addAllParams( + Arrays.asList(integers)).build(); + addResponse = proxy.add2(null, addRequest2); + assertEquals(addResponse.getResult(), 3); - proxy.testServerGet(); - - // create multiple threads and make them do large data transfers - System.out.println("Starting multi-threaded RPC test..."); - server.setSocketSendBufSize(1024); - Thread threadId[] = new Thread[numThreads]; - for (int i = 0; i < numThreads; i++) { - Transactions trans = new Transactions(proxy, datasize); - threadId[i] = new Thread(trans, "TransactionThread-" + i); - threadId[i].start(); - } - - // wait for all transactions to get over - System.out.println("Waiting for all threads to finish RPCs..."); - for (int i = 0; i < numThreads; i++) { + boolean caught = false; try { - threadId[i].join(); - } catch (InterruptedException e) { - i--; // retry + proxy.error(null, newEmptyRequest()); + } catch (ServiceException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Caught " + e); + } + caught = true; + } + assertTrue(caught); + rb = getMetrics(server.rpcDetailedMetrics.name()); + assertCounter("RpcServerExceptionNumOps", 1L, rb); + + //proxy.testServerGet(); + + // create multiple threads and make them do large data transfers + System.out.println("Starting multi-threaded RPC test..."); + server.setSocketSendBufSize(1024); + Thread threadId[] = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + Transactions trans = new Transactions(proxy, datasize); + threadId[i] = new Thread(trans, "TransactionThread-" + i); + threadId[i].start(); + } + + // wait for all transactions to get over + System.out.println("Waiting for all threads to finish RPCs..."); + for (int i = 0; i < numThreads; i++) { + try { + threadId[i].join(); + } catch (InterruptedException e) { + i--; // retry + } + } + } finally { + stop(server, proxy); + } + } + + @Test + public void testClientWithoutServer() throws Exception { + TestRpcService proxy; + + short invalidPort = 20; + InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, + invalidPort); + long invalidClientVersion = 1L; + try { + proxy = RPC.getProxy(TestRpcService.class, + invalidClientVersion, invalidAddress, conf); + // Test echo method + proxy.echo(null, newEchoRequest("hello")); + fail("We should not have reached here"); + } catch (ServiceException ioe) { + //this is what we expected + if (!(ioe.getCause() instanceof ConnectException)) { + fail("We should not have reached here"); } } + } - } finally { - server.stop(); - if(proxy!=null) RPC.stopProxy(proxy); - } - } - - @Test - public void testStandaloneClient() throws IOException { - try { - TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, - TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L); - proxy.echo(""); - fail("We should not have reached here"); - } catch (ConnectException ioe) { - //this is what we expected - } - } - private static final String ACL_CONFIG = "test.protocol.acl"; - + private static class TestPolicyProvider extends PolicyProvider { @Override public Service[] getServices() { - return new Service[] { new Service(ACL_CONFIG, TestProtocol.class) }; + return new Service[] { new Service(ACL_CONFIG, TestRpcService.class) }; } - } - - private void doRPCs(Configuration conf, boolean expectFailure) throws IOException { - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); - server.refreshServiceAcl(conf, new TestPolicyProvider()); + private void doRPCs(Configuration myConf, boolean expectFailure) throws Exception { + Server server; + TestRpcService proxy = null; - TestProtocol proxy = null; + server = setupTestServer(myConf, 5); - server.start(); + server.refreshServiceAcl(myConf, new TestPolicyProvider()); + + TestProtos.EmptyRequestProto emptyRequestProto = + TestProtos.EmptyRequestProto.newBuilder().build(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - try { - proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); - proxy.ping(); - + proxy = getClient(addr, conf); + proxy.ping(null, emptyRequestProto); if (expectFailure) { fail("Expect RPC.getProxy to fail with AuthorizationException!"); } - } catch (RemoteException e) { + } catch (ServiceException e) { if (expectFailure) { - assertEquals("RPC error code should be UNAUTHORIZED", RpcErrorCodeProto.FATAL_UNAUTHORIZED, e.getErrorCode()); - assertTrue(e.unwrapRemoteException() instanceof AuthorizationException); + RemoteException re = (RemoteException) e.getCause(); + assertTrue(re.unwrapRemoteException() instanceof AuthorizationException); + assertEquals("RPC error code should be UNAUTHORIZED", + RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); } else { throw e; } } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); if (expectFailure) { assertCounter("RpcAuthorizationFailures", 1L, rb); @@ -612,57 +571,58 @@ private void doRPCs(Configuration conf, boolean expectFailure) throws IOExceptio // 0 for the authentication successes and 0 for failure assertCounter("RpcAuthenticationFailures", 0L, rb); assertCounter("RpcAuthenticationSuccesses", 0L, rb); + + stop(server, proxy); } } - - @Test - public void testServerAddress() throws IOException { - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); - InetSocketAddress bindAddr = null; - try { - bindAddr = NetUtils.getConnectAddress(server); - } finally { - server.stop(); - } - assertEquals(InetAddress.getLocalHost(), bindAddr.getAddress()); - } @Test - public void testAuthorization() throws IOException { - Configuration conf = new Configuration(); - conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, + public void testServerAddress() throws IOException { + Server server; + + server = setupTestServer(conf, 5); + try { + InetSocketAddress bindAddr = NetUtils.getConnectAddress(server); + assertEquals(InetAddress.getLocalHost(), bindAddr.getAddress()); + } finally { + stop(server, null); + } + } + + @Test + public void testAuthorization() throws Exception { + Configuration myConf = new Configuration(); + myConf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, true); // Expect to succeed - conf.set(ACL_CONFIG, "*"); - doRPCs(conf, false); + myConf.set(ACL_CONFIG, "*"); + doRPCs(myConf, false); // Reset authorization to expect failure - conf.set(ACL_CONFIG, "invalid invalid"); - doRPCs(conf, true); + myConf.set(ACL_CONFIG, "invalid invalid"); + doRPCs(myConf, true); - conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); + myConf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); // Expect to succeed - conf.set(ACL_CONFIG, "*"); - doRPCs(conf, false); + myConf.set(ACL_CONFIG, "*"); + doRPCs(myConf, false); // Reset authorization to expect failure - conf.set(ACL_CONFIG, "invalid invalid"); - doRPCs(conf, true); + myConf.set(ACL_CONFIG, "invalid invalid"); + doRPCs(myConf, true); } /** * Switch off setting socketTimeout values on RPC sockets. * Verify that RPC calls still work ok. */ - public void testNoPings() throws IOException { + public void testNoPings() throws Exception { Configuration conf = new Configuration(); - + conf.setBoolean("ipc.client.ping", false); new TestRPC().testCallsInternal(conf); - + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); new TestRPC().testCallsInternal(conf); } @@ -687,6 +647,9 @@ public void testStopMockObject() throws IOException { @Test public void testStopProxy() throws IOException { + RPC.setProtocolEngine(conf, + StoppedProtocol.class, StoppedRpcEngine.class); + StoppedProtocol proxy = RPC.getProxy(StoppedProtocol.class, StoppedProtocol.versionID, null, conf); StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) @@ -703,8 +666,8 @@ public void testWrappedStopProxy() throws IOException { StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) Proxy.getInvocationHandler(wrappedProxy); - StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class, - wrappedProxy, RetryPolicies.RETRY_FOREVER); + StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create( + StoppedProtocol.class, wrappedProxy, RetryPolicies.RETRY_FOREVER); assertEquals(0, invocationHandler.getCloseCalled()); RPC.stopProxy(proxy); @@ -713,101 +676,71 @@ public void testWrappedStopProxy() throws IOException { @Test public void testErrorMsgForInsecureClient() throws IOException { + Server server; + TestRpcService proxy = null; + Configuration serverConf = new Configuration(conf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, - serverConf); + serverConf); UserGroupInformation.setConfiguration(serverConf); - final Server server = new RPC.Builder(serverConf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); - server.start(); + server = setupTestServer(serverConf, 5); - UserGroupInformation.setConfiguration(conf); boolean succeeded = false; - final InetSocketAddress addr = NetUtils.getConnectAddress(server); - TestProtocol proxy = null; + try { - proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); - proxy.echo(""); - } catch (RemoteException e) { - LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); - assertEquals("RPC error code should be UNAUTHORIZED", RpcErrorCodeProto.FATAL_UNAUTHORIZED, e.getErrorCode()); - assertTrue(e.unwrapRemoteException() instanceof AccessControlException); + UserGroupInformation.setConfiguration(conf); + proxy = getClient(addr, conf); + proxy.echo(null, newEchoRequest("")); + } catch (ServiceException e) { + assertTrue(e.getCause() instanceof RemoteException); + RemoteException re = (RemoteException) e.getCause(); + LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage()); + assertEquals("RPC error code should be UNAUTHORIZED", + RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); + assertTrue(re.unwrapRemoteException() instanceof AccessControlException); succeeded = true; } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, proxy); } assertTrue(succeeded); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); UserGroupInformation.setConfiguration(serverConf); - final Server multiServer = new RPC.Builder(serverConf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .build(); - multiServer.start(); + server = setupTestServer(serverConf, 5); succeeded = false; - final InetSocketAddress mulitServerAddr = - NetUtils.getConnectAddress(multiServer); proxy = null; try { UserGroupInformation.setConfiguration(conf); - proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, mulitServerAddr, conf); - proxy.echo(""); - } catch (RemoteException e) { - LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); - assertEquals("RPC error code should be UNAUTHORIZED", RpcErrorCodeProto.FATAL_UNAUTHORIZED, e.getErrorCode()); - assertTrue(e.unwrapRemoteException() instanceof AccessControlException); + proxy = getClient(addr, conf); + proxy.echo(null, newEchoRequest("")); + } catch (ServiceException e) { + RemoteException re = (RemoteException) e.getCause(); + LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage()); + assertEquals("RPC error code should be UNAUTHORIZED", + RpcErrorCodeProto.FATAL_UNAUTHORIZED, re.getErrorCode()); + assertTrue(re.unwrapRemoteException() instanceof AccessControlException); succeeded = true; } finally { - multiServer.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, proxy); } assertTrue(succeeded); } - /** - * Count the number of threads that have a stack frame containing - * the given string - */ - private static int countThreads(String search) { - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - - int count = 0; - ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20); - for (ThreadInfo info : infos) { - if (info == null) continue; - for (StackTraceElement elem : info.getStackTrace()) { - if (elem.getClassName().contains(search)) { - count++; - break; - } - } - } - return count; - } - /** * Test that server.stop() properly stops all threads */ @Test public void testStopsAllThreads() throws IOException, InterruptedException { + Server server; + int threadsBefore = countThreads("Server$Listener$Reader"); assertEquals("Expect no Reader threads running before test", - 0, threadsBefore); + 0, threadsBefore); + + server = setupTestServer(conf, 5); - final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true).build(); - server.start(); try { // Wait for at least one reader thread to start int threadsRunning = 0; @@ -824,11 +757,12 @@ public void testStopsAllThreads() throws IOException, InterruptedException { } finally { server.stop(); } + int threadsAfter = countThreads("Server$Listener$Reader"); assertEquals("Expect no Reader threads left running after test", - 0, threadsAfter); + 0, threadsAfter); } - + @Test public void testRPCBuilder() throws IOException { // Test mandatory field conf @@ -864,90 +798,104 @@ public void testRPCBuilder() throws IOException { } } } - - @Test(timeout=90000) - public void testRPCInterruptedSimple() throws IOException { - final Configuration conf = new Configuration(); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS) - .setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(null).build(); - - server.start(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - final TestProtocol proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - // Connect to the server - proxy.ping(); - // Interrupt self, try another call - Thread.currentThread().interrupt(); + @Test(timeout=90000) + public void testRPCInterruptedSimple() throws Exception { + Server server; + TestRpcService proxy = null; + + RPC.Builder builder = newServerBuilder(conf) + .setNumHandlers(5).setVerbose(true) + .setSecretManager(null); + + server = setupTestServer(builder); + try { - proxy.ping(); - fail("Interruption did not cause IPC to fail"); - } catch (IOException ioe) { - if (ioe.toString().contains("InterruptedException") || - ioe instanceof InterruptedIOException) { - // clear interrupt status for future tests - Thread.interrupted(); - return; + proxy = getClient(addr, conf); + // Connect to the server + + proxy.ping(null, newEmptyRequest()); + // Interrupt self, try another call + Thread.currentThread().interrupt(); + try { + proxy.ping(null, newEmptyRequest()); + fail("Interruption did not cause IPC to fail"); + } catch (ServiceException se) { + if (se.toString().contains("InterruptedException") || + se.getCause() instanceof InterruptedIOException) { + // clear interrupt status for future tests + Thread.interrupted(); + return; + } + throw se; } - throw ioe; } finally { - server.stop(); + stop(server, proxy); } } - - @Test(timeout=30000) - public void testRPCInterrupted() throws IOException, InterruptedException { - final Configuration conf = new Configuration(); - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new TestImpl()).setBindAddress(ADDRESS) - .setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(null).build(); - server.start(); + @Test(timeout=30000) + public void testRPCInterrupted() throws Exception { + Server server; + + RPC.Builder builder = newServerBuilder(conf) + .setNumHandlers(5).setVerbose(true) + .setSecretManager(null); + server = setupTestServer(builder); int numConcurrentRPC = 200; - InetSocketAddress addr = NetUtils.getConnectAddress(server); final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC); final CountDownLatch latch = new CountDownLatch(numConcurrentRPC); final AtomicBoolean leaderRunning = new AtomicBoolean(true); - final AtomicReference error = new AtomicReference(); + final AtomicReference error = new AtomicReference<>(); Thread leaderThread = null; - - for (int i = 0; i < numConcurrentRPC; i++) { - final int num = i; - final TestProtocol proxy = RPC.getProxy( - TestProtocol.class, TestProtocol.versionID, addr, conf); - Thread rpcThread = new Thread(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - while (num == 0 || leaderRunning.get()) { - proxy.slowPing(false); - } - proxy.slowPing(false); - } catch (Exception e) { - if (num == 0) { - leaderRunning.set(false); - } else { - error.set(e); - } + try { + for (int i = 0; i < numConcurrentRPC; i++) { + final int num = i; + final TestRpcService proxy = getClient(addr, conf); + Thread rpcThread = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + while (num == 0 || leaderRunning.get()) { + proxy.slowPing(null, newSlowPingRequest(false)); + } - LOG.error(e); - } finally { - latch.countDown(); + proxy.slowPing(null, newSlowPingRequest(false)); + } catch (Exception e) { + if (num == 0) { + leaderRunning.set(false); + } else { + error.set(e); + } + + LOG.error("thread " + num, e); + } finally { + latch.countDown(); + } } - } - }); - rpcThread.start(); + }); + rpcThread.start(); - if (leaderThread == null) { - leaderThread = rpcThread; + if (leaderThread == null) { + leaderThread = rpcThread; + } } + // let threads get past the barrier + Thread.sleep(1000); + // stop a single thread + while (leaderRunning.get()) { + leaderThread.interrupt(); + } + + latch.await(); + + // should not cause any other thread to get an error + assertTrue("rpc got exception " + error.get(), error.get() == null); + } finally { + server.stop(); } // let threads get past the barrier Thread.sleep(1000); @@ -965,46 +913,43 @@ public void run() { @Test public void testConnectionPing() throws Exception { - Configuration conf = new Configuration(); + Server server; + TestRpcService proxy = null; + int pingInterval = 50; conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval); - final Server server = new RPC.Builder(conf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .build(); - server.start(); + server = setupTestServer(conf, 5); - final TestProtocol proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, server.getListenerAddress(), conf); try { - // this call will throw exception if server couldn't decode the ping - proxy.sleep(pingInterval*4); + proxy = getClient(addr, conf); + + proxy.sleep(null, newSleepRequest(pingInterval * 4)); } finally { - if (proxy != null) RPC.stopProxy(proxy); - server.stop(); + stop(server, proxy); } } @Test public void testRpcMetrics() throws Exception { - Configuration configuration = new Configuration(); + Server server; + TestRpcService proxy = null; + final int interval = 1; - configuration.setBoolean(CommonConfigurationKeys. + conf.setBoolean(CommonConfigurationKeys. RPC_METRICS_QUANTILE_ENABLE, true); - configuration.set(CommonConfigurationKeys. + conf.set(CommonConfigurationKeys. RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); - final Server server = new RPC.Builder(configuration) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .build(); - server.start(); - final TestProtocol proxy = RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, server.getListenerAddress(), configuration); + + server = setupTestServer(conf, 5); + try { - for (int i=0; i<1000; i++) { - proxy.ping(); - proxy.echo("" + i); + proxy = getClient(addr, conf); + + for (int i = 0; i < 1000; i++) { + proxy.ping(null, newEmptyRequest()); + + proxy.echo(null, newEchoRequest("" + i)); } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); @@ -1017,71 +962,7 @@ public void testRpcMetrics() throws Exception { MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", rpcMetrics); } finally { - if (proxy != null) { - RPC.stopProxy(proxy); - } - server.stop(); - } - } - - /** - * Verify the RPC server can shutdown properly when callQueue is full. - */ - @Test (timeout=30000) - public void testRPCServerShutdown() throws Exception { - final int numClients = 3; - final List> res = new ArrayList>(); - final ExecutorService executorService = - Executors.newFixedThreadPool(numClients); - final Configuration conf = new Configuration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); - final Server server = new RPC.Builder(conf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0) - .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) - .build(); - server.start(); - - final TestProtocol proxy = - RPC.getProxy(TestProtocol.class, TestProtocol.versionID, - NetUtils.getConnectAddress(server), conf); - try { - // start a sleep RPC call to consume the only handler thread. - // Start another sleep RPC call to make callQueue full. - // Start another sleep RPC call to make reader thread block on CallQueue. - for (int i = 0; i < numClients; i++) { - res.add(executorService.submit( - new Callable() { - @Override - public Void call() throws IOException, InterruptedException { - proxy.sleep(100000); - return null; - } - })); - } - while (server.getCallQueueLen() != 1 - || countThreads(CallQueueManager.class.getName()) != 1 - || countThreads(TestImpl.class.getName()) != 1) { - Thread.sleep(100); - } - } finally { - try { - server.stop(); - assertEquals("Not enough clients", numClients, res.size()); - for (Future f : res) { - try { - f.get(); - fail("Future get should not return"); - } catch (ExecutionException e) { - assertTrue("Unexpected exception: " + e, - e.getCause() instanceof IOException); - LOG.info("Expected exception", e.getCause()); - } - } - } finally { - RPC.stopProxy(proxy); - executorService.shutdown(); - } + stop(server, proxy); } } @@ -1090,25 +971,28 @@ public Void call() throws IOException, InterruptedException { */ @Test (timeout=30000) public void testClientBackOff() throws Exception { + Server server; + final TestRpcService proxy; + boolean succeeded = false; final int numClients = 2; final List> res = new ArrayList>(); final ExecutorService executorService = Executors.newFixedThreadPool(numClients); - final Configuration conf = new Configuration(); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); - final Server server = new RPC.Builder(conf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0) - .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) - .build(); - server.start(); + RPC.Builder builder = newServerBuilder(conf) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); + server = setupTestServer(builder); - final TestProtocol proxy = - RPC.getProxy(TestProtocol.class, TestProtocol.versionID, - NetUtils.getConnectAddress(server), conf); + @SuppressWarnings("unchecked") + CallQueueManager spy = spy((CallQueueManager) Whitebox + .getInternalState(server, "callQueue")); + Whitebox.setInternalState(server, "callQueue", spy); + + Exception lastException = null; + proxy = getClient(addr, conf); try { // start a sleep RPC call to consume the only handler thread. // Start another sleep RPC call to make callQueue full. @@ -1117,8 +1001,8 @@ public void testClientBackOff() throws Exception { res.add(executorService.submit( new Callable() { @Override - public Void call() throws IOException, InterruptedException { - proxy.sleep(100000); + public Void call() throws ServiceException, InterruptedException { + proxy.sleep(null, newSleepRequest(100000)); return null; } })); @@ -1128,17 +1012,19 @@ && countThreads(CallQueueManager.class.getName()) != 1) { Thread.sleep(100); } try { - proxy.sleep(100); - } catch (RemoteException e) { - IOException unwrapExeption = e.unwrapRemoteException(); + proxy.sleep(null, newSleepRequest(100)); + } catch (ServiceException e) { + RemoteException re = (RemoteException) e.getCause(); + IOException unwrapExeption = re.unwrapRemoteException(); if (unwrapExeption instanceof RetriableException) { - succeeded = true; + succeeded = true; + } else { + lastException = unwrapExeption; } } } finally { - server.stop(); - RPC.stopProxy(proxy); executorService.shutdown(); + stop(server, proxy); } assertTrue("RetriableException not received", succeeded); } @@ -1148,32 +1034,27 @@ && countThreads(CallQueueManager.class.getName()) != 1) { */ @Test(timeout=30000) public void testClientRpcTimeout() throws Exception { - final Server server = new RPC.Builder(conf) - .setProtocol(TestProtocol.class).setInstance(new TestImpl()) - .setBindAddress(ADDRESS).setPort(0) - .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) - .build(); - server.start(); + Server server; + TestRpcService proxy = null; + + RPC.Builder builder = newServerBuilder(conf) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); + server = setupTestServer(builder); - final Configuration conf = new Configuration(); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); - final TestProtocol proxy = - RPC.getProxy(TestProtocol.class, TestProtocol.versionID, - NetUtils.getConnectAddress(server), conf); - try { - proxy.sleep(3000); + proxy = getClient(addr, conf); + proxy.sleep(null, newSleepRequest(3000)); fail("RPC should time out."); - } catch (SocketTimeoutException e) { + } catch (ServiceException e) { + assertTrue(e.getCause() instanceof SocketTimeoutException); LOG.info("got expected timeout.", e); } finally { - server.stop(); - RPC.stopProxy(proxy); + stop(server, proxy); } } - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws Exception { new TestRPC().testCallsInternal(conf); - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCServerShutdown.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCServerShutdown.java new file mode 100644 index 00000000000..93af7d4aad8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCServerShutdown.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Split from TestRPC. */ +@SuppressWarnings("deprecation") +public class TestRPCServerShutdown extends TestRpcBase { + + public static final Log LOG = LogFactory.getLog(TestRPCServerShutdown.class); + + @Before + public void setup() { + setupConf(); + } + + /** + * Verify the RPC server can shutdown properly when callQueue is full. + */ + @Test (timeout=30000) + public void testRPCServerShutdown() throws Exception { + final int numClients = 3; + final List> res = new ArrayList>(); + final ExecutorService executorService = + Executors.newFixedThreadPool(numClients); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + RPC.Builder builder = newServerBuilder(conf) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); + final Server server = setupTestServer(builder); + + final TestRpcService proxy = getClient(addr, conf); + try { + // start a sleep RPC call to consume the only handler thread. + // Start another sleep RPC call to make callQueue full. + // Start another sleep RPC call to make reader thread block on CallQueue. + for (int i = 0; i < numClients; i++) { + res.add(executorService.submit( + new Callable() { + @Override + public Void call() throws ServiceException, InterruptedException { + proxy.sleep(null, newSleepRequest(100000)); + return null; + } + })); + } + while (server.getCallQueueLen() != 1 + || countThreads(CallQueueManager.class.getName()) != 1 + || countThreads(PBServerImpl.class.getName()) != 1) { + Thread.sleep(100); + } + } finally { + try { + stop(server, proxy); + assertEquals("Not enough clients", numClients, res.size()); + for (Future f : res) { + try { + f.get(); + fail("Future get should not return"); + } catch (ExecutionException e) { + ServiceException se = (ServiceException) e.getCause(); + assertTrue("Unexpected exception: " + se, + se.getCause() instanceof IOException); + LOG.info("Expected exception", e.getCause()); + } + } + } finally { + executorService.shutdown(); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java new file mode 100644 index 00000000000..03fd31ed668 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.junit.Assert; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** Test facilities for unit tests for RPC. */ +public class TestRpcBase { + + protected final static String SERVER_PRINCIPAL_KEY = + "test.ipc.server.principal"; + protected final static String ADDRESS = "0.0.0.0"; + protected final static int PORT = 0; + protected static InetSocketAddress addr; + protected static Configuration conf; + + protected void setupConf() { + conf = new Configuration(); + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); + UserGroupInformation.setConfiguration(conf); + } + + protected static RPC.Builder newServerBuilder( + Configuration serverConf) throws IOException { + // Create server side implementation + PBServerImpl serverImpl = new PBServerImpl(); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + + // Get RPC server for server side implementation + RPC.Builder builder = new RPC.Builder(serverConf) + .setProtocol(TestRpcService.class) + .setInstance(service).setBindAddress(ADDRESS).setPort(PORT); + + return builder; + } + + protected static RPC.Server setupTestServer(Configuration serverConf, + int numHandlers) throws IOException { + return setupTestServer(serverConf, numHandlers, null); + } + + protected static RPC.Server setupTestServer(Configuration serverConf, + int numHandlers, + SecretManager serverSm) + throws IOException { + RPC.Builder builder = newServerBuilder(serverConf); + + if (numHandlers > 0) { + builder.setNumHandlers(numHandlers); + } + + if (serverSm != null) { + builder.setSecretManager(serverSm); + } + + return setupTestServer(builder); + } + + protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException { + RPC.Server server = builder.build(); + + server.start(); + + addr = NetUtils.getConnectAddress(server); + + return server; + } + + protected static TestRpcService getClient(InetSocketAddress serverAddr, + Configuration clientConf) + throws ServiceException { + try { + return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + protected static void stop(Server server, TestRpcService proxy) { + if (proxy != null) { + try { + RPC.stopProxy(proxy); + } catch (Exception ignored) {} + } + + if (server != null) { + try { + server.stop(); + } catch (Exception ignored) {} + } + } + + /** + * Count the number of threads that have a stack frame containing + * the given string + */ + protected static int countThreads(String search) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + + int count = 0; + ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20); + for (ThreadInfo info : infos) { + if (info == null) continue; + for (StackTraceElement elem : info.getStackTrace()) { + if (elem.getClassName().contains(search)) { + count++; + break; + } + } + } + return count; + } + + @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService", + protocolVersion = 1) + public interface TestRpcService + extends TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface { + } + + public static class PBServerImpl implements TestRpcService { + CountDownLatch fastPingCounter = new CountDownLatch(2); + private List postponedCalls = new ArrayList<>(); + + @Override + public TestProtos.EmptyResponseProto ping(RpcController unused, + TestProtos.EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(clientId); + Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length); + return TestProtos.EmptyResponseProto.newBuilder().build(); + } + + @Override + public TestProtos.EchoResponseProto echo( + RpcController unused, TestProtos.EchoRequestProto request) + throws ServiceException { + return TestProtos.EchoResponseProto.newBuilder().setMessage( + request.getMessage()) + .build(); + } + + @Override + public TestProtos.EmptyResponseProto error( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + throw new ServiceException("error", new RpcServerException("error")); + } + + @Override + public TestProtos.EmptyResponseProto error2( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + throw new ServiceException("error", new URISyntaxException("", + "testException")); + } + + @Override + public TestProtos.EmptyResponseProto slowPing( + RpcController unused, TestProtos.SlowPingRequestProto request) + throws ServiceException { + boolean shouldSlow = request.getShouldSlow(); + if (shouldSlow) { + try { + fastPingCounter.await(); //slow response until two fast pings happened + } catch (InterruptedException ignored) {} + } else { + fastPingCounter.countDown(); + } + + return TestProtos.EmptyResponseProto.newBuilder().build(); + } + + @Override + public TestProtos.EchoResponseProto2 echo2( + RpcController controller, TestProtos.EchoRequestProto2 request) + throws ServiceException { + return TestProtos.EchoResponseProto2.newBuilder().addAllMessage( + request.getMessageList()).build(); + } + + @Override + public TestProtos.AddResponseProto add( + RpcController controller, TestProtos.AddRequestProto request) + throws ServiceException { + return TestProtos.AddResponseProto.newBuilder().setResult( + request.getParam1() + request.getParam2()).build(); + } + + @Override + public TestProtos.AddResponseProto add2( + RpcController controller, TestProtos.AddRequestProto2 request) + throws ServiceException { + int sum = 0; + for (Integer num : request.getParamsList()) { + sum += num; + } + return TestProtos.AddResponseProto.newBuilder().setResult(sum).build(); + } + + @Override + public TestProtos.EmptyResponseProto testServerGet( + RpcController controller, TestProtos.EmptyRequestProto request) + throws ServiceException { + if (!(Server.get() instanceof RPC.Server)) { + throw new ServiceException("Server.get() failed"); + } + return TestProtos.EmptyResponseProto.newBuilder().build(); + } + + @Override + public TestProtos.ExchangeResponseProto exchange( + RpcController controller, TestProtos.ExchangeRequestProto request) + throws ServiceException { + Integer[] values = new Integer[request.getValuesCount()]; + for (int i = 0; i < values.length; i++) { + values[i] = i; + } + return TestProtos.ExchangeResponseProto.newBuilder() + .addAllValues(Arrays.asList(values)).build(); + } + + @Override + public TestProtos.EmptyResponseProto sleep( + RpcController controller, TestProtos.SleepRequestProto request) + throws ServiceException { + try { + Thread.sleep(request.getMilliSeconds()); + } catch (InterruptedException ignore) {} + return TestProtos.EmptyResponseProto.newBuilder().build(); + } + } + + protected static TestProtos.EmptyRequestProto newEmptyRequest() { + return TestProtos.EmptyRequestProto.newBuilder().build(); + } + + protected static TestProtos.EchoRequestProto newEchoRequest(String msg) { + return TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build(); + } + + protected static String convert(TestProtos.EchoResponseProto response) { + return response.getMessage(); + } + + protected static TestProtos.SlowPingRequestProto newSlowPingRequest( + boolean shouldSlow) throws ServiceException { + return TestProtos.SlowPingRequestProto.newBuilder(). + setShouldSlow(shouldSlow).build(); + } + + protected static TestProtos.SleepRequestProto newSleepRequest( + int milliSeconds) { + return TestProtos.SleepRequestProto.newBuilder() + .setMilliSeconds(milliSeconds).build(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 4ab590e01a6..ba0038d0d15 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -48,4 +48,37 @@ message SleepRequestProto{ } message SleepResponseProto{ +} + +message SlowPingRequestProto { + required bool shouldSlow = 1; +} + +message EchoRequestProto2 { + repeated string message = 1; +} + +message EchoResponseProto2 { + repeated string message = 1; +} + +message AddRequestProto { + required int32 param1 = 1; + required int32 param2 = 2; +} + +message AddRequestProto2 { + repeated int32 params = 1; +} + +message AddResponseProto { + required int32 result = 1; +} + +message ExchangeRequestProto { + repeated int32 values = 1; +} + +message ExchangeResponseProto { + repeated int32 values = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 722af892442..abb38831e5f 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -32,6 +32,13 @@ service TestProtobufRpcProto { rpc echo(EchoRequestProto) returns (EchoResponseProto); rpc error(EmptyRequestProto) returns (EmptyResponseProto); rpc error2(EmptyRequestProto) returns (EmptyResponseProto); + rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto); + rpc echo2(EchoRequestProto2) returns (EchoResponseProto2); + rpc add(AddRequestProto) returns (AddResponseProto); + rpc add2(AddRequestProto2) returns (AddResponseProto); + rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto); + rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); + rpc sleep(SleepRequestProto) returns (EmptyResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java index 39e6dc51cef..44ed883c3c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java @@ -24,15 +24,25 @@ import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.TestRPC.TestImpl; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.Keys; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -188,12 +198,19 @@ public void testNMAuditLoggerWithoutIP() throws Exception { * A special extension of {@link TestImpl} RPC server with * {@link TestImpl#ping()} testing the audit logs. */ - private class MyTestRPCServer extends TestImpl { + private class MyTestRPCServer extends TestRpcBase.PBServerImpl { @Override - public void ping() { + public TestProtos.EmptyResponseProto ping( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(clientId); + Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length); // test with ip set testSuccessLogFormat(true); testFailureLogFormat(true); + return TestProtos.EmptyResponseProto.newBuilder().build(); } } @@ -203,9 +220,17 @@ public void ping() { @Test public void testNMAuditLoggerWithIP() throws Exception { Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); + + // Create server side implementation + MyTestRPCServer serverImpl = new MyTestRPCServer(); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + // start the IPC server - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0") + Server server = new RPC.Builder(conf) + .setProtocol(TestRpcBase.TestRpcService.class) + .setInstance(service).setBindAddress("0.0.0.0") .setPort(0).setNumHandlers(5).setVerbose(true).build(); server.start(); @@ -213,11 +238,14 @@ public void testNMAuditLoggerWithIP() throws Exception { InetSocketAddress addr = NetUtils.getConnectAddress(server); // Make a client connection and test the audit log - TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class, + TestRpcService proxy = RPC.getProxy(TestRpcService.class, TestProtocol.versionID, addr, conf); // Start the testcase - proxy.ping(); + TestProtos.EmptyRequestProto pingRequest = + TestProtos.EmptyRequestProto.newBuilder().build(); + proxy.ping(null, pingRequest); server.stop(); + RPC.stopProxy(proxy); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java index 49b23d97f9d..66af3f1e75e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java @@ -24,17 +24,27 @@ import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ClientId; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.TestRPC.TestImpl; import org.apache.hadoop.ipc.TestRPC.TestProtocol; +import org.apache.hadoop.ipc.TestRpcBase; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -255,12 +265,19 @@ public void testRMAuditLoggerWithoutIP() throws Exception { * A special extension of {@link TestImpl} RPC server with * {@link TestImpl#ping()} testing the audit logs. */ - private class MyTestRPCServer extends TestImpl { + private class MyTestRPCServer extends TestRpcBase.PBServerImpl { @Override - public void ping() { + public TestProtos.EmptyResponseProto ping( + RpcController unused, TestProtos.EmptyRequestProto request) + throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(clientId); + Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length); // test with ip set testSuccessLogFormat(true); testFailureLogFormat(true); + return TestProtos.EmptyResponseProto.newBuilder().build(); } } @@ -270,20 +287,33 @@ public void ping() { @Test public void testRMAuditLoggerWithIP() throws Exception { Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, TestRpcService.class, + ProtobufRpcEngine.class); + + // Create server side implementation + MyTestRPCServer serverImpl = new MyTestRPCServer(); + BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(serverImpl); + // start the IPC server - Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) - .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0") + Server server = new RPC.Builder(conf) + .setProtocol(TestRpcService.class) + .setInstance(service).setBindAddress("0.0.0.0") .setPort(0).setNumHandlers(5).setVerbose(true).build(); + server.start(); InetSocketAddress addr = NetUtils.getConnectAddress(server); // Make a client connection and test the audit log - TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class, - TestProtocol.versionID, addr, conf); + TestRpcService proxy = RPC.getProxy(TestRpcService.class, + TestProtocol.versionID, addr, conf); // Start the testcase - proxy.ping(); + TestProtos.EmptyRequestProto pingRequest = + TestProtos.EmptyRequestProto.newBuilder().build(); + proxy.ping(null, pingRequest); server.stop(); + RPC.stopProxy(proxy); } }