HADOOP-12813. Migrate TestRPC and related codes to rebase on ProtobufRpcEngine. Contributed by Kai Zheng.
This commit is contained in:
parent
7ddff4b37a
commit
69b195d619
|
@ -1055,6 +1055,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12846. Credential Provider Recursive Dependencies.
|
HADOOP-12846. Credential Provider Recursive Dependencies.
|
||||||
(Larry McCay via cnauroth)
|
(Larry McCay via cnauroth)
|
||||||
|
|
||||||
|
HADOOP-12813. Migrate TestRPC and related codes to rebase on
|
||||||
|
ProtobufRpcEngine. (Kai Zheng via wheat9)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -34,8 +34,6 @@ import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ipc.RPC.Server;
|
import org.apache.hadoop.ipc.RPC.Server;
|
||||||
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
|
||||||
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
|
||||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||||
|
@ -54,7 +52,7 @@ import com.google.protobuf.BlockingService;
|
||||||
* Benchmark for protobuf RPC.
|
* Benchmark for protobuf RPC.
|
||||||
* Run with --help option for usage.
|
* Run with --help option for usage.
|
||||||
*/
|
*/
|
||||||
public class RPCCallBenchmark implements Tool {
|
public class RPCCallBenchmark extends TestRpcBase implements Tool {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private AtomicLong callCount = new AtomicLong(0);
|
private AtomicLong callCount = new AtomicLong(0);
|
||||||
private static ThreadMXBean threadBean =
|
private static ThreadMXBean threadBean =
|
||||||
|
|
|
@ -23,8 +23,6 @@ import java.net.InetSocketAddress;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
|
|
||||||
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
|
|
||||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -32,8 +30,7 @@ import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
public class TestMultipleProtocolServer {
|
public class TestMultipleProtocolServer extends TestRpcBase {
|
||||||
private static final String ADDRESS = "0.0.0.0";
|
|
||||||
private static InetSocketAddress addr;
|
private static InetSocketAddress addr;
|
||||||
private static RPC.Server server;
|
private static RPC.Server server;
|
||||||
|
|
||||||
|
@ -64,13 +61,12 @@ public class TestMultipleProtocolServer {
|
||||||
public static final long versionID = 0L;
|
public static final long versionID = 0L;
|
||||||
void hello() throws IOException;
|
void hello() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Bar extends Mixin {
|
interface Bar extends Mixin {
|
||||||
public static final long versionID = 0L;
|
public static final long versionID = 0L;
|
||||||
int echo(int i) throws IOException;
|
int echo(int i) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Foo0Impl implements Foo0 {
|
class Foo0Impl implements Foo0 {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -185,8 +181,7 @@ public class TestMultipleProtocolServer {
|
||||||
|
|
||||||
// Add Protobuf server
|
// Add Protobuf server
|
||||||
// Create server side implementation
|
// Create server side implementation
|
||||||
PBServerImpl pbServerImpl =
|
PBServerImpl pbServerImpl = new PBServerImpl();
|
||||||
new PBServerImpl();
|
|
||||||
BlockingService service = TestProtobufRpcProto
|
BlockingService service = TestProtobufRpcProto
|
||||||
.newReflectiveBlockingService(pbServerImpl);
|
.newReflectiveBlockingService(pbServerImpl);
|
||||||
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
|
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
|
||||||
|
@ -242,7 +237,6 @@ public class TestMultipleProtocolServer {
|
||||||
foo.ping();
|
foo.ping();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* getProtocolVersion of an unimplemented version should return highest version
|
* getProtocolVersion of an unimplemented version should return highest version
|
||||||
* Similarly getProtocolSignature should work.
|
* Similarly getProtocolSignature should work.
|
||||||
|
|
|
@ -17,15 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import com.google.protobuf.BlockingService;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
import com.google.protobuf.RpcController;
|
||||||
import static org.junit.Assert.assertEquals;
|
import com.google.protobuf.ServiceException;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
@ -36,76 +30,37 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
|
||||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
import com.google.protobuf.BlockingService;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||||
import com.google.protobuf.RpcController;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import com.google.protobuf.ServiceException;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for testing protocol buffer based RPC mechanism.
|
* Test for testing protocol buffer based RPC mechanism.
|
||||||
* This test depends on test.proto definition of types in src/test/proto
|
* This test depends on test.proto definition of types in src/test/proto
|
||||||
* and protobuf service definition from src/test/test_rpc_service.proto
|
* and protobuf service definition from src/test/test_rpc_service.proto
|
||||||
*/
|
*/
|
||||||
public class TestProtoBufRpc {
|
public class TestProtoBufRpc extends TestRpcBase {
|
||||||
public final static String ADDRESS = "0.0.0.0";
|
|
||||||
public final static int PORT = 0;
|
|
||||||
private static InetSocketAddress addr;
|
|
||||||
private static Configuration conf;
|
|
||||||
private static RPC.Server server;
|
private static RPC.Server server;
|
||||||
private final static int SLEEP_DURATION = 1000;
|
private final static int SLEEP_DURATION = 1000;
|
||||||
|
|
||||||
@ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
|
|
||||||
public interface TestRpcService
|
|
||||||
extends TestProtobufRpcProto.BlockingInterface {
|
|
||||||
}
|
|
||||||
|
|
||||||
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
|
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
|
||||||
public interface TestRpcService2 extends
|
public interface TestRpcService2 extends
|
||||||
TestProtobufRpc2Proto.BlockingInterface {
|
TestProtobufRpc2Proto.BlockingInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class PBServerImpl implements TestRpcService {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EmptyResponseProto ping(RpcController unused,
|
|
||||||
EmptyRequestProto request) throws ServiceException {
|
|
||||||
// Ensure clientId is received
|
|
||||||
byte[] clientId = Server.getClientId();
|
|
||||||
Assert.assertNotNull(Server.getClientId());
|
|
||||||
Assert.assertEquals(16, clientId.length);
|
|
||||||
return EmptyResponseProto.newBuilder().build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
|
|
||||||
throws ServiceException {
|
|
||||||
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EmptyResponseProto error(RpcController unused,
|
|
||||||
EmptyRequestProto request) throws ServiceException {
|
|
||||||
throw new ServiceException("error", new RpcServerException("error"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EmptyResponseProto error2(RpcController unused,
|
|
||||||
EmptyRequestProto request) throws ServiceException {
|
|
||||||
throw new ServiceException("error", new URISyntaxException("",
|
|
||||||
"testException"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class PBServer2Impl implements TestRpcService2 {
|
public static class PBServer2Impl implements TestRpcService2 {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,6 +94,7 @@ public class TestProtoBufRpc {
|
||||||
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
|
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
|
||||||
// Set RPC engine to protobuf RPC engine
|
// Set RPC engine to protobuf RPC engine
|
||||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||||
|
RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class);
|
||||||
|
|
||||||
// Create server side implementation
|
// Create server side implementation
|
||||||
PBServerImpl serverImpl = new PBServerImpl();
|
PBServerImpl serverImpl = new PBServerImpl();
|
||||||
|
@ -166,31 +122,20 @@ public class TestProtoBufRpc {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TestRpcService getClient() throws IOException {
|
private TestRpcService2 getClient2() throws IOException {
|
||||||
// Set RPC engine to protobuf RPC engine
|
return RPC.getProxy(TestRpcService2.class, 0, addr, conf);
|
||||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
|
||||||
return RPC.getProxy(TestRpcService.class, 0, addr, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TestRpcService2 getClient2() throws IOException {
|
|
||||||
// Set RPC engine to protobuf RPC engine
|
|
||||||
RPC.setProtocolEngine(conf, TestRpcService2.class,
|
|
||||||
ProtobufRpcEngine.class);
|
|
||||||
return RPC.getProxy(TestRpcService2.class, 0, addr,
|
|
||||||
conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=5000)
|
@Test (timeout=5000)
|
||||||
public void testProtoBufRpc() throws Exception {
|
public void testProtoBufRpc() throws Exception {
|
||||||
TestRpcService client = getClient();
|
TestRpcService client = getClient(addr, conf);
|
||||||
testProtoBufRpc(client);
|
testProtoBufRpc(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
// separated test out so that other tests can call it.
|
// separated test out so that other tests can call it.
|
||||||
public static void testProtoBufRpc(TestRpcService client) throws Exception {
|
public static void testProtoBufRpc(TestRpcService client) throws Exception {
|
||||||
// Test ping method
|
// Test ping method
|
||||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
client.ping(null, newEmptyRequest());
|
||||||
client.ping(null, emptyRequest);
|
|
||||||
|
|
||||||
// Test echo method
|
// Test echo method
|
||||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||||
|
@ -200,7 +145,7 @@ public class TestProtoBufRpc {
|
||||||
|
|
||||||
// Test error method - error should be thrown as RemoteException
|
// Test error method - error should be thrown as RemoteException
|
||||||
try {
|
try {
|
||||||
client.error(null, emptyRequest);
|
client.error(null, newEmptyRequest());
|
||||||
Assert.fail("Expected exception is not thrown");
|
Assert.fail("Expected exception is not thrown");
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
RemoteException re = (RemoteException)e.getCause();
|
RemoteException re = (RemoteException)e.getCause();
|
||||||
|
@ -217,13 +162,11 @@ public class TestProtoBufRpc {
|
||||||
TestRpcService2 client = getClient2();
|
TestRpcService2 client = getClient2();
|
||||||
|
|
||||||
// Test ping method
|
// Test ping method
|
||||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
client.ping2(null, newEmptyRequest());
|
||||||
client.ping2(null, emptyRequest);
|
|
||||||
|
|
||||||
// Test echo method
|
// Test echo method
|
||||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
EchoResponseProto echoResponse = client.echo2(null,
|
||||||
.setMessage("hello").build();
|
newEchoRequest("hello"));
|
||||||
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
|
||||||
Assert.assertEquals(echoResponse.getMessage(), "hello");
|
Assert.assertEquals(echoResponse.getMessage(), "hello");
|
||||||
|
|
||||||
// Ensure RPC metrics are updated
|
// Ensure RPC metrics are updated
|
||||||
|
@ -238,11 +181,10 @@ public class TestProtoBufRpc {
|
||||||
|
|
||||||
@Test (timeout=5000)
|
@Test (timeout=5000)
|
||||||
public void testProtoBufRandomException() throws Exception {
|
public void testProtoBufRandomException() throws Exception {
|
||||||
TestRpcService client = getClient();
|
TestRpcService client = getClient(addr, conf);
|
||||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
client.error2(null, emptyRequest);
|
client.error2(null, newEmptyRequest());
|
||||||
} catch (ServiceException se) {
|
} catch (ServiceException se) {
|
||||||
Assert.assertTrue(se.getCause() instanceof RemoteException);
|
Assert.assertTrue(se.getCause() instanceof RemoteException);
|
||||||
RemoteException re = (RemoteException) se.getCause();
|
RemoteException re = (RemoteException) se.getCause();
|
||||||
|
@ -258,17 +200,14 @@ public class TestProtoBufRpc {
|
||||||
public void testExtraLongRpc() throws Exception {
|
public void testExtraLongRpc() throws Exception {
|
||||||
TestRpcService2 client = getClient2();
|
TestRpcService2 client = getClient2();
|
||||||
final String shortString = StringUtils.repeat("X", 4);
|
final String shortString = StringUtils.repeat("X", 4);
|
||||||
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
|
||||||
.setMessage(shortString).build();
|
|
||||||
// short message goes through
|
// short message goes through
|
||||||
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
EchoResponseProto echoResponse = client.echo2(null,
|
||||||
|
newEchoRequest(shortString));
|
||||||
Assert.assertEquals(shortString, echoResponse.getMessage());
|
Assert.assertEquals(shortString, echoResponse.getMessage());
|
||||||
|
|
||||||
final String longString = StringUtils.repeat("X", 4096);
|
final String longString = StringUtils.repeat("X", 4096);
|
||||||
echoRequest = EchoRequestProto.newBuilder()
|
|
||||||
.setMessage(longString).build();
|
|
||||||
try {
|
try {
|
||||||
echoResponse = client.echo2(null, echoRequest);
|
client.echo2(null, newEchoRequest(longString));
|
||||||
Assert.fail("expected extra-long RPC to fail");
|
Assert.fail("expected extra-long RPC to fail");
|
||||||
} catch (ServiceException se) {
|
} catch (ServiceException se) {
|
||||||
// expected
|
// expected
|
||||||
|
@ -281,8 +220,7 @@ public class TestProtoBufRpc {
|
||||||
// make 10 K fast calls
|
// make 10 K fast calls
|
||||||
for (int x = 0; x < 10000; x++) {
|
for (int x = 0; x < 10000; x++) {
|
||||||
try {
|
try {
|
||||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
client.ping2(null, newEmptyRequest());
|
||||||
client.ping2(null, emptyRequest);
|
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
|
@ -294,10 +232,7 @@ public class TestProtoBufRpc {
|
||||||
long before = rpcMetrics.getRpcSlowCalls();
|
long before = rpcMetrics.getRpcSlowCalls();
|
||||||
|
|
||||||
// make a really slow call. Sleep sleeps for 1000ms
|
// make a really slow call. Sleep sleeps for 1000ms
|
||||||
TestProtos.SleepRequestProto sleepRequest =
|
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
|
||||||
TestProtos.SleepRequestProto.newBuilder()
|
|
||||||
.setMilliSeconds(SLEEP_DURATION * 3).build();
|
|
||||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
|
||||||
|
|
||||||
long after = rpcMetrics.getRpcSlowCalls();
|
long after = rpcMetrics.getRpcSlowCalls();
|
||||||
// Ensure slow call is logged.
|
// Ensure slow call is logged.
|
||||||
|
@ -312,8 +247,7 @@ public class TestProtoBufRpc {
|
||||||
|
|
||||||
// make 10 K fast calls
|
// make 10 K fast calls
|
||||||
for (int x = 0; x < 10000; x++) {
|
for (int x = 0; x < 10000; x++) {
|
||||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
client.ping2(null, newEmptyRequest());
|
||||||
client.ping2(null, emptyRequest);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure RPC metrics are updated
|
// Ensure RPC metrics are updated
|
||||||
|
@ -322,10 +256,7 @@ public class TestProtoBufRpc {
|
||||||
long before = rpcMetrics.getRpcSlowCalls();
|
long before = rpcMetrics.getRpcSlowCalls();
|
||||||
|
|
||||||
// make a really slow call. Sleep sleeps for 1000ms
|
// make a really slow call. Sleep sleeps for 1000ms
|
||||||
TestProtos.SleepRequestProto sleepRequest =
|
client.sleep(null, newSleepRequest(SLEEP_DURATION));
|
||||||
TestProtos.SleepRequestProto.newBuilder()
|
|
||||||
.setMilliSeconds(SLEEP_DURATION).build();
|
|
||||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
|
||||||
|
|
||||||
long after = rpcMetrics.getRpcSlowCalls();
|
long after = rpcMetrics.getRpcSlowCalls();
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,106 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
/** Split from TestRPC. */
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public class TestRPCServerShutdown extends TestRpcBase {
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestRPCServerShutdown.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
setupConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify the RPC server can shutdown properly when callQueue is full.
|
||||||
|
*/
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testRPCServerShutdown() throws Exception {
|
||||||
|
final int numClients = 3;
|
||||||
|
final List<Future<Void>> res = new ArrayList<Future<Void>>();
|
||||||
|
final ExecutorService executorService =
|
||||||
|
Executors.newFixedThreadPool(numClients);
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
||||||
|
RPC.Builder builder = newServerBuilder(conf)
|
||||||
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
|
||||||
|
final Server server = setupTestServer(builder);
|
||||||
|
|
||||||
|
final TestRpcService proxy = getClient(addr, conf);
|
||||||
|
try {
|
||||||
|
// start a sleep RPC call to consume the only handler thread.
|
||||||
|
// Start another sleep RPC call to make callQueue full.
|
||||||
|
// Start another sleep RPC call to make reader thread block on CallQueue.
|
||||||
|
for (int i = 0; i < numClients; i++) {
|
||||||
|
res.add(executorService.submit(
|
||||||
|
new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws ServiceException, InterruptedException {
|
||||||
|
proxy.sleep(null, newSleepRequest(100000));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
while (server.getCallQueueLen() != 1
|
||||||
|
|| countThreads(CallQueueManager.class.getName()) != 1
|
||||||
|
|| countThreads(PBServerImpl.class.getName()) != 1) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
stop(server, proxy);
|
||||||
|
assertEquals("Not enough clients", numClients, res.size());
|
||||||
|
for (Future<Void> f : res) {
|
||||||
|
try {
|
||||||
|
f.get();
|
||||||
|
fail("Future get should not return");
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
ServiceException se = (ServiceException) e.getCause();
|
||||||
|
assertTrue("Unexpected exception: " + se,
|
||||||
|
se.getCause() instanceof IOException);
|
||||||
|
LOG.info("Expected exception", e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,295 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.ThreadInfo;
|
||||||
|
import java.lang.management.ThreadMXBean;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
/** Test facilities for unit tests for RPC. */
|
||||||
|
public class TestRpcBase {
|
||||||
|
|
||||||
|
protected final static String SERVER_PRINCIPAL_KEY =
|
||||||
|
"test.ipc.server.principal";
|
||||||
|
protected final static String ADDRESS = "0.0.0.0";
|
||||||
|
protected final static int PORT = 0;
|
||||||
|
protected static InetSocketAddress addr;
|
||||||
|
protected static Configuration conf;
|
||||||
|
|
||||||
|
protected void setupConf() {
|
||||||
|
conf = new Configuration();
|
||||||
|
// Set RPC engine to protobuf RPC engine
|
||||||
|
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static RPC.Builder newServerBuilder(
|
||||||
|
Configuration serverConf) throws IOException {
|
||||||
|
// Create server side implementation
|
||||||
|
PBServerImpl serverImpl = new PBServerImpl();
|
||||||
|
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
|
// Get RPC server for server side implementation
|
||||||
|
RPC.Builder builder = new RPC.Builder(serverConf)
|
||||||
|
.setProtocol(TestRpcService.class)
|
||||||
|
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT);
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static RPC.Server setupTestServer(Configuration serverConf,
|
||||||
|
int numHandlers) throws IOException {
|
||||||
|
return setupTestServer(serverConf, numHandlers, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static RPC.Server setupTestServer(Configuration serverConf,
|
||||||
|
int numHandlers,
|
||||||
|
SecretManager<?> serverSm)
|
||||||
|
throws IOException {
|
||||||
|
RPC.Builder builder = newServerBuilder(serverConf);
|
||||||
|
|
||||||
|
if (numHandlers > 0) {
|
||||||
|
builder.setNumHandlers(numHandlers);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (serverSm != null) {
|
||||||
|
builder.setSecretManager(serverSm);
|
||||||
|
}
|
||||||
|
|
||||||
|
return setupTestServer(builder);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
|
||||||
|
RPC.Server server = builder.build();
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||||
|
Configuration clientConf)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void stop(Server server, TestRpcService proxy) {
|
||||||
|
if (proxy != null) {
|
||||||
|
try {
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (server != null) {
|
||||||
|
try {
|
||||||
|
server.stop();
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count the number of threads that have a stack frame containing
|
||||||
|
* the given string
|
||||||
|
*/
|
||||||
|
protected static int countThreads(String search) {
|
||||||
|
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
|
||||||
|
for (ThreadInfo info : infos) {
|
||||||
|
if (info == null) continue;
|
||||||
|
for (StackTraceElement elem : info.getStackTrace()) {
|
||||||
|
if (elem.getClassName().contains(search)) {
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
|
||||||
|
protocolVersion = 1)
|
||||||
|
public interface TestRpcService
|
||||||
|
extends TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class PBServerImpl implements TestRpcService {
|
||||||
|
CountDownLatch fastPingCounter = new CountDownLatch(2);
|
||||||
|
private List<Server.Call> postponedCalls = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto ping(RpcController unused,
|
||||||
|
TestProtos.EmptyRequestProto request) throws ServiceException {
|
||||||
|
// Ensure clientId is received
|
||||||
|
byte[] clientId = Server.getClientId();
|
||||||
|
Assert.assertNotNull(clientId);
|
||||||
|
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EchoResponseProto echo(
|
||||||
|
RpcController unused, TestProtos.EchoRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return TestProtos.EchoResponseProto.newBuilder().setMessage(
|
||||||
|
request.getMessage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto error(
|
||||||
|
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
throw new ServiceException("error", new RpcServerException("error"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto error2(
|
||||||
|
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
throw new ServiceException("error", new URISyntaxException("",
|
||||||
|
"testException"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto slowPing(
|
||||||
|
RpcController unused, TestProtos.SlowPingRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
boolean shouldSlow = request.getShouldSlow();
|
||||||
|
if (shouldSlow) {
|
||||||
|
try {
|
||||||
|
fastPingCounter.await(); //slow response until two fast pings happened
|
||||||
|
} catch (InterruptedException ignored) {}
|
||||||
|
} else {
|
||||||
|
fastPingCounter.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EchoResponseProto2 echo2(
|
||||||
|
RpcController controller, TestProtos.EchoRequestProto2 request)
|
||||||
|
throws ServiceException {
|
||||||
|
return TestProtos.EchoResponseProto2.newBuilder().addAllMessage(
|
||||||
|
request.getMessageList()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.AddResponseProto add(
|
||||||
|
RpcController controller, TestProtos.AddRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return TestProtos.AddResponseProto.newBuilder().setResult(
|
||||||
|
request.getParam1() + request.getParam2()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.AddResponseProto add2(
|
||||||
|
RpcController controller, TestProtos.AddRequestProto2 request)
|
||||||
|
throws ServiceException {
|
||||||
|
int sum = 0;
|
||||||
|
for (Integer num : request.getParamsList()) {
|
||||||
|
sum += num;
|
||||||
|
}
|
||||||
|
return TestProtos.AddResponseProto.newBuilder().setResult(sum).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto testServerGet(
|
||||||
|
RpcController controller, TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
if (!(Server.get() instanceof RPC.Server)) {
|
||||||
|
throw new ServiceException("Server.get() failed");
|
||||||
|
}
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.ExchangeResponseProto exchange(
|
||||||
|
RpcController controller, TestProtos.ExchangeRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
Integer[] values = new Integer[request.getValuesCount()];
|
||||||
|
for (int i = 0; i < values.length; i++) {
|
||||||
|
values[i] = i;
|
||||||
|
}
|
||||||
|
return TestProtos.ExchangeResponseProto.newBuilder()
|
||||||
|
.addAllValues(Arrays.asList(values)).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestProtos.EmptyResponseProto sleep(
|
||||||
|
RpcController controller, TestProtos.SleepRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
Thread.sleep(request.getMilliSeconds());
|
||||||
|
} catch (InterruptedException ignore) {}
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static TestProtos.EmptyRequestProto newEmptyRequest() {
|
||||||
|
return TestProtos.EmptyRequestProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static TestProtos.EchoRequestProto newEchoRequest(String msg) {
|
||||||
|
return TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String convert(TestProtos.EchoResponseProto response) {
|
||||||
|
return response.getMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static TestProtos.SlowPingRequestProto newSlowPingRequest(
|
||||||
|
boolean shouldSlow) throws ServiceException {
|
||||||
|
return TestProtos.SlowPingRequestProto.newBuilder().
|
||||||
|
setShouldSlow(shouldSlow).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static TestProtos.SleepRequestProto newSleepRequest(
|
||||||
|
int milliSeconds) {
|
||||||
|
return TestProtos.SleepRequestProto.newBuilder()
|
||||||
|
.setMilliSeconds(milliSeconds).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -49,3 +49,36 @@ message SleepRequestProto{
|
||||||
|
|
||||||
message SleepResponseProto{
|
message SleepResponseProto{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SlowPingRequestProto {
|
||||||
|
required bool shouldSlow = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EchoRequestProto2 {
|
||||||
|
repeated string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EchoResponseProto2 {
|
||||||
|
repeated string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message AddRequestProto {
|
||||||
|
required int32 param1 = 1;
|
||||||
|
required int32 param2 = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message AddRequestProto2 {
|
||||||
|
repeated int32 params = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message AddResponseProto {
|
||||||
|
required int32 result = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ExchangeRequestProto {
|
||||||
|
repeated int32 values = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ExchangeResponseProto {
|
||||||
|
repeated int32 values = 1;
|
||||||
|
}
|
|
@ -32,6 +32,13 @@ service TestProtobufRpcProto {
|
||||||
rpc echo(EchoRequestProto) returns (EchoResponseProto);
|
rpc echo(EchoRequestProto) returns (EchoResponseProto);
|
||||||
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
|
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
rpc error2(EmptyRequestProto) returns (EmptyResponseProto);
|
rpc error2(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
|
rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto);
|
||||||
|
rpc echo2(EchoRequestProto2) returns (EchoResponseProto2);
|
||||||
|
rpc add(AddRequestProto) returns (AddResponseProto);
|
||||||
|
rpc add2(AddRequestProto2) returns (AddResponseProto);
|
||||||
|
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
|
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
|
||||||
|
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
|
||||||
}
|
}
|
||||||
|
|
||||||
service TestProtobufRpc2Proto {
|
service TestProtobufRpc2Proto {
|
||||||
|
|
|
@ -24,15 +24,25 @@ import static org.mockito.Mockito.when;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.ClientId;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
||||||
|
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
|
||||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||||
|
import org.apache.hadoop.ipc.TestRpcBase;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.Keys;
|
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.Keys;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -188,12 +198,19 @@ public class TestNMAuditLogger {
|
||||||
* A special extension of {@link TestImpl} RPC server with
|
* A special extension of {@link TestImpl} RPC server with
|
||||||
* {@link TestImpl#ping()} testing the audit logs.
|
* {@link TestImpl#ping()} testing the audit logs.
|
||||||
*/
|
*/
|
||||||
private class MyTestRPCServer extends TestImpl {
|
private class MyTestRPCServer extends TestRpcBase.PBServerImpl {
|
||||||
@Override
|
@Override
|
||||||
public void ping() {
|
public TestProtos.EmptyResponseProto ping(
|
||||||
|
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
// Ensure clientId is received
|
||||||
|
byte[] clientId = Server.getClientId();
|
||||||
|
Assert.assertNotNull(clientId);
|
||||||
|
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||||
// test with ip set
|
// test with ip set
|
||||||
testSuccessLogFormat(true);
|
testSuccessLogFormat(true);
|
||||||
testFailureLogFormat(true);
|
testFailureLogFormat(true);
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,9 +220,17 @@ public class TestNMAuditLogger {
|
||||||
@Test
|
@Test
|
||||||
public void testNMAuditLoggerWithIP() throws Exception {
|
public void testNMAuditLoggerWithIP() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
// Create server side implementation
|
||||||
|
MyTestRPCServer serverImpl = new MyTestRPCServer();
|
||||||
|
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
// start the IPC server
|
// start the IPC server
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf)
|
||||||
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
|
.setProtocol(TestRpcBase.TestRpcService.class)
|
||||||
|
.setInstance(service).setBindAddress("0.0.0.0")
|
||||||
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -213,11 +238,14 @@ public class TestNMAuditLogger {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
// Make a client connection and test the audit log
|
// Make a client connection and test the audit log
|
||||||
TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class,
|
TestRpcService proxy = RPC.getProxy(TestRpcService.class,
|
||||||
TestProtocol.versionID, addr, conf);
|
TestProtocol.versionID, addr, conf);
|
||||||
// Start the testcase
|
// Start the testcase
|
||||||
proxy.ping();
|
TestProtos.EmptyRequestProto pingRequest =
|
||||||
|
TestProtos.EmptyRequestProto.newBuilder().build();
|
||||||
|
proxy.ping(null, pingRequest);
|
||||||
|
|
||||||
server.stop();
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,17 +24,27 @@ import static org.mockito.Mockito.when;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.CallerContext;
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
|
import org.apache.hadoop.ipc.ClientId;
|
||||||
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
import org.apache.hadoop.ipc.TestRPC.TestImpl;
|
||||||
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
|
||||||
|
import org.apache.hadoop.ipc.TestRpcBase;
|
||||||
|
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -255,12 +265,19 @@ public class TestRMAuditLogger {
|
||||||
* A special extension of {@link TestImpl} RPC server with
|
* A special extension of {@link TestImpl} RPC server with
|
||||||
* {@link TestImpl#ping()} testing the audit logs.
|
* {@link TestImpl#ping()} testing the audit logs.
|
||||||
*/
|
*/
|
||||||
private class MyTestRPCServer extends TestImpl {
|
private class MyTestRPCServer extends TestRpcBase.PBServerImpl {
|
||||||
@Override
|
@Override
|
||||||
public void ping() {
|
public TestProtos.EmptyResponseProto ping(
|
||||||
|
RpcController unused, TestProtos.EmptyRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
// Ensure clientId is received
|
||||||
|
byte[] clientId = Server.getClientId();
|
||||||
|
Assert.assertNotNull(clientId);
|
||||||
|
Assert.assertEquals(ClientId.BYTE_LENGTH, clientId.length);
|
||||||
// test with ip set
|
// test with ip set
|
||||||
testSuccessLogFormat(true);
|
testSuccessLogFormat(true);
|
||||||
testFailureLogFormat(true);
|
testFailureLogFormat(true);
|
||||||
|
return TestProtos.EmptyResponseProto.newBuilder().build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,20 +287,33 @@ public class TestRMAuditLogger {
|
||||||
@Test
|
@Test
|
||||||
public void testRMAuditLoggerWithIP() throws Exception {
|
public void testRMAuditLoggerWithIP() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
RPC.setProtocolEngine(conf, TestRpcService.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
// Create server side implementation
|
||||||
|
MyTestRPCServer serverImpl = new MyTestRPCServer();
|
||||||
|
BlockingService service = TestRpcServiceProtos.TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
// start the IPC server
|
// start the IPC server
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf)
|
||||||
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
|
.setProtocol(TestRpcService.class)
|
||||||
|
.setInstance(service).setBindAddress("0.0.0.0")
|
||||||
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
// Make a client connection and test the audit log
|
// Make a client connection and test the audit log
|
||||||
TestProtocol proxy = (TestProtocol)RPC.getProxy(TestProtocol.class,
|
TestRpcService proxy = RPC.getProxy(TestRpcService.class,
|
||||||
TestProtocol.versionID, addr, conf);
|
TestProtocol.versionID, addr, conf);
|
||||||
// Start the testcase
|
// Start the testcase
|
||||||
proxy.ping();
|
TestProtos.EmptyRequestProto pingRequest =
|
||||||
|
TestProtos.EmptyRequestProto.newBuilder().build();
|
||||||
|
proxy.ping(null, pingRequest);
|
||||||
|
|
||||||
server.stop();
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue