From 93d8a7f2a2d72a1719d02b1ed90678397900b6ed Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Wed, 1 Jun 2016 08:41:15 +0800 Subject: [PATCH] Revert "HADOOP-12579. Deprecate and remove WriteableRPCEngine. Contributed by Kai Zheng" This reverts commit a6c79f92d503c664f2d109355b719124f29a30e5. --- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 21 +- .../java/org/apache/hadoop/ipc/Server.java | 4 +- .../apache/hadoop/ipc/WritableRpcEngine.java | 564 ++++++++++++++++++ .../hadoop/security/UserGroupInformation.java | 4 +- .../org/apache/hadoop/util/ProtoUtil.java | 2 + .../src/main/proto/RpcHeader.proto | 2 +- .../apache/hadoop/ipc/RPCCallBenchmark.java | 38 +- .../ipc/TestMultipleProtocolServer.java | 238 +++++++- .../hadoop/ipc/TestRPCCallBenchmark.java | 13 + .../hadoop/ipc/TestRPCCompatibility.java | 244 +++++++- .../hadoop/ipc/TestRPCWaitForProxy.java | 37 +- .../org/apache/hadoop/ipc/TestRpcBase.java | 50 +- .../org/apache/hadoop/ipc/TestSaslRPC.java | 91 +-- .../security/TestDoAsEffectiveUser.java | 299 ++++++---- .../security/TestUserGroupInformation.java | 28 +- .../hadoop-common/src/test/proto/test.proto | 4 +- .../src/test/proto/test_rpc_service.proto | 4 +- .../server/namenode/NameNodeRpcServer.java | 3 + ...TestClientProtocolWithDelegationToken.java | 119 ++++ .../mapreduce/v2/hs/server/HSAdminServer.java | 3 + 21 files changed, 1454 insertions(+), 319 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 7c11e22e577..0f43fc6d3d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -67,7 +67,7 @@ public class ProtobufRpcEngine implements RpcEngine { private static final ThreadLocal> ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); - static { // Register the rpcRequest deserializer for ProtobufRpcEngine + static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, new Server.ProtoBufRpcInvoker()); @@ -201,8 +201,7 @@ public class ProtobufRpcEngine implements RpcEngine { } if (args.length != 2) { // RpcController + Message - throw new ServiceException( - "Too many or few parameters for request. Method: [" + throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + args.length); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index a544f2fc667..3f68d6334c3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ipc; -import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; @@ -28,6 +26,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; +import java.io.*; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -38,12 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.*; + import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.*; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; @@ -56,6 +54,7 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -85,10 +84,10 @@ public class RPC { final static int RPC_SERVICE_CLASS_DEFAULT = 0; public enum RpcKind { RPC_BUILTIN ((short) 1), // Used for built in calls by tests - // 2 for WritableRpcEngine, obsolete and removed + RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - private final short value; + public final short value; //TODO make it private RpcKind(short val) { this.value = val; @@ -208,7 +207,7 @@ public class RPC { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), - ProtobufRpcEngine.class); + WritableRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); PROTOCOL_ENGINES.put(protocol, engine); } @@ -950,10 +949,10 @@ public class RPC { return new VerProtocolImpl(highestVersion, highest); } - protected Server(String bindAddress, int port, + protected Server(String bindAddress, int port, Class paramClass, int handlerCount, int numReaders, int queueSizePerHandler, - Configuration conf, String serverName, + Configuration conf, String serverName, SecretManager secretManager, String portRangeConfig) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index be46e765b13..88c1f3c8267 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -243,14 +243,14 @@ public abstract class Server { static class RpcKindMapValue { final Class rpcRequestWrapperClass; final RpcInvoker rpcInvoker; - RpcKindMapValue (Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { this.rpcInvoker = rpcInvoker; this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - static Map rpcKindMap = new HashMap<>(4); + static Map rpcKindMap = new + HashMap(4); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java new file mode 100644 index 00000000000..a9dbb41fd98 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -0,0 +1,564 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.lang.reflect.Proxy; +import java.lang.reflect.Method; +import java.lang.reflect.InvocationTargetException; + +import java.net.InetSocketAddress; +import java.io.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RPC.RpcInvoker; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.*; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +/** An RpcEngine implementation for Writable data. */ +@InterfaceStability.Evolving +public class WritableRpcEngine implements RpcEngine { + private static final Log LOG = LogFactory.getLog(RPC.class); + + //writableRpcVersion should be updated if there is a change + //in format of the rpc messages. + + // 2L - added declared class to Invocation + public static final long writableRpcVersion = 2L; + + /** + * Whether or not this class has been initialized. + */ + private static boolean isInitialized = false; + + static { + ensureInitialized(); + } + + /** + * Initialize this class if it isn't already. + */ + public static synchronized void ensureInitialized() { + if (!isInitialized) { + initialize(); + } + } + + /** + * Register the rpcRequest deserializer for WritableRpcEngine + */ + private static synchronized void initialize() { + org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE, + Invocation.class, new Server.WritableRpcInvoker()); + isInitialized = true; + } + + + /** A method invocation, including the method name and its parameters.*/ + private static class Invocation implements Writable, Configurable { + private String methodName; + private Class[] parameterClasses; + private Object[] parameters; + private Configuration conf; + private long clientVersion; + private int clientMethodsHash; + private String declaringClassProtocolName; + + //This could be different from static writableRpcVersion when received + //at server, if client is using a different version. + private long rpcVersion; + + @SuppressWarnings("unused") // called when deserializing an invocation + public Invocation() {} + + public Invocation(Method method, Object[] parameters) { + this.methodName = method.getName(); + this.parameterClasses = method.getParameterTypes(); + this.parameters = parameters; + rpcVersion = writableRpcVersion; + if (method.getDeclaringClass().equals(VersionedProtocol.class)) { + //VersionedProtocol is exempted from version check. + clientVersion = 0; + clientMethodsHash = 0; + } else { + this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); + this.clientMethodsHash = ProtocolSignature.getFingerprint(method + .getDeclaringClass().getMethods()); + } + this.declaringClassProtocolName = + RPC.getProtocolName(method.getDeclaringClass()); + } + + /** The name of the method invoked. */ + public String getMethodName() { return methodName; } + + /** The parameter classes. */ + public Class[] getParameterClasses() { return parameterClasses; } + + /** The parameter instances. */ + public Object[] getParameters() { return parameters; } + + private long getProtocolVersion() { + return clientVersion; + } + + @SuppressWarnings("unused") + private int getClientMethodsHash() { + return clientMethodsHash; + } + + /** + * Returns the rpc version used by the client. + * @return rpcVersion + */ + public long getRpcVersion() { + return rpcVersion; + } + + @Override + @SuppressWarnings("deprecation") + public void readFields(DataInput in) throws IOException { + rpcVersion = in.readLong(); + declaringClassProtocolName = UTF8.readString(in); + methodName = UTF8.readString(in); + clientVersion = in.readLong(); + clientMethodsHash = in.readInt(); + parameters = new Object[in.readInt()]; + parameterClasses = new Class[parameters.length]; + ObjectWritable objectWritable = new ObjectWritable(); + for (int i = 0; i < parameters.length; i++) { + parameters[i] = + ObjectWritable.readObject(in, objectWritable, this.conf); + parameterClasses[i] = objectWritable.getDeclaredClass(); + } + } + + @Override + @SuppressWarnings("deprecation") + public void write(DataOutput out) throws IOException { + out.writeLong(rpcVersion); + UTF8.writeString(out, declaringClassProtocolName); + UTF8.writeString(out, methodName); + out.writeLong(clientVersion); + out.writeInt(clientMethodsHash); + out.writeInt(parameterClasses.length); + for (int i = 0; i < parameterClasses.length; i++) { + ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], + conf, true); + } + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(methodName); + buffer.append("("); + for (int i = 0; i < parameters.length; i++) { + if (i != 0) + buffer.append(", "); + buffer.append(parameters[i]); + } + buffer.append(")"); + buffer.append(", rpc version="+rpcVersion); + buffer.append(", client version="+clientVersion); + buffer.append(", methodsFingerPrint="+clientMethodsHash); + return buffer.toString(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + } + + private static ClientCache CLIENTS=new ClientCache(); + + private static class Invoker implements RpcInvocationHandler { + private Client.ConnectionId remoteId; + private Client client; + private boolean isClosed = false; + private final AtomicBoolean fallbackToSimpleAuth; + + public Invoker(Class protocol, + InetSocketAddress address, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, + int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) + throws IOException { + this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, + ticket, rpcTimeout, null, conf); + this.client = CLIENTS.getClient(conf, factory); + this.fallbackToSimpleAuth = fallbackToSimpleAuth; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + long startTime = 0; + if (LOG.isDebugEnabled()) { + startTime = Time.now(); + } + + // if Tracing is on then start a new span for this rpc. + // guard it in the if statement to make sure there isn't + // any extra string manipulation. + Tracer tracer = Tracer.curThreadTracer(); + TraceScope traceScope = null; + if (tracer != null) { + traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method)); + } + ObjectWritable value; + try { + value = (ObjectWritable) + client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), + remoteId, fallbackToSimpleAuth); + } finally { + if (traceScope != null) traceScope.close(); + } + if (LOG.isDebugEnabled()) { + long callTime = Time.now() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + } + return value.get(); + } + + /* close the IPC client that's responsible for this invoker's RPCs */ + @Override + synchronized public void close() { + if (!isClosed) { + isClosed = true; + CLIENTS.stopClient(client); + } + } + + @Override + public ConnectionId getConnectionId() { + return remoteId; + } + } + + // for unit testing only + @InterfaceAudience.Private + @InterfaceStability.Unstable + static Client getClient(Configuration conf) { + return CLIENTS.getClient(conf); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. + * @param */ + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, + int rpcTimeout, RetryPolicy connectionRetryPolicy) + throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy, null); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. + * @param */ + @Override + @SuppressWarnings("unchecked") + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, + int rpcTimeout, RetryPolicy connectionRetryPolicy, + AtomicBoolean fallbackToSimpleAuth) + throws IOException { + + if (connectionRetryPolicy != null) { + throw new UnsupportedOperationException( + "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); + } + + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, + factory, rpcTimeout, fallbackToSimpleAuth)); + return new ProtocolProxy(protocol, proxy, true); + } + + /* Construct a server for a protocol implementation instance listening on a + * port and address. */ + @Override + public RPC.Server getServer(Class protocolClass, + Object protocolImpl, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, + SecretManager secretManager, + String portRangeConfig) + throws IOException { + return new Server(protocolClass, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, + portRangeConfig); + } + + + /** An RPC Server. */ + public static class Server extends RPC.Server { + /** + * Construct an RPC server. + * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * + * @deprecated Use #Server(Class, Object, Configuration, String, int) + */ + @Deprecated + public Server(Object instance, Configuration conf, String bindAddress, + int port) throws IOException { + this(null, instance, conf, bindAddress, port); + } + + + /** Construct an RPC server. + * @param protocolClass class + * @param protocolImpl the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + */ + public Server(Class protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port) + throws IOException { + this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, + false, null, null); + } + + /** + * Construct an RPC server. + * @param protocolImpl the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * + * @deprecated use Server#Server(Class, Object, + * Configuration, String, int, int, int, int, boolean, SecretManager) + */ + @Deprecated + public Server(Object protocolImpl, Configuration conf, String bindAddress, + int port, int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager secretManager) + throws IOException { + this(null, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, + secretManager, null); + + } + + /** + * Construct an RPC server. + * @param protocolClass - the protocol being registered + * can be null for compatibility with old usage (see below for details) + * @param protocolImpl the protocol impl that will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + */ + public Server(Class protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager secretManager, + String portRangeConfig) + throws IOException { + super(bindAddress, port, null, numHandlers, numReaders, + queueSizePerHandler, conf, + classNameBase(protocolImpl.getClass().getName()), secretManager, + portRangeConfig); + + this.verbose = verbose; + + + Class[] protocols; + if (protocolClass == null) { // derive protocol from impl + /* + * In order to remain compatible with the old usage where a single + * target protocolImpl is suppled for all protocol interfaces, and + * the protocolImpl is derived from the protocolClass(es) + * we register all interfaces extended by the protocolImpl + */ + protocols = RPC.getProtocolInterfaces(protocolImpl.getClass()); + + } else { + if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { + throw new IOException("protocolClass "+ protocolClass + + " is not implemented by protocolImpl which is of class " + + protocolImpl.getClass()); + } + // register protocol class and its super interfaces + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); + protocols = RPC.getProtocolInterfaces(protocolClass); + } + for (Class p : protocols) { + if (!p.equals(VersionedProtocol.class)) { + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl); + } + } + + } + + private static void log(String value) { + if (value!= null && value.length() > 55) + value = value.substring(0, 55)+"..."; + LOG.info(value); + } + + static class WritableRpcInvoker implements RpcInvoker { + + @Override + public Writable call(org.apache.hadoop.ipc.RPC.Server server, + String protocolName, Writable rpcRequest, long receivedTime) + throws IOException, RPC.VersionMismatch { + + Invocation call = (Invocation)rpcRequest; + if (server.verbose) log("Call: " + call); + + // Verify writable rpc version + if (call.getRpcVersion() != writableRpcVersion) { + // Client is using a different version of WritableRpc + throw new RpcServerException( + "WritableRpc version mismatch, client side version=" + + call.getRpcVersion() + ", server side version=" + + writableRpcVersion); + } + + long clientVersion = call.getProtocolVersion(); + final String protoName; + ProtoClassProtoImpl protocolImpl; + if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { + // VersionProtocol methods are often used by client to figure out + // which version of protocol to use. + // + // Versioned protocol methods should go the protocolName protocol + // rather than the declaring class of the method since the + // the declaring class is VersionedProtocol which is not + // registered directly. + // Send the call to the highest protocol version + VerProtocolImpl highest = server.getHighestSupportedProtocol( + RPC.RpcKind.RPC_WRITABLE, protocolName); + if (highest == null) { + throw new RpcServerException("Unknown protocol: " + protocolName); + } + protocolImpl = highest.protocolTarget; + } else { + protoName = call.declaringClassProtocolName; + + // Find the right impl for the protocol based on client version. + ProtoNameVer pv = + new ProtoNameVer(call.declaringClassProtocolName, clientVersion); + protocolImpl = + server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv); + if (protocolImpl == null) { // no match for Protocol AND Version + VerProtocolImpl highest = + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, + protoName); + if (highest == null) { + throw new RpcServerException("Unknown protocol: " + protoName); + } else { // protocol supported but not the version that client wants + throw new RPC.VersionMismatch(protoName, clientVersion, + highest.version); + } + } + } + + // Invoke the protocol method + long startTime = Time.now(); + int qTime = (int) (startTime-receivedTime); + Exception exception = null; + try { + Method method = + protocolImpl.protocolClass.getMethod(call.getMethodName(), + call.getParameterClasses()); + method.setAccessible(true); + server.rpcDetailedMetrics.init(protocolImpl.protocolClass); + Object value = + method.invoke(protocolImpl.protocolImpl, call.getParameters()); + if (server.verbose) log("Return: "+value); + return new ObjectWritable(method.getReturnType(), value); + + } catch (InvocationTargetException e) { + Throwable target = e.getTargetException(); + if (target instanceof IOException) { + exception = (IOException)target; + throw (IOException)target; + } else { + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + exception = ioe; + throw ioe; + } + } catch (Throwable e) { + if (!(e instanceof IOException)) { + LOG.error("Unexpected throwable object ", e); + } + IOException ioe = new IOException(e.toString()); + ioe.setStackTrace(e.getStackTrace()); + exception = ioe; + throw ioe; + } finally { + int processingTime = (int) (Time.now() - startTime); + if (LOG.isDebugEnabled()) { + String msg = "Served: " + call.getMethodName() + + " queueTime= " + qTime + " procesingTime= " + processingTime; + if (exception != null) { + msg += " exception= " + exception.getClass().getSimpleName(); + } + LOG.debug(msg); + } + String detailedMetricsName = (exception == null) ? + call.getMethodName() : + exception.getClass().getSimpleName(); + server.updateMetrics(detailedMetricsName, qTime, processingTime); + } + } + } + } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index aa334f3481d..798aa01f29d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -689,7 +689,7 @@ public class UserGroupInformation { * * @param user The principal name to load from the ticket * cache - * @param ticketCache the path to the ticket cache file + * @param ticketCachePath the path to the ticket cache file * * @throws IOException if the kerberos login fails */ @@ -749,7 +749,7 @@ public class UserGroupInformation { /** * Create a UserGroupInformation from a Subject with Kerberos principal. * - * @param subject The KerberosPrincipal to use in UGI + * @param user The KerberosPrincipal to use in UGI * * @throws IOException if the kerberos login fails */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 04e14e8bd4f..1a5acbab6ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -146,6 +146,7 @@ public abstract class ProtoUtil { static RpcKindProto convert(RPC.RpcKind kind) { switch (kind) { case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; + case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE; case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; } return null; @@ -155,6 +156,7 @@ public abstract class ProtoUtil { public static RPC.RpcKind convert( RpcKindProto kind) { switch (kind) { case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; + case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE; case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; } return null; diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index f1a36aef74e..aa146162896 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -44,10 +44,10 @@ package hadoop.common; /** * RpcKind determine the rpcEngine and the serialization of the rpc request - * Note: 1 for RPC_WRITABLE, WritableRpcEngine, obsolete and removed */ enum RpcKindProto { RPC_BUILTIN = 0; // Used for built in calls by tests + RPC_WRITABLE = 1; // Use WritableRpcEngine RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java index 9356dabe2f7..eb7b9497092 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.ipc; -import com.google.common.base.Joiner; -import com.google.protobuf.BlockingService; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -29,6 +34,7 @@ import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -39,12 +45,8 @@ import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Joiner; +import com.google.protobuf.BlockingService; /** * Benchmark for protobuf RPC. @@ -66,7 +68,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool { public int secondsToRun = 15; private int msgSize = 1024; public Class rpcEngine = - ProtobufRpcEngine.class; + WritableRpcEngine.class; private MyOptions(String args[]) { try { @@ -133,7 +135,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool { opts.addOption( OptionBuilder.withLongOpt("engine").hasArg(true) - .withArgName("protobuf") + .withArgName("writable|protobuf") .withDescription("engine to use") .create('e')); @@ -182,6 +184,8 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool { String eng = line.getOptionValue('e'); if ("protobuf".equals(eng)) { rpcEngine = ProtobufRpcEngine.class; + } else if ("writable".equals(eng)) { + rpcEngine = WritableRpcEngine.class; } else { throw new ParseException("invalid engine: " + eng); } @@ -233,6 +237,11 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool { server = new RPC.Builder(conf).setProtocol(TestRpcService.class) .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort()) .setNumHandlers(opts.serverThreads).setVerbose(false).build(); + } else if (opts.rpcEngine == WritableRpcEngine.class) { + server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host) + .setPort(opts.getPort()).setNumHandlers(opts.serverThreads) + .setVerbose(false).build(); } else { throw new RuntimeException("Bad engine: " + opts.rpcEngine); } @@ -390,6 +399,15 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool { return responseProto.getMessage(); } }; + } else if (opts.rpcEngine == WritableRpcEngine.class) { + final TestProtocol proxy = RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, addr, conf); + return new RpcServiceWrapper() { + @Override + public String doEcho(String msg) throws Exception { + return proxy.echo(msg); + } + }; } else { throw new RuntimeException("unsupported engine: " + opts.rpcEngine); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index 10e23baefef..8b419e36d4a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -17,28 +17,252 @@ */ package org.apache.hadoop.ipc; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.junit.Assert; + import org.apache.hadoop.conf.Configuration; -import org.junit.After; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.net.NetUtils; import org.junit.Before; +import org.junit.After; import org.junit.Test; +import com.google.protobuf.BlockingService; public class TestMultipleProtocolServer extends TestRpcBase { - + private static InetSocketAddress addr; private static RPC.Server server; - @Before - public void setUp() throws Exception { - super.setupConf(); - - server = setupTestServer(conf, 2); + private static Configuration conf = new Configuration(); + + + @ProtocolInfo(protocolName="Foo") + interface Foo0 extends VersionedProtocol { + public static final long versionID = 0L; + String ping() throws IOException; + + } + + @ProtocolInfo(protocolName="Foo") + interface Foo1 extends VersionedProtocol { + public static final long versionID = 1L; + String ping() throws IOException; + String ping2() throws IOException; + } + + @ProtocolInfo(protocolName="Foo") + interface FooUnimplemented extends VersionedProtocol { + public static final long versionID = 2L; + String ping() throws IOException; + } + + interface Mixin extends VersionedProtocol{ + public static final long versionID = 0L; + void hello() throws IOException; } + interface Bar extends Mixin { + public static final long versionID = 0L; + int echo(int i) throws IOException; + } + + class Foo0Impl implements Foo0 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo0.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo0"; + } + + } + + class Foo1Impl implements Foo1 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo1.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo1"; + } + + @Override + public String ping2() { + return "Foo1"; + + } + + } + + + class BarImpl implements Bar { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Bar.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public int echo(int i) { + return i; + } + + @Override + public void hello() { + + + } + } + @Before + public void setUp() throws Exception { + // create a server with two handlers + server = new RPC.Builder(conf).setProtocol(Foo0.class) + .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); + + + // Add Protobuf server + // Create server side implementation + PBServerImpl pbServerImpl = new PBServerImpl(); + BlockingService service = TestProtobufRpcProto + .newReflectiveBlockingService(pbServerImpl); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, + service); + server.start(); + addr = NetUtils.getConnectAddress(server); + } + @After public void tearDown() throws Exception { server.stop(); } + @Test + public void test1() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); + Foo0 foo0 = (Foo0)proxy.getProxy(); + Assert.assertEquals("Foo0", foo0.ping()); + + + proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); + + + Foo1 foo1 = (Foo1)proxy.getProxy(); + Assert.assertEquals("Foo1", foo1.ping()); + Assert.assertEquals("Foo1", foo1.ping()); + + + proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); + + + Bar bar = (Bar)proxy.getProxy(); + Assert.assertEquals(99, bar.echo(99)); + + // Now test Mixin class method + + Mixin mixin = bar; + mixin.hello(); + } + + + // Server does not implement the FooUnimplemented version of protocol Foo. + // See that calls to it fail. + @Test(expected=IOException.class) + public void testNonExistingProtocol() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + foo.ping(); + } + + /** + * getProtocolVersion of an unimplemented version should return highest version + * Similarly getProtocolSignature should work. + * @throws IOException + */ + @Test + public void testNonExistingProtocol2() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + Assert.assertEquals(Foo1.versionID, + foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID)); + foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID, 0); + } + + @Test(expected=IOException.class) + public void testIncorrectServerCreation() throws IOException { + new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl()) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false) + .build(); + } + // Now test a PB service - a server hosts both PB and Writable Rpcs. @Test public void testPBService() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java index 6d83d7d368c..969f728f77b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java @@ -25,6 +25,19 @@ import org.junit.Test; public class TestRPCCallBenchmark { + @Test(timeout=20000) + public void testBenchmarkWithWritable() throws Exception { + int rc = ToolRunner.run(new RPCCallBenchmark(), + new String[] { + "--clientThreads", "30", + "--serverThreads", "30", + "--time", "5", + "--serverReaderThreads", "4", + "--messageSize", "1024", + "--engine", "writable"}); + assertEquals(0, rc); + } + @Test(timeout=20000) public void testBenchmarkWithProto() throws Exception { int rc = ToolRunner.run(new RPCCallBenchmark(), diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index a06d9fdc01e..2ac2be990d5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -18,19 +18,27 @@ package org.apache.hadoop.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import org.junit.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; +import org.apache.hadoop.net.NetUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** Unit test for supporting method-name based compatible RPCs. */ public class TestRPCCompatibility { @@ -41,7 +49,7 @@ public class TestRPCCompatibility { public static final Log LOG = LogFactory.getLog(TestRPCCompatibility.class); - + private static Configuration conf = new Configuration(); public interface TestProtocol0 extends VersionedProtocol { @@ -112,21 +120,6 @@ public class TestRPCCompatibility { @Before public void setUp() { ProtocolSignature.resetCache(); - - RPC.setProtocolEngine(conf, - TestProtocol0.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol1.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol2.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol3.class, ProtobufRpcEngine.class); - - RPC.setProtocolEngine(conf, - TestProtocol4.class, ProtobufRpcEngine.class); } @After @@ -140,7 +133,117 @@ public class TestRPCCompatibility { server = null; } } + + @Test // old client vs new server + public void testVersion0ClientVersion1Server() throws Exception { + // create a server with two handlers + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + proxy = RPC.getProtocolProxy( + TestProtocol0.class, TestProtocol0.versionID, addr, conf); + + TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy(); + proxy0.ping(); + } + + @Test // old client vs new server + public void testVersion1ClientVersion0Server() throws Exception { + // create a server with two handlers + server = new RPC.Builder(conf).setProtocol(TestProtocol0.class) + .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + proxy = RPC.getProtocolProxy( + TestProtocol1.class, TestProtocol1.versionID, addr, conf); + + TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy(); + proxy1.ping(); + try { + proxy1.echo("hello"); + fail("Echo should fail"); + } catch(IOException e) { + } + } + + private class Version2Client { + + private TestProtocol2 proxy2; + private ProtocolProxy serverInfo; + + private Version2Client() throws IOException { + serverInfo = RPC.getProtocolProxy( + TestProtocol2.class, TestProtocol2.versionID, addr, conf); + proxy2 = serverInfo.getProxy(); + } + + public int echo(int value) throws IOException, NumberFormatException { + if (serverInfo.isMethodSupported("echo", int.class)) { +System.out.println("echo int is supported"); + return -value; // use version 3 echo long + } else { // server is version 2 +System.out.println("echo int is NOT supported"); + return Integer.parseInt(proxy2.echo(String.valueOf(value))); + } + } + + public String echo(String value) throws IOException { + return proxy2.echo(value); + } + + public void ping() throws IOException { + proxy2.ping(); + } + } + + @Test // Compatible new client & old server + public void testVersion2ClientVersion1Server() throws Exception { + // create a server with two handlers + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + + + Version2Client client = new Version2Client(); + client.ping(); + assertEquals("hello", client.echo("hello")); + + // echo(int) is not supported by server, so returning 3 + // This verifies that echo(int) and echo(String)'s hash codes are different + assertEquals(3, client.echo(3)); + } + + @Test // equal version client and server + public void testVersion2ClientVersion2Server() throws Exception { + // create a server with two handlers + TestImpl2 impl = new TestImpl2(); + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + addr = NetUtils.getConnectAddress(server); + + Version2Client client = new Version2Client(); + + client.ping(); + assertEquals("hello", client.echo("hello")); + + // now that echo(int) is supported by the server, echo(int) should return -3 + assertEquals(-3, client.echo(3)); + } + public interface TestProtocol3 { int echo(String value); int echo(int value); @@ -194,4 +297,97 @@ public class TestRPCCompatibility { @Override int echo(int value) throws IOException; } + + @Test + public void testVersionMismatch() throws IOException { + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class, + TestProtocol4.versionID, addr, conf); + try { + proxy.echo(21); + fail("The call must throw VersionMismatch exception"); + } catch (RemoteException ex) { + Assert.assertEquals(RPC.VersionMismatch.class.getName(), + ex.getClassName()); + Assert.assertTrue(ex.getErrorCode().equals( + RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH)); + } catch (IOException ex) { + fail("Expected version mismatch but got " + ex); + } + } + + @Test + public void testIsMethodSupported() throws IOException { + server = new RPC.Builder(conf).setProtocol(TestProtocol2.class) + .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. + */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = new RPC.Builder(conf).setProtocol(TestProtocol1.class) + .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2) + .setVerbose(false).build(); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RPC.RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RPC.RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } + } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class protocol, RPC.RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java index b22f91b8e80..5807998a157 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; +import org.apache.hadoop.ipc.TestRPC.TestProtocol; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -28,13 +30,11 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.channels.ClosedByInterruptException; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; - /** * tests that the proxy can be interrupted */ -public class TestRPCWaitForProxy extends TestRpcBase { +public class TestRPCWaitForProxy extends Assert { + private static final String ADDRESS = "0.0.0.0"; private static final Logger LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); @@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase { * * @throws Throwable any exception other than that which was expected */ - @Test(timeout = 50000) + @Test(timeout = 10000) public void testWaitForProxy() throws Throwable { RpcThread worker = new RpcThread(0); worker.start(); worker.join(); Throwable caught = worker.getCaught(); - Throwable cause = caught.getCause(); - Assert.assertNotNull("No exception was raised", cause); - if (!(cause instanceof ConnectException)) { + assertNotNull("No exception was raised", caught); + if (!(caught instanceof ConnectException)) { throw caught; } } @@ -70,11 +69,11 @@ public class TestRPCWaitForProxy extends TestRpcBase { RpcThread worker = new RpcThread(100); worker.start(); Thread.sleep(1000); - Assert.assertTrue("worker hasn't started", worker.waitStarted); + assertTrue("worker hasn't started", worker.waitStarted); worker.interrupt(); worker.join(); Throwable caught = worker.getCaught(); - Assert.assertNotNull("No exception was raised", caught); + assertNotNull("No exception was raised", caught); // looking for the root cause here, which can be wrapped // as part of the NetUtils work. Having this test look // a the type of exception there would be brittle to improvements @@ -83,8 +82,6 @@ public class TestRPCWaitForProxy extends TestRpcBase { if (cause == null) { // no inner cause, use outer exception as root cause. cause = caught; - } else if (cause.getCause() != null) { - cause = cause.getCause(); } if (!(cause instanceof InterruptedIOException) && !(cause instanceof ClosedByInterruptException)) { @@ -115,16 +112,12 @@ public class TestRPCWaitForProxy extends TestRpcBase { IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, connectRetries); waitStarted = true; - - short invalidPort = 20; - InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, - invalidPort); - TestRpcBase.TestRpcService proxy = RPC.getProxy( - TestRpcBase.TestRpcService.class, - 1L, invalidAddress, conf); - // Test echo method - proxy.echo(null, newEchoRequest("hello")); - + TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, + TestProtocol.versionID, + new InetSocketAddress(ADDRESS, 20), + config, + 15000L); + proxy.echo(""); } catch (Throwable throwable) { caught = throwable; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index 5a8f8d0124a..bc604a47ef2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -112,8 +112,7 @@ public class TestRpcBase { return setupTestServer(builder); } - protected static RPC.Server setupTestServer( - RPC.Builder builder) throws IOException { + protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException { RPC.Server server = builder.build(); server.start(); @@ -176,21 +175,17 @@ public class TestRpcBase { public TestTokenIdentifier() { this(new Text(), new Text()); } - public TestTokenIdentifier(Text tokenid) { this(tokenid, new Text()); } - public TestTokenIdentifier(Text tokenid, Text realUser) { this.tokenid = tokenid == null ? new Text() : tokenid; this.realUser = realUser == null ? new Text() : realUser; } - @Override public Text getKind() { return KIND_NAME; } - @Override public UserGroupInformation getUser() { if (realUser.toString().isEmpty()) { @@ -208,7 +203,6 @@ public class TestRpcBase { tokenid.readFields(in); realUser.readFields(in); } - @Override public void write(DataOutput out) throws IOException { tokenid.write(out); @@ -240,7 +234,7 @@ public class TestRpcBase { @SuppressWarnings("unchecked") @Override public Token selectToken(Text service, - Collection> tokens) { + Collection> tokens) { if (service == null) { return null; } @@ -394,17 +388,19 @@ public class TestRpcBase { } @Override - public TestProtos.UserResponseProto getAuthUser( + public TestProtos.AuthUserResponseProto getAuthUser( RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException { - UserGroupInformation authUser; + UserGroupInformation authUser = null; try { authUser = UserGroupInformation.getCurrentUser(); } catch (IOException e) { throw new ServiceException(e); } - return newUserResponse(authUser.getUserName()); + return TestProtos.AuthUserResponseProto.newBuilder() + .setAuthUser(authUser.getUserName()) + .build(); } @Override @@ -436,34 +432,6 @@ public class TestRpcBase { return TestProtos.EmptyResponseProto.newBuilder().build(); } - - @Override - public TestProtos.UserResponseProto getCurrentUser( - RpcController controller, - TestProtos.EmptyRequestProto request) throws ServiceException { - String user; - try { - user = UserGroupInformation.getCurrentUser().toString(); - } catch (IOException e) { - throw new ServiceException("Failed to get current user", e); - } - - return newUserResponse(user); - } - - @Override - public TestProtos.UserResponseProto getServerRemoteUser( - RpcController controller, - TestProtos.EmptyRequestProto request) throws ServiceException { - String serverRemoteUser = Server.getRemoteUser().toString(); - return newUserResponse(serverRemoteUser); - } - - private TestProtos.UserResponseProto newUserResponse(String user) { - return TestProtos.UserResponseProto.newBuilder() - .setUser(user) - .build(); - } } protected static TestProtos.EmptyRequestProto newEmptyRequest() { @@ -510,4 +478,8 @@ public class TestRpcBase { } return null; } + + protected static String convert(TestProtos.AuthUserResponseProto response) { + return response.getAuthUser(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index 3809448ad4e..ec53e8c9762 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -29,25 +29,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslPlainServer; -import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security.SaslRpcClient; -import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; -import org.apache.hadoop.security.SecurityInfo; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.TestUserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.*; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenInfo; -import org.apache.hadoop.security.token.TokenSelector; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; @@ -57,55 +44,30 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import javax.security.auth.callback.*; +import javax.security.sasl.*; import java.io.IOException; import java.lang.annotation.Annotation; import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.security.Security; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*; +import static org.junit.Assert.*; /** Unit tests for using Sasl over RPC. */ @RunWith(Parameterized.class) public class TestSaslRPC extends TestRpcBase { @Parameters public static Collection data() { - Collection params = new ArrayList<>(); + Collection params = new ArrayList(); for (QualityOfProtection qop : QualityOfProtection.values()) { params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null }); } @@ -151,7 +113,7 @@ public class TestSaslRPC extends TestRpcBase { NONE(), VALID(), INVALID(), - OTHER() + OTHER(); } @BeforeClass @@ -267,7 +229,7 @@ public class TestSaslRPC extends TestRpcBase { final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } finally { - SecurityUtil.setSecurityInfoProviders(); + SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); } } @@ -296,7 +258,7 @@ public class TestSaslRPC extends TestRpcBase { addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token<>(tokenId, sm); + Token token = new Token(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); @@ -324,8 +286,8 @@ public class TestSaslRPC extends TestRpcBase { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = ConnectionId.getConnectionId( - new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf); + ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), + TestRpcService.class, null, 0, null, newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false @@ -834,13 +796,13 @@ public class TestSaslRPC extends TestRpcBase { final TestTokenSecretManager sm = new TestTokenSecretManager(); boolean useSecretManager = (serverAuth != SIMPLE); if (enableSecretManager != null) { - useSecretManager &= enableSecretManager; + useSecretManager &= enableSecretManager.booleanValue(); } if (forceSecretManager != null) { - useSecretManager |= forceSecretManager; + useSecretManager |= forceSecretManager.booleanValue(); } final SecretManager serverSm = useSecretManager ? sm : null; - + Server server = serverUgi.doAs(new PrivilegedExceptionAction() { @Override public Server run() throws IOException { @@ -895,13 +857,13 @@ public class TestSaslRPC extends TestRpcBase { proxy.ping(null, newEmptyRequest()); // make sure the other side thinks we are who we said we are!!! assertEquals(clientUgi.getUserName(), - proxy.getAuthUser(null, newEmptyRequest()).getUser()); + convert(proxy.getAuthUser(null, newEmptyRequest()))); AuthMethod authMethod = convert(proxy.getAuthMethod(null, newEmptyRequest())); // verify sasl completed with correct QOP assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, - RPC.getConnectionIdForProxy(proxy).getSaslQop()); - return authMethod != null ? authMethod.toString() : null; + RPC.getConnectionIdForProxy(proxy).getSaslQop()); + return authMethod.toString(); } catch (ServiceException se) { if (se.getCause() instanceof RemoteException) { throw (RemoteException) se.getCause(); @@ -926,18 +888,21 @@ public class TestSaslRPC extends TestRpcBase { String actual) { assertEquals(expect.toString(), actual); } - - private static void assertAuthEquals(Pattern expect, String actual) { + + private static void assertAuthEquals(Pattern expect, + String actual) { // this allows us to see the regexp and the value it didn't match if (!expect.matcher(actual).matches()) { - fail(); // it failed + assertEquals(expect, actual); // it failed + } else { + assertTrue(true); // it matched } } /* * Class used to test overriding QOP values using SaslPropertiesResolver */ - static class AuthSaslPropertiesResolver extends SaslPropertiesResolver { + static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{ @Override public Map getServerProperties(InetAddress address) { @@ -946,7 +911,7 @@ public class TestSaslRPC extends TestRpcBase { return newPropertes; } } - + public static void main(String[] args) throws Exception { System.out.println("Testing Kerberos authentication over RPC"); if (args.length != 2) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index c4dbcac4c2d..50d389c6465 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -17,35 +17,40 @@ */ package org.apache.hadoop.security; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.TestRpcBase; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.security.token.Token; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Enumeration; +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenInfo; +import org.junit.Before; +import org.junit.Test; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector; +import org.apache.commons.logging.*; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + /** - * Test do as effective user. + * */ -public class TestDoAsEffectiveUser extends TestRpcBase { +public class TestDoAsEffectiveUser { final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG"; final private static String REAL_USER_SHORT_NAME = "realUser1"; final private static String PROXY_USER_NAME = "proxyUser"; @@ -53,8 +58,8 @@ public class TestDoAsEffectiveUser extends TestRpcBase { final private static String GROUP2_NAME = "group2"; final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME, GROUP2_NAME }; - - private TestRpcService client; + private static final String ADDRESS = "0.0.0.0"; + private TestProtocol proxy; private static final Configuration masterConf = new Configuration(); @@ -77,7 +82,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase { private void configureSuperUserIPAddresses(Configuration conf, String superUserShortName) throws IOException { - ArrayList ipList = new ArrayList<>(); + ArrayList ipList = new ArrayList(); Enumeration netInterfaceList = NetworkInterface .getNetworkInterfaces(); while (netInterfaceList.hasMoreElements()) { @@ -125,19 +130,50 @@ public class TestDoAsEffectiveUser extends TestRpcBase { curUGI.toString()); } - private void checkRemoteUgi(final UserGroupInformation ugi, - final Configuration conf) throws Exception { + @TokenInfo(TestTokenSelector.class) + public interface TestProtocol extends VersionedProtocol { + public static final long versionID = 1L; + + String aMethod() throws IOException; + String getServerRemoteUser() throws IOException; + } + + public class TestImpl implements TestProtocol { + + @Override + public String aMethod() throws IOException { + return UserGroupInformation.getCurrentUser().toString(); + } + + @Override + public String getServerRemoteUser() throws IOException { + return Server.getRemoteUser().toString(); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return TestProtocol.versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return new ProtocolSignature(TestProtocol.versionID, null); + } + } + + private void checkRemoteUgi(final Server server, + final UserGroupInformation ugi, final Configuration conf) + throws Exception { ugi.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws ServiceException { - client = getClient(addr, conf); - String currentUser = client.getCurrentUser(null, - newEmptyRequest()).getUser(); - String serverRemoteUser = client.getServerRemoteUser(null, - newEmptyRequest()).getUser(); - - Assert.assertEquals(ugi.toString(), currentUser); - Assert.assertEquals(ugi.toString(), serverRemoteUser); + public Void run() throws IOException { + proxy = RPC.getProxy( + TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + Assert.assertEquals(ugi.toString(), proxy.aMethod()); + Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser()); return null; } }); @@ -149,27 +185,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(5).setVerbose(true).build(); refreshConf(conf); try { + server.start(); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(realUserUgi, conf); + checkRemoteUgi(server, realUserUgi, conf); - UserGroupInformation proxyUserUgi = - UserGroupInformation.createProxyUserForTesting( + UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting( PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(proxyUserUgi, conf); + checkRemoteUgi(server, proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -180,25 +218,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - checkRemoteUgi(realUserUgi, conf); + checkRemoteUgi(server, realUserUgi, conf); UserGroupInformation proxyUserUgi = UserGroupInformation .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES); - checkRemoteUgi(proxyUserUgi, conf); + checkRemoteUgi(server, proxyUserUgi, conf); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -214,14 +256,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -230,10 +275,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -241,7 +287,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -250,14 +299,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase { final Configuration conf = new Configuration(); conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -266,10 +318,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -277,7 +330,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -285,12 +341,15 @@ public class TestDoAsEffectiveUser extends TestRpcBase { public void testRealUserGroupNotSpecified() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -299,10 +358,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -310,7 +370,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -321,14 +384,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase { conf.setStrings(DefaultImpersonationProvider.getTestProvider(). getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group3"); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); - UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 2); + Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(2).setVerbose(false).build(); refreshConf(conf); try { + server.start(); + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + UserGroupInformation realUserUgi = UserGroupInformation .createRemoteUser(REAL_USER_NAME); @@ -337,10 +403,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction() { @Override - public String run() throws ServiceException { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + public String run() throws IOException { + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } }); @@ -348,7 +415,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase { } catch (Exception e) { e.printStackTrace(); } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } @@ -362,17 +432,20 @@ public class TestDoAsEffectiveUser extends TestRpcBase { final Configuration conf = new Configuration(masterConf); TestTokenSecretManager sm = new TestTokenSecretManager(); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); - RPC.setProtocolEngine(conf, TestRpcService.class, - ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(conf); - final Server server = setupTestServer(conf, 5, sm); + final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) + .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) + .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build(); + + server.start(); final UserGroupInformation current = UserGroupInformation .createRemoteUser(REAL_USER_NAME); - + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token<>(tokenId, + Token token = new Token(tokenId, sm); SecurityUtil.setTokenService(token, addr); UserGroupInformation proxyUserUgi = UserGroupInformation @@ -380,19 +453,23 @@ public class TestDoAsEffectiveUser extends TestRpcBase { proxyUserUgi.addToken(token); refreshConf(conf); - + String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - client = getClient(addr, conf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + String ret = proxy.aMethod(); + return ret; } catch (Exception e) { e.printStackTrace(); throw e; } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } }); @@ -409,34 +486,42 @@ public class TestDoAsEffectiveUser extends TestRpcBase { TestTokenSecretManager sm = new TestTokenSecretManager(); final Configuration newConf = new Configuration(masterConf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf); - // Set RPC engine to protobuf RPC engine - RPC.setProtocolEngine(newConf, TestRpcService.class, - ProtobufRpcEngine.class); UserGroupInformation.setConfiguration(newConf); - final Server server = setupTestServer(newConf, 5, sm); + final Server server = new RPC.Builder(newConf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .setSecretManager(sm).build(); + + server.start(); final UserGroupInformation current = UserGroupInformation .createUserForTesting(REAL_USER_NAME, GROUP_NAMES); refreshConf(newConf); - + + final InetSocketAddress addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName()), new Text("SomeSuperUser")); - Token token = new Token<>(tokenId, sm); + Token token = new Token(tokenId, + sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); String retVal = current.doAs(new PrivilegedExceptionAction() { @Override public String run() throws Exception { try { - client = getClient(addr, newConf); - return client.getCurrentUser(null, - newEmptyRequest()).getUser(); + proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, newConf); + String ret = proxy.aMethod(); + return ret; } catch (Exception e) { e.printStackTrace(); throw e; } finally { - stop(server, client); + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } } } }); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java index 462f0a48223..91f36e599c6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -29,11 +28,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; @@ -53,22 +48,9 @@ import java.util.Set; import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; -import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; -import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; -import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.hadoop.ipc.TestSaslRPC.*; +import static org.apache.hadoop.test.MetricsAsserts.*; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -125,7 +107,7 @@ public class TestUserGroupInformation { UserGroupInformation.setLoginUser(null); } - @Test(timeout = 30000) + @Test (timeout = 30000) public void testSimpleLogin() throws IOException { tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index 6411f97ab65..99cd93d711c 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -88,6 +88,6 @@ message AuthMethodResponseProto { required string mechanismName = 2; } -message UserResponseProto { - required string user = 1; +message AuthUserResponseProto { + required string authUser = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index 06f6c4fc1db..32921158857 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -40,11 +40,9 @@ service TestProtobufRpcProto { rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto); rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); - rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto); + rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto); rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); - rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto); - rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57f7cb197b6..6b529498686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -168,6 +168,7 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntry; import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.net.Node; @@ -316,6 +317,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); + + WritableRpcEngine.ensureInitialized(); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); if (serviceRpcAddr != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java new file mode 100644 index 00000000000..0b7ee337d8b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.security; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.mockito.Mockito.mock; + +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslRpcClient; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Test; + +/** Unit tests for using Delegation Token over RPC. */ +public class TestClientProtocolWithDelegationToken { + private static final String ADDRESS = "0.0.0.0"; + + public static final Log LOG = LogFactory + .getLog(TestClientProtocolWithDelegationToken.class); + + private static final Configuration conf; + static { + conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + static { + GenericTestUtils.setLogLevel(Client.LOG, Level.ALL); + GenericTestUtils.setLogLevel(Server.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL); + } + + @Test + public void testDelegationTokenRpc() throws Exception { + ClientProtocol mockNN = mock(ClientProtocol.class); + FSNamesystem mockNameSys = mock(FSNamesystem.class); + + DelegationTokenSecretManager sm = new DelegationTokenSecretManager( + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, + 3600000, mockNameSys); + sm.startThreads(); + final Server server = new RPC.Builder(conf) + .setProtocol(ClientProtocol.class).setInstance(mockNN) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .setSecretManager(sm).build(); + + server.start(); + + final UserGroupInformation current = UserGroupInformation.getCurrentUser(); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + String user = current.getUserName(); + Text owner = new Text(user); + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null); + Token token = new Token( + dtId, sm); + SecurityUtil.setTokenService(token, addr); + LOG.info("Service for token is " + token.getService()); + current.addToken(token); + current.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + ClientProtocol proxy = null; + try { + proxy = RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, addr, conf); + proxy.getServerDefaults(); + } finally { + server.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + return null; + } + }); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index 729af0a951d..3fef5e278b0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -97,6 +98,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService .newReflectiveBlockingService(refreshHSAdminProtocolXlator); + WritableRpcEngine.ensureInitialized(); + clientRpcAddress = conf.getSocketAddr( JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.JHS_ADMIN_ADDRESS,