HADOOP-12813. Migrate TestRPC and related codes to rebase on ProtobufRpcEngine. Contributed by Kai Zheng.
This commit is contained in:
parent
bd0f5085e3
commit
c5db4ab0b4
|
@ -1127,6 +1127,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HADOOP-12846. Credential Provider Recursive Dependencies.
|
||||
(Larry McCay via cnauroth)
|
||||
|
||||
HADOOP-12813. Migrate TestRPC and related codes to rebase on
|
||||
ProtobufRpcEngine. (Kai Zheng via wheat9)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -34,8 +34,6 @@ import org.apache.commons.cli.ParseException;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.RPC.Server;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||
|
@ -54,7 +52,7 @@ import com.google.protobuf.BlockingService;
|
|||
* Benchmark for protobuf RPC.
|
||||
* Run with --help option for usage.
|
||||
*/
|
||||
public class RPCCallBenchmark implements Tool {
|
||||
public class RPCCallBenchmark extends TestRpcBase implements Tool {
|
||||
private Configuration conf;
|
||||
private AtomicLong callCount = new AtomicLong(0);
|
||||
private static ThreadMXBean threadBean =
|
||||
|
|
|
@ -23,8 +23,6 @@ import java.net.InetSocketAddress;
|
|||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
||||
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.junit.Before;
|
||||
|
@ -32,8 +30,7 @@ import org.junit.After;
|
|||
import org.junit.Test;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
public class TestMultipleProtocolServer {
|
||||
private static final String ADDRESS = "0.0.0.0";
|
||||
public class TestMultipleProtocolServer extends TestRpcBase {
|
||||
private static InetSocketAddress addr;
|
||||
private static RPC.Server server;
|
||||
|
||||
|
@ -64,13 +61,12 @@ public class TestMultipleProtocolServer {
|
|||
public static final long versionID = 0L;
|
||||
void hello() throws IOException;
|
||||
}
|
||||
|
||||
interface Bar extends Mixin {
|
||||
public static final long versionID = 0L;
|
||||
int echo(int i) throws IOException;
|
||||
}
|
||||
|
||||
|
||||
|
||||
class Foo0Impl implements Foo0 {
|
||||
|
||||
@Override
|
||||
|
@ -185,8 +181,7 @@ public class TestMultipleProtocolServer {
|
|||
|
||||
// Add Protobuf server
|
||||
// Create server side implementation
|
||||
PBServerImpl pbServerImpl =
|
||||
new PBServerImpl();
|
||||
PBServerImpl pbServerImpl = new PBServerImpl();
|
||||
BlockingService service = TestProtobufRpcProto
|
||||
.newReflectiveBlockingService(pbServerImpl);
|
||||
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
|
||||
|
@ -241,8 +236,7 @@ public class TestMultipleProtocolServer {
|
|||
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
|
||||
foo.ping();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* getProtocolVersion of an unimplemented version should return highest version
|
||||
* Similarly getProtocolSignature should work.
|
||||
|
|
|
@ -17,15 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
@ -36,76 +30,37 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
|||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test for testing protocol buffer based RPC mechanism.
|
||||
* This test depends on test.proto definition of types in src/test/proto
|
||||
* and protobuf service definition from src/test/test_rpc_service.proto
|
||||
*/
|
||||
public class TestProtoBufRpc {
|
||||
public final static String ADDRESS = "0.0.0.0";
|
||||
public final static int PORT = 0;
|
||||
private static InetSocketAddress addr;
|
||||
private static Configuration conf;
|
||||
public class TestProtoBufRpc extends TestRpcBase {
|
||||
private static RPC.Server server;
|
||||
private final static int SLEEP_DURATION = 1000;
|
||||
|
||||
@ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
|
||||
public interface TestRpcService
|
||||
extends TestProtobufRpcProto.BlockingInterface {
|
||||
}
|
||||
|
||||
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
|
||||
public interface TestRpcService2 extends
|
||||
TestProtobufRpc2Proto.BlockingInterface {
|
||||
}
|
||||
|
||||
public static class PBServerImpl implements TestRpcService {
|
||||
|
||||
@Override
|
||||
public EmptyResponseProto ping(RpcController unused,
|
||||
EmptyRequestProto request) throws ServiceException {
|
||||
// Ensure clientId is received
|
||||
byte[] clientId = Server.getClientId();
|
||||
Assert.assertNotNull(Server.getClientId());
|
||||
Assert.assertEquals(16, clientId.length);
|
||||
return EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
|
||||
throws ServiceException {
|
||||
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmptyResponseProto error(RpcController unused,
|
||||
EmptyRequestProto request) throws ServiceException {
|
||||
throw new ServiceException("error", new RpcServerException("error"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmptyResponseProto error2(RpcController unused,
|
||||
EmptyRequestProto request) throws ServiceException {
|
||||
throw new ServiceException("error", new URISyntaxException("",
|
||||
"testException"));
|
||||
}
|
||||
}
|
||||
|
||||
public static class PBServer2Impl implements TestRpcService2 {
|
||||
|
||||
@Override
|
||||
|
@ -133,12 +88,13 @@ public class TestProtoBufRpc {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException { // Setup server for both protocols
|
||||
public void setUp() throws IOException { // Setup server for both protocols
|
||||
conf = new Configuration();
|
||||
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
|
||||
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
|
||||
// Set RPC engine to protobuf RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||
RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class);
|
||||
|
||||
// Create server side implementation
|
||||
PBServerImpl serverImpl = new PBServerImpl();
|
||||
|
@ -149,12 +105,12 @@ public class TestProtoBufRpc {
|
|||
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
|
||||
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
|
||||
addr = NetUtils.getConnectAddress(server);
|
||||
|
||||
|
||||
// now the second protocol
|
||||
PBServer2Impl server2Impl = new PBServer2Impl();
|
||||
BlockingService service2 = TestProtobufRpc2Proto
|
||||
.newReflectiveBlockingService(server2Impl);
|
||||
|
||||
|
||||
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
|
||||
service2);
|
||||
server.start();
|
||||
|
@ -166,31 +122,20 @@ public class TestProtoBufRpc {
|
|||
server.stop();
|
||||
}
|
||||
|
||||
private static TestRpcService getClient() throws IOException {
|
||||
// Set RPC engine to protobuf RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||
return RPC.getProxy(TestRpcService.class, 0, addr, conf);
|
||||
}
|
||||
|
||||
private static TestRpcService2 getClient2() throws IOException {
|
||||
// Set RPC engine to protobuf RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService2.class,
|
||||
ProtobufRpcEngine.class);
|
||||
return RPC.getProxy(TestRpcService2.class, 0, addr,
|
||||
conf);
|
||||
private TestRpcService2 getClient2() throws IOException {
|
||||
return RPC.getProxy(TestRpcService2.class, 0, addr, conf);
|
||||
}
|
||||
|
||||
@Test (timeout=5000)
|
||||
public void testProtoBufRpc() throws Exception {
|
||||
TestRpcService client = getClient();
|
||||
TestRpcService client = getClient(addr, conf);
|
||||
testProtoBufRpc(client);
|
||||
}
|
||||
|
||||
// separated test out so that other tests can call it.
|
||||
public static void testProtoBufRpc(TestRpcService client) throws Exception {
|
||||
// Test ping method
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping(null, emptyRequest);
|
||||
client.ping(null, newEmptyRequest());
|
||||
|
||||
// Test echo method
|
||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||
|
@ -200,7 +145,7 @@ public class TestProtoBufRpc {
|
|||
|
||||
// Test error method - error should be thrown as RemoteException
|
||||
try {
|
||||
client.error(null, emptyRequest);
|
||||
client.error(null, newEmptyRequest());
|
||||
Assert.fail("Expected exception is not thrown");
|
||||
} catch (ServiceException e) {
|
||||
RemoteException re = (RemoteException)e.getCause();
|
||||
|
@ -217,13 +162,11 @@ public class TestProtoBufRpc {
|
|||
TestRpcService2 client = getClient2();
|
||||
|
||||
// Test ping method
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping2(null, emptyRequest);
|
||||
client.ping2(null, newEmptyRequest());
|
||||
|
||||
// Test echo method
|
||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||
.setMessage("hello").build();
|
||||
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
||||
EchoResponseProto echoResponse = client.echo2(null,
|
||||
newEchoRequest("hello"));
|
||||
Assert.assertEquals(echoResponse.getMessage(), "hello");
|
||||
|
||||
// Ensure RPC metrics are updated
|
||||
|
@ -238,11 +181,10 @@ public class TestProtoBufRpc {
|
|||
|
||||
@Test (timeout=5000)
|
||||
public void testProtoBufRandomException() throws Exception {
|
||||
TestRpcService client = getClient();
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
TestRpcService client = getClient(addr, conf);
|
||||
|
||||
try {
|
||||
client.error2(null, emptyRequest);
|
||||
client.error2(null, newEmptyRequest());
|
||||
} catch (ServiceException se) {
|
||||
Assert.assertTrue(se.getCause() instanceof RemoteException);
|
||||
RemoteException re = (RemoteException) se.getCause();
|
||||
|
@ -258,17 +200,14 @@ public class TestProtoBufRpc {
|
|||
public void testExtraLongRpc() throws Exception {
|
||||
TestRpcService2 client = getClient2();
|
||||
final String shortString = StringUtils.repeat("X", 4);
|
||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||
.setMessage(shortString).build();
|
||||
// short message goes through
|
||||
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
||||
EchoResponseProto echoResponse = client.echo2(null,
|
||||
newEchoRequest(shortString));
|
||||
Assert.assertEquals(shortString, echoResponse.getMessage());
|
||||
|
||||
final String longString = StringUtils.repeat("X", 4096);
|
||||
echoRequest = EchoRequestProto.newBuilder()
|
||||
.setMessage(longString).build();
|
||||
try {
|
||||
echoResponse = client.echo2(null, echoRequest);
|
||||
client.echo2(null, newEchoRequest(longString));
|
||||
Assert.fail("expected extra-long RPC to fail");
|
||||
} catch (ServiceException se) {
|
||||
// expected
|
||||
|
@ -281,8 +220,7 @@ public class TestProtoBufRpc {
|
|||
// make 10 K fast calls
|
||||
for (int x = 0; x < 10000; x++) {
|
||||
try {
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping2(null, emptyRequest);
|
||||
client.ping2(null, newEmptyRequest());
|
||||
} catch (Exception ex) {
|
||||
throw ex;
|
||||
}
|
||||
|
@ -294,10 +232,7 @@ public class TestProtoBufRpc {
|
|||
long before = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
// make a really slow call. Sleep sleeps for 1000ms
|
||||
TestProtos.SleepRequestProto sleepRequest =
|
||||
TestProtos.SleepRequestProto.newBuilder()
|
||||
.setMilliSeconds(SLEEP_DURATION * 3).build();
|
||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
||||
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
|
||||
|
||||
long after = rpcMetrics.getRpcSlowCalls();
|
||||
// Ensure slow call is logged.
|
||||
|
@ -312,8 +247,7 @@ public class TestProtoBufRpc {
|
|||
|
||||
// make 10 K fast calls
|
||||
for (int x = 0; x < 10000; x++) {
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping2(null, emptyRequest);
|
||||
client.ping2(null, newEmptyRequest());
|
||||
}
|
||||
|
||||
// Ensure RPC metrics are updated
|
||||
|
@ -322,10 +256,7 @@ public class TestProtoBufRpc {
|
|||
long before = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
// make a really slow call. Sleep sleeps for 1000ms
|
||||
TestProtos.SleepRequestProto sleepRequest =
|
||||
TestProtos.SleepRequestProto.newBuilder()
|
||||
.setMilliSeconds(SLEEP_DURATION).build();
|
||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
||||
client.sleep(null, newSleepRequest(SLEEP_DURATION));
|
||||
|
||||
long after = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/** Split from TestRPC. */
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TestRPCServerShutdown extends TestRpcBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestRPCServerShutdown.class);
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
setupConf();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the RPC server can shutdown properly when callQueue is full.
|
||||
*/
|
||||
@Test (timeout=30000)
|
||||
public void testRPCServerShutdown() throws Exception {
|
||||
final int numClients = 3;
|
||||
final List<Future<Void>> res = new ArrayList<Future<Void>>();
|
||||
final ExecutorService executorService =
|
||||
Executors.newFixedThreadPool(numClients);
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||
RPC.Builder builder = newServerBuilder(conf)
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
|
||||
final Server server = setupTestServer(builder);
|
||||
|
||||
final TestRpcService proxy = getClient(addr, conf);
|
||||
try {
|
||||
// start a sleep RPC call to consume the only handler thread.
|
||||
// Start another sleep RPC call to make callQueue full.
|
||||
// Start another sleep RPC call to make reader thread block on CallQueue.
|
||||
for (int i = 0; i < numClients; i++) {
|
||||
res.add(executorService.submit(
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws ServiceException, InterruptedException {
|
||||
proxy.sleep(null, newSleepRequest(100000));
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
}
|
||||
while (server.getCallQueueLen() != 1
|
||||
|| countThreads(CallQueueManager.class.getName()) != 1
|
||||
|| countThreads(PBServerImpl.class.getName()) != 1) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
stop(server, proxy);
|
||||
assertEquals("Not enough clients", numClients, res.size());
|
||||
for (Future<Void> f : res) {
|
||||
try {
|
||||
f.get();
|
||||
fail("Future get should not return");
|
||||
} catch (ExecutionException e) {
|
||||
ServiceException se = (ServiceException) e.getCause();
|
||||
assertTrue("Unexpected exception: " + se,
|
||||
se.getCause() instanceof IOException);
|
||||
LOG.info("Expected exception", e.getCause());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,295 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/** Test facilities for unit tests for RPC. */
|
||||
public class TestRpcBase {
|
||||
|
||||
protected final static String SERVER_PRINCIPAL_KEY =
|
||||
"test.ipc.server.principal";
|
||||
protected final static String ADDRESS = "0.0.0.0";
|
||||
protected final static int PORT = 0;
|
||||
protected static InetSocketAddress addr;
|
||||
protected static Configuration conf;
|
||||
|
||||
protected void setupConf() {
|
||||
conf = new Configuration();
|
||||
// Set RPC engine to protobuf RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
}
|
||||
|
||||
protected static RPC.Builder newServerBuilder(
|
||||
Configuration serverConf) throws IOException {
|
||||
// Create server side implementation
|
||||
PBServerImpl serverImpl = new PBServerImpl();
|
||||
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||
.newReflectiveBlockingService(serverImpl);
|
||||
|
||||
// Get RPC server for server side implementation
|
||||
RPC.Builder builder = new RPC.Builder(serverConf)
|
||||
.setProtocol(TestRpcService.class)
|
||||
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT);
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
protected static RPC.Server setupTestServer(Configuration serverConf,
|
||||
int numHandlers) throws IOException {
|
||||
return setupTestServer(serverConf, numHandlers, null);
|
||||
}
|
||||
|
||||
protected static RPC.Server setupTestServer(Configuration serverConf,
|
||||
int numHandlers,
|
||||
SecretManager<?> serverSm)
|
||||
throws IOException {
|
||||
RPC.Builder builder = newServerBuilder(serverConf);
|
||||
|
||||
if (numHandlers > 0) {
|
||||
builder.setNumHandlers(numHandlers);
|
||||
}
|
||||
|
||||
if (serverSm != null) {
|
||||
builder.setSecretManager(serverSm);
|
||||
}
|
||||
|
||||
return setupTestServer(builder);
|
||||
}
|
||||
|
||||
protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
|
||||
RPC.Server server = builder.build();
|
||||
|
||||
server.start();
|
||||
|
||||
addr = NetUtils.getConnectAddress(server);
|
||||
|
||||
return server;
|
||||
}
|
||||
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
Configuration clientConf)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void stop(Server server, TestRpcService proxy) {
|
||||
if (proxy != null) {
|
||||
try {
|
||||
RPC.stopProxy(proxy);
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
if (server != null) {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Count the number of threads that have a stack frame containing
|
||||
* the given string
|
||||
*/
|
||||
protected static int countThreads(String search) {
|
||||
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
|
||||
|
||||
int count = 0;
|
||||
ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
|
||||
for (ThreadInfo info : infos) {
|
||||
if (info == null) continue;
|
||||
for (StackTraceElement elem : info.getStackTrace()) {
|
||||
if (elem.getClassName().contains(search)) {
|
||||
count++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
@ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
|
||||
protocolVersion = 1)
|
||||
public interface TestRpcService
|
||||
extends TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
|
||||
}
|
||||
|
||||
public static class PBServerImpl implements TestRpcService {
|
||||
CountDownLatch fastPingCounter = new CountDownLatch(2);
|
||||
private List<Server.Call> postponedCalls = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto ping(RpcController unused,
|
||||
TestProtos.EmptyRequestProto request) throws ServiceException {
|
||||
// Ensure clientId is received
|
||||
byte[] clientId = Server.getClientId();
|
||||
Assert.assertNotNull(clientId);
|
||||
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EchoResponseProto echo(
|
||||
RpcController unused, TestProtos.EchoRequestProto request)
|
||||
throws ServiceException {
|
||||
return TestProtos.EchoResponseProto.newBuilder().setMessage(
|
||||
request.getMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto error(
|
||||
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
throw new ServiceException("error", new RpcServerException("error"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto error2(
|
||||
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
throw new ServiceException("error", new URISyntaxException("",
|
||||
"testException"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto slowPing(
|
||||
RpcController unused, TestProtos.SlowPingRequestProto request)
|
||||
throws ServiceException {
|
||||
boolean shouldSlow = request.getShouldSlow();
|
||||
if (shouldSlow) {
|
||||
try {
|
||||
fastPingCounter.await(); //slow response until two fast pings happened
|
||||
} catch (InterruptedException ignored) {}
|
||||
} else {
|
||||
fastPingCounter.countDown();
|
||||
}
|
||||
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EchoResponseProto2 echo2(
|
||||
RpcController controller, TestProtos.EchoRequestProto2 request)
|
||||
throws ServiceException {
|
||||
return TestProtos.EchoResponseProto2.newBuilder().addAllMessage(
|
||||
request.getMessageList()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.AddResponseProto add(
|
||||
RpcController controller, TestProtos.AddRequestProto request)
|
||||
throws ServiceException {
|
||||
return TestProtos.AddResponseProto.newBuilder().setResult(
|
||||
request.getParam1() + request.getParam2()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.AddResponseProto add2(
|
||||
RpcController controller, TestProtos.AddRequestProto2 request)
|
||||
throws ServiceException {
|
||||
int sum = 0;
|
||||
for (Integer num : request.getParamsList()) {
|
||||
sum += num;
|
||||
}
|
||||
return TestProtos.AddResponseProto.newBuilder().setResult(sum).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto testServerGet(
|
||||
RpcController controller, TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
if (!(Server.get() instanceof RPC.Server)) {
|
||||
throw new ServiceException("Server.get() failed");
|
||||
}
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.ExchangeResponseProto exchange(
|
||||
RpcController controller, TestProtos.ExchangeRequestProto request)
|
||||
throws ServiceException {
|
||||
Integer[] values = new Integer[request.getValuesCount()];
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
values[i] = i;
|
||||
}
|
||||
return TestProtos.ExchangeResponseProto.newBuilder()
|
||||
.addAllValues(Arrays.asList(values)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.EmptyResponseProto sleep(
|
||||
RpcController controller, TestProtos.SleepRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
Thread.sleep(request.getMilliSeconds());
|
||||
} catch (InterruptedException ignore) {}
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
}
|
||||
|
||||
protected static TestProtos.EmptyRequestProto newEmptyRequest() {
|
||||
return TestProtos.EmptyRequestProto.newBuilder().build();
|
||||
}
|
||||
|
||||
protected static TestProtos.EchoRequestProto newEchoRequest(String msg) {
|
||||
return TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build();
|
||||
}
|
||||
|
||||
protected static String convert(TestProtos.EchoResponseProto response) {
|
||||
return response.getMessage();
|
||||
}
|
||||
|
||||
protected static TestProtos.SlowPingRequestProto newSlowPingRequest(
|
||||
boolean shouldSlow) throws ServiceException {
|
||||
return TestProtos.SlowPingRequestProto.newBuilder().
|
||||
setShouldSlow(shouldSlow).build();
|
||||
}
|
||||
|
||||
protected static TestProtos.SleepRequestProto newSleepRequest(
|
||||
int milliSeconds) {
|
||||
return TestProtos.SleepRequestProto.newBuilder()
|
||||
.setMilliSeconds(milliSeconds).build();
|
||||
}
|
||||
}
|
|
@ -48,4 +48,37 @@ message SleepRequestProto{
|
|||
}
|
||||
|
||||
message SleepResponseProto{
|
||||
}
|
||||
|
||||
message SlowPingRequestProto {
|
||||
required bool shouldSlow = 1;
|
||||
}
|
||||
|
||||
message EchoRequestProto2 {
|
||||
repeated string message = 1;
|
||||
}
|
||||
|
||||
message EchoResponseProto2 {
|
||||
repeated string message = 1;
|
||||
}
|
||||
|
||||
message AddRequestProto {
|
||||
required int32 param1 = 1;
|
||||
required int32 param2 = 2;
|
||||
}
|
||||
|
||||
message AddRequestProto2 {
|
||||
repeated int32 params = 1;
|
||||
}
|
||||
|
||||
message AddResponseProto {
|
||||
required int32 result = 1;
|
||||
}
|
||||
|
||||
message ExchangeRequestProto {
|
||||
repeated int32 values = 1;
|
||||
}
|
||||
|
||||
message ExchangeResponseProto {
|
||||
repeated int32 values = 1;
|
||||
}
|
|
@ -32,6 +32,13 @@ service TestProtobufRpcProto {
|
|||
rpc echo(EchoRequestProto) returns (EchoResponseProto);
|
||||
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
|
||||
rpc error2(EmptyRequestProto) returns (EmptyResponseProto);
|
||||
rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto);
|
||||
rpc echo2(EchoRequestProto2) returns (EchoResponseProto2);
|
||||
rpc add(AddRequestProto) returns (AddResponseProto);
|
||||
rpc add2(AddRequestProto2) returns (AddResponseProto);
|
||||
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
|
||||
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
|
||||
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
|
||||
}
|
||||
|
||||
service TestProtobufRpc2Proto {
|
||||
|
|
|
@ -24,15 +24,25 @@ import static org.mockito.Mockito.when;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ClientId;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
||||
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||
import org.apache.hadoop.ipc.TestRpcBase;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.Keys;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -188,12 +198,19 @@ public class TestNMAuditLogger {
|
|||
* A special extension of {@link TestImpl} RPC server with
|
||||
* {@link TestImpl#ping()} testing the audit logs.
|
||||
*/
|
||||
private class MyTestRPCServer extends TestImpl {
|
||||
private class MyTestRPCServer extends TestRpcBase.PBServerImpl {
|
||||
@Override
|
||||
public void ping() {
|
||||
public TestProtos.EmptyResponseProto ping(
|
||||
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
// Ensure clientId is received
|
||||
byte[] clientId = Server.getClientId();
|
||||
Assert.assertNotNull(clientId);
|
||||
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||
// test with ip set
|
||||
testSuccessLogFormat(true);
|
||||
testFailureLogFormat(true);
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -203,9 +220,17 @@ public class TestNMAuditLogger {
|
|||
@Test
|
||||
public void testNMAuditLoggerWithIP() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||
|
||||
// Create server side implementation
|
||||
MyTestRPCServer serverImpl = new MyTestRPCServer();
|
||||
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||
.newReflectiveBlockingService(serverImpl);
|
||||
|
||||
// start the IPC server
|
||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
|
||||
Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestRpcBase.TestRpcService.class)
|
||||
.setInstance(service).setBindAddress("0.0.0.0")
|
||||
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
||||
|
||||
server.start();
|
||||
|
@ -213,11 +238,14 @@ public class TestNMAuditLogger {
|
|||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
|
||||
// Make a client connection and test the audit log
|
||||
TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class,
|
||||
TestRpcService proxy = RPC.getProxy(TestRpcService.class,
|
||||
TestProtocol.versionID, addr, conf);
|
||||
// Start the testcase
|
||||
proxy.ping();
|
||||
TestProtos.EmptyRequestProto pingRequest =
|
||||
TestProtos.EmptyRequestProto.newBuilder().build();
|
||||
proxy.ping(null, pingRequest);
|
||||
|
||||
server.stop();
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,17 +24,27 @@ import static org.mockito.Mockito.when;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.ClientId;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||
import org.apache.hadoop.ipc.TestRpcBase;
|
||||
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -255,12 +265,19 @@ public class TestRMAuditLogger {
|
|||
* A special extension of {@link TestImpl} RPC server with
|
||||
* {@link TestImpl#ping()} testing the audit logs.
|
||||
*/
|
||||
private class MyTestRPCServer extends TestImpl {
|
||||
private class MyTestRPCServer extends TestRpcBase.PBServerImpl {
|
||||
@Override
|
||||
public void ping() {
|
||||
public TestProtos.EmptyResponseProto ping(
|
||||
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
// Ensure clientId is received
|
||||
byte[] clientId = Server.getClientId();
|
||||
Assert.assertNotNull(clientId);
|
||||
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||
// test with ip set
|
||||
testSuccessLogFormat(true);
|
||||
testFailureLogFormat(true);
|
||||
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -270,20 +287,33 @@ public class TestRMAuditLogger {
|
|||
@Test
|
||||
public void testRMAuditLoggerWithIP() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
// Create server side implementation
|
||||
MyTestRPCServer serverImpl = new MyTestRPCServer();
|
||||
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||
.newReflectiveBlockingService(serverImpl);
|
||||
|
||||
// start the IPC server
|
||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
|
||||
Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestRpcService.class)
|
||||
.setInstance(service).setBindAddress("0.0.0.0")
|
||||
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
||||
|
||||
server.start();
|
||||
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
|
||||
// Make a client connection and test the audit log
|
||||
TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class,
|
||||
TestProtocol.versionID, addr, conf);
|
||||
TestRpcService proxy = RPC.getProxy(TestRpcService.class,
|
||||
TestProtocol.versionID, addr, conf);
|
||||
// Start the testcase
|
||||
proxy.ping();
|
||||
TestProtos.EmptyRequestProto pingRequest =
|
||||
TestProtos.EmptyRequestProto.newBuilder().build();
|
||||
proxy.ping(null, pingRequest);
|
||||
|
||||
server.stop();
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue