HADOOP-13218. Migrate other Hadoop side tests to prepare for removing WritableRPCEngine. Contributed by Wei Zhou and Kai Zheng

This commit is contained in:
Kai Zheng 2016-09-07 17:05:33 +08:00
parent f9557127b3
commit 62a9667136
18 changed files with 299 additions and 881 deletions

View File

@ -60,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ThreadLocal<AsyncGet<Message, Exception>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
@ -194,7 +194,8 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (args.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
throw new ServiceException(
"Too many or few parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
@ -26,7 +28,6 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.io.*;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
@ -37,11 +38,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
@ -54,7 +56,6 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@ -87,7 +88,7 @@ public class RPC {
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
public final short value; //TODO make it private
private final short value;
RpcKind(short val) {
this.value = val;
@ -207,7 +208,7 @@ public class RPC {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
ProtobufRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}

View File

@ -236,14 +236,14 @@ public abstract class Server {
static class RpcKindMapValue {
final Class<? extends Writable> rpcRequestWrapperClass;
final RpcInvoker rpcInvoker;
RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) {
this.rpcInvoker = rpcInvoker;
this.rpcRequestWrapperClass = rpcRequestWrapperClass;
}
}
static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
HashMap<RPC.RpcKind, RpcKindMapValue>(4);
static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<>(4);

View File

@ -715,7 +715,7 @@ public class UserGroupInformation {
*
* @param user The principal name to load from the ticket
* cache
* @param ticketCachePath the path to the ticket cache file
* @param ticketCache the path to the ticket cache file
*
* @throws IOException if the kerberos login fails
*/
@ -775,7 +775,7 @@ public class UserGroupInformation {
/**
* Create a UserGroupInformation from a Subject with Kerberos principal.
*
* @param user The KerberosPrincipal to use in UGI
* @param subject The KerberosPrincipal to use in UGI
*
* @throws IOException if the kerberos login fails
*/

View File

@ -17,13 +17,8 @@
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Joiner;
import com.google.protobuf.BlockingService;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@ -34,7 +29,6 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@ -45,8 +39,12 @@ import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Joiner;
import com.google.protobuf.BlockingService;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
/**
* Benchmark for protobuf RPC.
@ -68,7 +66,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
public int secondsToRun = 15;
private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine =
WritableRpcEngine.class;
ProtobufRpcEngine.class;
private MyOptions(String args[]) {
try {
@ -135,7 +133,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
opts.addOption(
OptionBuilder.withLongOpt("engine").hasArg(true)
.withArgName("writable|protobuf")
.withArgName("protobuf")
.withDescription("engine to use")
.create('e'));
@ -184,8 +182,6 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class;
} else if ("writable".equals(eng)) {
rpcEngine = WritableRpcEngine.class;
} else {
throw new ParseException("invalid engine: " + eng);
}
@ -237,11 +233,6 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(opts.host).setPort(opts.getPort())
.setNumHandlers(opts.serverThreads).setVerbose(false).build();
} else if (opts.rpcEngine == WritableRpcEngine.class) {
server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
.setPort(opts.getPort()).setNumHandlers(opts.serverThreads)
.setVerbose(false).build();
} else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
}
@ -399,15 +390,6 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
return responseProto.getMessage();
}
};
} else if (opts.rpcEngine == WritableRpcEngine.class) {
final TestProtocol proxy = RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
return new RpcServiceWrapper() {
@Override
public String doEcho(String msg) throws Exception {
return proxy.echo(msg);
}
};
} else {
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
}

View File

@ -17,252 +17,28 @@
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.Before;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.protobuf.BlockingService;
public class TestMultipleProtocolServer extends TestRpcBase {
private static InetSocketAddress addr;
private static RPC.Server server;
private static Configuration conf = new Configuration();
@ProtocolInfo(protocolName="Foo")
interface Foo0 extends VersionedProtocol {
public static final long versionID = 0L;
String ping() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface Foo1 extends VersionedProtocol {
public static final long versionID = 1L;
String ping() throws IOException;
String ping2() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface FooUnimplemented extends VersionedProtocol {
public static final long versionID = 2L;
String ping() throws IOException;
}
interface Mixin extends VersionedProtocol{
public static final long versionID = 0L;
void hello() throws IOException;
}
interface Bar extends Mixin {
public static final long versionID = 0L;
int echo(int i) throws IOException;
}
class Foo0Impl implements Foo0 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo0.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo0";
}
}
class Foo1Impl implements Foo1 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo1.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo1";
}
@Override
public String ping2() {
return "Foo1";
}
}
class BarImpl implements Bar {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Bar.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public int echo(int i) {
return i;
}
@Override
public void hello() {
}
}
@Before
public void setUp() throws Exception {
// create a server with two handlers
server = new RPC.Builder(conf).setProtocol(Foo0.class)
.setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
// Add Protobuf server
// Create server side implementation
PBServerImpl pbServerImpl = new PBServerImpl();
BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(pbServerImpl);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
service);
server.start();
addr = NetUtils.getConnectAddress(server);
super.setupConf();
server = setupTestServer(conf, 2);
}
@After
public void tearDown() throws Exception {
server.stop();
}
@Test
public void test1() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
Foo0 foo0 = (Foo0)proxy.getProxy();
Assert.assertEquals("Foo0", foo0.ping());
proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
Foo1 foo1 = (Foo1)proxy.getProxy();
Assert.assertEquals("Foo1", foo1.ping());
Assert.assertEquals("Foo1", foo1.ping());
proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
Bar bar = (Bar)proxy.getProxy();
Assert.assertEquals(99, bar.echo(99));
// Now test Mixin class method
Mixin mixin = bar;
mixin.hello();
}
// Server does not implement the FooUnimplemented version of protocol Foo.
// See that calls to it fail.
@Test(expected=IOException.class)
public void testNonExistingProtocol() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
foo.ping();
}
/**
* getProtocolVersion of an unimplemented version should return highest version
* Similarly getProtocolSignature should work.
* @throws IOException
*/
@Test
public void testNonExistingProtocol2() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
Assert.assertEquals(Foo1.versionID,
foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID));
foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID, 0);
}
@Test(expected=IOException.class)
public void testIncorrectServerCreation() throws IOException {
new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
.build();
}
// Now test a PB service - a server hosts both PB and Writable Rpcs.
@Test
public void testPBService() throws Exception {

View File

@ -25,19 +25,6 @@ import org.junit.Test;
public class TestRPCCallBenchmark {
@Test(timeout=20000)
public void testBenchmarkWithWritable() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(),
new String[] {
"--clientThreads", "30",
"--serverThreads", "30",
"--time", "5",
"--serverReaderThreads", "4",
"--messageSize", "1024",
"--engine", "writable"});
assertEquals(0, rc);
}
@Test(timeout=20000)
public void testBenchmarkWithProto() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(),

View File

@ -18,27 +18,19 @@
package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/** Unit test for supporting method-name based compatible RPCs. */
public class TestRPCCompatibility {
@ -49,7 +41,7 @@ public class TestRPCCompatibility {
public static final Log LOG =
LogFactory.getLog(TestRPCCompatibility.class);
private static Configuration conf = new Configuration();
public interface TestProtocol0 extends VersionedProtocol {
@ -120,6 +112,21 @@ public class TestRPCCompatibility {
@Before
public void setUp() {
ProtocolSignature.resetCache();
RPC.setProtocolEngine(conf,
TestProtocol0.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol1.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol2.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol3.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol4.class, ProtobufRpcEngine.class);
}
@After
@ -133,117 +140,7 @@ public class TestRPCCompatibility {
server = null;
}
}
@Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol0.class, TestProtocol0.versionID, addr, conf);
TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
proxy0.ping();
}
@Test // old client vs new server
public void testVersion1ClientVersion0Server() throws Exception {
// create a server with two handlers
server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
.setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol1.class, TestProtocol1.versionID, addr, conf);
TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
proxy1.ping();
try {
proxy1.echo("hello");
fail("Echo should fail");
} catch(IOException e) {
}
}
private class Version2Client {
private TestProtocol2 proxy2;
private ProtocolProxy<TestProtocol2> serverInfo;
private Version2Client() throws IOException {
serverInfo = RPC.getProtocolProxy(
TestProtocol2.class, TestProtocol2.versionID, addr, conf);
proxy2 = serverInfo.getProxy();
}
public int echo(int value) throws IOException, NumberFormatException {
if (serverInfo.isMethodSupported("echo", int.class)) {
System.out.println("echo int is supported");
return -value; // use version 3 echo long
} else { // server is version 2
System.out.println("echo int is NOT supported");
return Integer.parseInt(proxy2.echo(String.valueOf(value)));
}
}
public String echo(String value) throws IOException {
return proxy2.echo(value);
}
public void ping() throws IOException {
proxy2.ping();
}
}
@Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// echo(int) is not supported by server, so returning 3
// This verifies that echo(int) and echo(String)'s hash codes are different
assertEquals(3, client.echo(3));
}
@Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception {
// create a server with two handlers
TestImpl2 impl = new TestImpl2();
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// now that echo(int) is supported by the server, echo(int) should return -3
assertEquals(-3, client.echo(3));
}
public interface TestProtocol3 {
int echo(String value);
int echo(int value);
@ -297,97 +194,4 @@ System.out.println("echo int is NOT supported");
@Override
int echo(int value) throws IOException;
}
@Test
public void testVersionMismatch() throws IOException {
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
TestProtocol4.versionID, addr, conf);
try {
proxy.echo(21);
fail("The call must throw VersionMismatch exception");
} catch (RemoteException ex) {
Assert.assertEquals(RPC.VersionMismatch.class.getName(),
ex.getClassName());
Assert.assertTrue(ex.getErrorCode().equals(
RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
} catch (IOException ex) {
fail("Expected version mismatch but got " + ex);
}
}
@Test
public void testIsMethodSupported() throws IOException {
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
TestProtocol2.versionID, addr, conf);
boolean supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertTrue(supported);
supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertFalse(supported);
}
/**
* Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
* the server registry to extract protocol signatures and versions.
*/
@Test
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(server);
GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RPC.RpcKind.RPC_PROTOCOL_BUFFER));
//No signatures should be found
Assert.assertEquals(0, resp.getProtocolSignatureCount());
resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RPC.RpcKind.RPC_WRITABLE));
Assert.assertEquals(1, resp.getProtocolSignatureCount());
ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
boolean found = false;
int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
.getMethod("echo", String.class));
for (int m : sig.getMethodsList()) {
if (expected == m) {
found = true;
break;
}
}
Assert.assertTrue(found);
}
private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
Class<?> protocol, RPC.RpcKind rpcKind) {
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
return builder.build();
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -30,11 +28,13 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
/**
* tests that the proxy can be interrupted
*/
public class TestRPCWaitForProxy extends Assert {
private static final String ADDRESS = "0.0.0.0";
public class TestRPCWaitForProxy extends TestRpcBase {
private static final Logger
LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
@ -46,14 +46,15 @@ public class TestRPCWaitForProxy extends Assert {
*
* @throws Throwable any exception other than that which was expected
*/
@Test(timeout = 10000)
@Test(timeout = 50000)
public void testWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(0);
worker.start();
worker.join();
Throwable caught = worker.getCaught();
assertNotNull("No exception was raised", caught);
if (!(caught instanceof ConnectException)) {
Throwable cause = caught.getCause();
Assert.assertNotNull("No exception was raised", cause);
if (!(cause instanceof ConnectException)) {
throw caught;
}
}
@ -69,11 +70,11 @@ public class TestRPCWaitForProxy extends Assert {
RpcThread worker = new RpcThread(100);
worker.start();
Thread.sleep(1000);
assertTrue("worker hasn't started", worker.waitStarted);
Assert.assertTrue("worker hasn't started", worker.waitStarted);
worker.interrupt();
worker.join();
Throwable caught = worker.getCaught();
assertNotNull("No exception was raised", caught);
Assert.assertNotNull("No exception was raised", caught);
// looking for the root cause here, which can be wrapped
// as part of the NetUtils work. Having this test look
// a the type of exception there would be brittle to improvements
@ -82,6 +83,8 @@ public class TestRPCWaitForProxy extends Assert {
if (cause == null) {
// no inner cause, use outer exception as root cause.
cause = caught;
} else if (cause.getCause() != null) {
cause = cause.getCause();
}
if (!(cause instanceof InterruptedIOException)
&& !(cause instanceof ClosedByInterruptException)) {
@ -112,12 +115,16 @@ public class TestRPCWaitForProxy extends Assert {
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
connectRetries);
waitStarted = true;
TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
TestProtocol.versionID,
new InetSocketAddress(ADDRESS, 20),
config,
15000L);
proxy.echo("");
short invalidPort = 20;
InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
invalidPort);
TestRpcBase.TestRpcService proxy = RPC.getProxy(
TestRpcBase.TestRpcService.class,
1L, invalidAddress, conf);
// Test echo method
proxy.echo(null, newEchoRequest("hello"));
} catch (Throwable throwable) {
caught = throwable;
}

View File

@ -112,7 +112,8 @@ public class TestRpcBase {
return setupTestServer(builder);
}
protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
protected static RPC.Server setupTestServer(
RPC.Builder builder) throws IOException {
RPC.Server server = builder.build();
server.start();
@ -175,17 +176,21 @@ public class TestRpcBase {
public TestTokenIdentifier() {
this(new Text(), new Text());
}
public TestTokenIdentifier(Text tokenid) {
this(tokenid, new Text());
}
public TestTokenIdentifier(Text tokenid, Text realUser) {
this.tokenid = tokenid == null ? new Text() : tokenid;
this.realUser = realUser == null ? new Text() : realUser;
}
@Override
public Text getKind() {
return KIND_NAME;
}
@Override
public UserGroupInformation getUser() {
if (realUser.toString().isEmpty()) {
@ -203,6 +208,7 @@ public class TestRpcBase {
tokenid.readFields(in);
realUser.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
tokenid.write(out);
@ -234,7 +240,7 @@ public class TestRpcBase {
@SuppressWarnings("unchecked")
@Override
public Token<TestTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
@ -388,19 +394,17 @@ public class TestRpcBase {
}
@Override
public TestProtos.AuthUserResponseProto getAuthUser(
public TestProtos.UserResponseProto getAuthUser(
RpcController controller, TestProtos.EmptyRequestProto request)
throws ServiceException {
UserGroupInformation authUser = null;
UserGroupInformation authUser;
try {
authUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new ServiceException(e);
}
return TestProtos.AuthUserResponseProto.newBuilder()
.setAuthUser(authUser.getUserName())
.build();
return newUserResponse(authUser.getUserName());
}
@Override
@ -432,6 +436,34 @@ public class TestRpcBase {
return TestProtos.EmptyResponseProto.newBuilder().build();
}
@Override
public TestProtos.UserResponseProto getCurrentUser(
RpcController controller,
TestProtos.EmptyRequestProto request) throws ServiceException {
String user;
try {
user = UserGroupInformation.getCurrentUser().toString();
} catch (IOException e) {
throw new ServiceException("Failed to get current user", e);
}
return newUserResponse(user);
}
@Override
public TestProtos.UserResponseProto getServerRemoteUser(
RpcController controller,
TestProtos.EmptyRequestProto request) throws ServiceException {
String serverRemoteUser = Server.getRemoteUser().toString();
return newUserResponse(serverRemoteUser);
}
private TestProtos.UserResponseProto newUserResponse(String user) {
return TestProtos.UserResponseProto.newBuilder()
.setUser(user)
.build();
}
}
protected static TestProtos.EmptyRequestProto newEmptyRequest() {
@ -478,8 +510,4 @@ public class TestRpcBase {
}
return null;
}
protected static String convert(TestProtos.AuthUserResponseProto response) {
return response.getAuthUser();
}
}

View File

@ -45,30 +45,55 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import javax.security.auth.callback.*;
import javax.security.sasl.*;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*;
import static org.junit.Assert.*;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Unit tests for using Sasl over RPC. */
@RunWith(Parameterized.class)
public class TestSaslRPC extends TestRpcBase {
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
Collection<Object[]> params = new ArrayList<>();
for (QualityOfProtection qop : QualityOfProtection.values()) {
params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null });
}
@ -114,7 +139,7 @@ public class TestSaslRPC extends TestRpcBase {
NONE(),
VALID(),
INVALID(),
OTHER();
OTHER()
}
@BeforeClass
@ -230,7 +255,7 @@ public class TestSaslRPC extends TestRpcBase {
final Server server = setupTestServer(conf, 5, sm);
doDigestRpc(server, sm);
} finally {
SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
SecurityUtil.setSecurityInfoProviders();
}
}
@ -259,7 +284,7 @@ public class TestSaslRPC extends TestRpcBase {
addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()));
Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId, sm);
Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
SecurityUtil.setTokenService(token, addr);
current.addToken(token);
@ -296,8 +321,8 @@ public class TestSaslRPC extends TestRpcBase {
// set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
TestRpcService.class, null, 0, null, newConf);
ConnectionId remoteId = ConnectionId.getConnectionId(
new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval());
// set doPing to false
@ -806,13 +831,13 @@ public class TestSaslRPC extends TestRpcBase {
final TestTokenSecretManager sm = new TestTokenSecretManager();
boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) {
useSecretManager &= enableSecretManager.booleanValue();
useSecretManager &= enableSecretManager;
}
if (forceSecretManager != null) {
useSecretManager |= forceSecretManager.booleanValue();
useSecretManager |= forceSecretManager;
}
final SecretManager<?> serverSm = useSecretManager ? sm : null;
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
@Override
public Server run() throws IOException {
@ -867,13 +892,13 @@ public class TestSaslRPC extends TestRpcBase {
proxy.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(),
convert(proxy.getAuthUser(null, newEmptyRequest())));
proxy.getAuthUser(null, newEmptyRequest()).getUser());
AuthMethod authMethod =
convert(proxy.getAuthMethod(null, newEmptyRequest()));
// verify sasl completed with correct QOP
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
RPC.getConnectionIdForProxy(proxy).getSaslQop());
return authMethod.toString();
RPC.getConnectionIdForProxy(proxy).getSaslQop());
return authMethod != null ? authMethod.toString() : null;
} catch (ServiceException se) {
if (se.getCause() instanceof RemoteException) {
throw (RemoteException) se.getCause();
@ -898,21 +923,18 @@ public class TestSaslRPC extends TestRpcBase {
String actual) {
assertEquals(expect.toString(), actual);
}
private static void assertAuthEquals(Pattern expect,
String actual) {
private static void assertAuthEquals(Pattern expect, String actual) {
// this allows us to see the regexp and the value it didn't match
if (!expect.matcher(actual).matches()) {
assertEquals(expect, actual); // it failed
} else {
assertTrue(true); // it matched
fail(); // it failed
}
}
/*
* Class used to test overriding QOP values using SaslPropertiesResolver
*/
static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{
static class AuthSaslPropertiesResolver extends SaslPropertiesResolver {
@Override
public Map<String, String> getServerProperties(InetAddress address) {
@ -921,7 +943,7 @@ public class TestSaslRPC extends TestRpcBase {
return newPropertes;
}
}
public static void main(String[] args) throws Exception {
System.out.println("Testing Kerberos authentication over RPC");
if (args.length != 2) {

View File

@ -17,40 +17,35 @@
*/
package org.apache.hadoop.security;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
/**
*
* Test do as effective user.
*/
public class TestDoAsEffectiveUser {
public class TestDoAsEffectiveUser extends TestRpcBase {
final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG";
final private static String REAL_USER_SHORT_NAME = "realUser1";
final private static String PROXY_USER_NAME = "proxyUser";
@ -58,8 +53,8 @@ public class TestDoAsEffectiveUser {
final private static String GROUP2_NAME = "group2";
final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME,
GROUP2_NAME };
private static final String ADDRESS = "0.0.0.0";
private TestProtocol proxy;
private TestRpcService client;
private static final Configuration masterConf = new Configuration();
@ -82,7 +77,7 @@ public class TestDoAsEffectiveUser {
private void configureSuperUserIPAddresses(Configuration conf,
String superUserShortName) throws IOException {
ArrayList<String> ipList = new ArrayList<String>();
ArrayList<String> ipList = new ArrayList<>();
Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
.getNetworkInterfaces();
while (netInterfaceList.hasMoreElements()) {
@ -130,50 +125,19 @@ public class TestDoAsEffectiveUser {
curUGI.toString());
}
@TokenInfo(TestTokenSelector.class)
public interface TestProtocol extends VersionedProtocol {
public static final long versionID = 1L;
String aMethod() throws IOException;
String getServerRemoteUser() throws IOException;
}
public class TestImpl implements TestProtocol {
@Override
public String aMethod() throws IOException {
return UserGroupInformation.getCurrentUser().toString();
}
@Override
public String getServerRemoteUser() throws IOException {
return Server.getRemoteUser().toString();
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return TestProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(TestProtocol.versionID, null);
}
}
private void checkRemoteUgi(final Server server,
final UserGroupInformation ugi, final Configuration conf)
throws Exception {
private void checkRemoteUgi(final UserGroupInformation ugi,
final Configuration conf) throws Exception {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
proxy = RPC.getProxy(
TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
Assert.assertEquals(ugi.toString(), proxy.aMethod());
Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser());
public Void run() throws ServiceException {
client = getClient(addr, conf);
String currentUser = client.getCurrentUser(null,
newEmptyRequest()).getUser();
String serverRemoteUser = client.getServerRemoteUser(null,
newEmptyRequest()).getUser();
Assert.assertEquals(ugi.toString(), currentUser);
Assert.assertEquals(ugi.toString(), serverRemoteUser);
return null;
}
});
@ -185,29 +149,27 @@ public class TestDoAsEffectiveUser {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
refreshConf(conf);
try {
server.start();
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
checkRemoteUgi(server, realUserUgi, conf);
checkRemoteUgi(realUserUgi, conf);
UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
UserGroupInformation proxyUserUgi =
UserGroupInformation.createProxyUserForTesting(
PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
checkRemoteUgi(server, proxyUserUgi, conf);
checkRemoteUgi(proxyUserUgi, conf);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -218,29 +180,25 @@ public class TestDoAsEffectiveUser {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
refreshConf(conf);
try {
server.start();
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
checkRemoteUgi(server, realUserUgi, conf);
checkRemoteUgi(realUserUgi, conf);
UserGroupInformation proxyUserUgi = UserGroupInformation
.createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
checkRemoteUgi(server, proxyUserUgi, conf);
checkRemoteUgi(proxyUserUgi, conf);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -256,17 +214,14 @@ public class TestDoAsEffectiveUser {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
refreshConf(conf);
try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@ -275,11 +230,10 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {
proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
public String run() throws ServiceException {
client = getClient(addr, conf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
}
});
@ -287,10 +241,7 @@ public class TestDoAsEffectiveUser {
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -299,17 +250,14 @@ public class TestDoAsEffectiveUser {
final Configuration conf = new Configuration();
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
refreshConf(conf);
try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@ -318,11 +266,10 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {
proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
public String run() throws ServiceException {
client = getClient(addr, conf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
}
});
@ -330,10 +277,7 @@ public class TestDoAsEffectiveUser {
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -341,15 +285,12 @@ public class TestDoAsEffectiveUser {
public void testRealUserGroupNotSpecified() throws IOException {
final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@ -358,11 +299,10 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
public String run() throws ServiceException {
client = getClient(addr, conf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
}
});
@ -370,10 +310,7 @@ public class TestDoAsEffectiveUser {
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -384,17 +321,14 @@ public class TestDoAsEffectiveUser {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3");
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 2);
refreshConf(conf);
try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@ -403,11 +337,10 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws IOException {
proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
public String run() throws ServiceException {
client = getClient(addr, conf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
}
});
@ -415,10 +348,7 @@ public class TestDoAsEffectiveUser {
} catch (Exception e) {
e.printStackTrace();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
@ -432,20 +362,17 @@ public class TestDoAsEffectiveUser {
final Configuration conf = new Configuration(masterConf);
TestTokenSecretManager sm = new TestTokenSecretManager();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
server.start();
final Server server = setupTestServer(conf, 5, sm);
final UserGroupInformation current = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser"));
Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
Token<TestTokenIdentifier> token = new Token<>(tokenId,
sm);
SecurityUtil.setTokenService(token, addr);
UserGroupInformation proxyUserUgi = UserGroupInformation
@ -453,23 +380,19 @@ public class TestDoAsEffectiveUser {
proxyUserUgi.addToken(token);
refreshConf(conf);
String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
client = getClient(addr, conf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
});
@ -486,42 +409,34 @@ public class TestDoAsEffectiveUser {
TestTokenSecretManager sm = new TestTokenSecretManager();
final Configuration newConf = new Configuration(masterConf);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(newConf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(newConf);
final Server server = new RPC.Builder(newConf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start();
final Server server = setupTestServer(newConf, 5, sm);
final UserGroupInformation current = UserGroupInformation
.createUserForTesting(REAL_USER_NAME, GROUP_NAMES);
refreshConf(newConf);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser"));
Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
sm);
Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
SecurityUtil.setTokenService(token, addr);
current.addToken(token);
String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, newConf);
String ret = proxy.aMethod();
return ret;
client = getClient(addr, newConf);
return client.getCurrentUser(null,
newEmptyRequest()).getUser();
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
stop(server, client);
}
}
});

View File

@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -28,7 +29,11 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
@ -49,9 +54,22 @@ import java.util.Set;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
import static org.apache.hadoop.ipc.TestSaslRPC.*;
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.junit.Assert.*;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -108,7 +126,7 @@ public class TestUserGroupInformation {
UserGroupInformation.setLoginUser(null);
}
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testSimpleLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true);
}

View File

@ -88,6 +88,6 @@ message AuthMethodResponseProto {
required string mechanismName = 2;
}
message AuthUserResponseProto {
required string authUser = 1;
message UserResponseProto {
required string user = 1;
}

View File

@ -40,9 +40,11 @@ service TestProtobufRpcProto {
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
}
service TestProtobufRpc2Proto {

View File

@ -168,7 +168,6 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node;
@ -317,8 +316,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {

View File

@ -1,119 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.security;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
/** Unit tests for using Delegation Token over RPC. */
public class TestClientProtocolWithDelegationToken {
private static final String ADDRESS = "0.0.0.0";
public static final Log LOG = LogFactory
.getLog(TestClientProtocolWithDelegationToken.class);
private static final Configuration conf;
static {
conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
}
static {
GenericTestUtils.setLogLevel(Client.LOG, Level.ALL);
GenericTestUtils.setLogLevel(Server.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
}
@Test
public void testDelegationTokenRpc() throws Exception {
ClientProtocol mockNN = mock(ClientProtocol.class);
FSNamesystem mockNameSys = mock(FSNamesystem.class);
DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
3600000, mockNameSys);
sm.startThreads();
final Server server = new RPC.Builder(conf)
.setProtocol(ClientProtocol.class).setInstance(mockNN)
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
String user = current.getUserName();
Text owner = new Text(user);
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
dtId, sm);
SecurityUtil.setTokenService(token, addr);
LOG.info("Service for token is " + token.getService());
current.addToken(token);
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
ClientProtocol proxy = null;
try {
proxy = RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, addr, conf);
proxy.getServerDefaults();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
}
return null;
}
});
}
}

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@ -98,8 +97,6 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
.newReflectiveBlockingService(refreshHSAdminProtocolXlator);
WritableRpcEngine.ensureInitialized();
clientRpcAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.JHS_ADMIN_ADDRESS,