Revert "HADOOP-12579. Deprecate and remove WriteableRPCEngine. Contributed by Kai Zheng"

This reverts commit a6c79f92d5.
This commit is contained in:
Kai Zheng 2016-06-01 08:41:15 +08:00
parent 4e1f56e111
commit 93d8a7f2a2
21 changed files with 1454 additions and 319 deletions

View File

@ -67,7 +67,7 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ThreadLocal<AsyncGet<Message, Exception>> private static final ThreadLocal<AsyncGet<Message, Exception>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
new Server.ProtoBufRpcInvoker()); new Server.ProtoBufRpcInvoker());
@ -201,8 +201,7 @@ public Object invoke(Object proxy, final Method method, Object[] args)
} }
if (args.length != 2) { // RpcController + Message if (args.length != 2) { // RpcController + Message
throw new ServiceException( throw new ServiceException("Too many parameters for request. Method: ["
"Too many or few parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: " + method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length); + args.length);
} }

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -28,6 +26,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.io.*;
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -38,12 +37,11 @@
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.*;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
@ -56,6 +54,7 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -85,10 +84,10 @@ public class RPC {
final static int RPC_SERVICE_CLASS_DEFAULT = 0; final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind { public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests RPC_BUILTIN ((short) 1), // Used for built in calls by tests
// 2 for WritableRpcEngine, obsolete and removed RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private final short value; public final short value; //TODO make it private
RpcKind(short val) { RpcKind(short val) {
this.value = val; this.value = val;
@ -208,7 +207,7 @@ static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
RpcEngine engine = PROTOCOL_ENGINES.get(protocol); RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) { if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
ProtobufRpcEngine.class); WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine); PROTOCOL_ENGINES.put(protocol, engine);
} }
@ -950,10 +949,10 @@ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind,
return new VerProtocolImpl(highestVersion, highest); return new VerProtocolImpl(highestVersion, highest);
} }
protected Server(String bindAddress, int port, protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount, Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler, int numReaders, int queueSizePerHandler,
Configuration conf, String serverName, Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException { String portRangeConfig) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,

View File

@ -243,14 +243,14 @@ private static Set<String> addExceptions(
static class RpcKindMapValue { static class RpcKindMapValue {
final Class<? extends Writable> rpcRequestWrapperClass; final Class<? extends Writable> rpcRequestWrapperClass;
final RpcInvoker rpcInvoker; final RpcInvoker rpcInvoker;
RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass, RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) { RpcInvoker rpcInvoker) {
this.rpcInvoker = rpcInvoker; this.rpcInvoker = rpcInvoker;
this.rpcRequestWrapperClass = rpcRequestWrapperClass; this.rpcRequestWrapperClass = rpcRequestWrapperClass;
} }
} }
static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<>(4); static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
HashMap<RPC.RpcKind, RpcKindMapValue>(4);

View File

@ -0,0 +1,564 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.io.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
/** An RpcEngine implementation for Writable data. */
@InterfaceStability.Evolving
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
//writableRpcVersion should be updated if there is a change
//in format of the rpc messages.
// 2L - added declared class to Invocation
public static final long writableRpcVersion = 2L;
/**
* Whether or not this class has been initialized.
*/
private static boolean isInitialized = false;
static {
ensureInitialized();
}
/**
* Initialize this class if it isn't already.
*/
public static synchronized void ensureInitialized() {
if (!isInitialized) {
initialize();
}
}
/**
* Register the rpcRequest deserializer for WritableRpcEngine
*/
private static synchronized void initialize() {
org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
Invocation.class, new Server.WritableRpcInvoker());
isInitialized = true;
}
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
private long clientVersion;
private int clientMethodsHash;
private String declaringClassProtocolName;
//This could be different from static writableRpcVersion when received
//at server, if client is using a different version.
private long rpcVersion;
@SuppressWarnings("unused") // called when deserializing an invocation
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
rpcVersion = writableRpcVersion;
if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
//VersionedProtocol is exempted from version check.
clientVersion = 0;
clientMethodsHash = 0;
} else {
this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
this.declaringClassProtocolName =
RPC.getProtocolName(method.getDeclaringClass());
}
/** The name of the method invoked. */
public String getMethodName() { return methodName; }
/** The parameter classes. */
public Class<?>[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
private long getProtocolVersion() {
return clientVersion;
}
@SuppressWarnings("unused")
private int getClientMethodsHash() {
return clientMethodsHash;
}
/**
* Returns the rpc version used by the client.
* @return rpcVersion
*/
public long getRpcVersion() {
return rpcVersion;
}
@Override
@SuppressWarnings("deprecation")
public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in);
clientVersion = in.readLong();
clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] =
ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
@Override
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf, true);
}
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(methodName);
buffer.append("(");
for (int i = 0; i < parameters.length; i++) {
if (i != 0)
buffer.append(", ");
buffer.append(parameters[i]);
}
buffer.append(")");
buffer.append(", rpc version="+rpcVersion);
buffer.append(", client version="+clientVersion);
buffer.append(", methodsFingerPrint="+clientMethodsHash);
return buffer.toString();
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
}
private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements RpcInvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth;
public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, null, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
// if Tracing is on then start a new span for this rpc.
// guard it in the if statement to make sure there isn't
// any extra string manipulation.
Tracer tracer = Tracer.curThreadTracer();
TraceScope traceScope = null;
if (tracer != null) {
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
}
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
@Override
synchronized public void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
@Override
public ConnectionId getConnectionId() {
return remoteId;
}
}
// for unit testing only
@InterfaceAudience.Private
@InterfaceStability.Unstable
static Client getClient(Configuration conf) {
return CLIENTS.getClient(conf);
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
* @param <T>*/
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
* @param <T>*/
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy<T>(protocol, proxy, true);
}
/* Construct a server for a protocol implementation instance listening on a
* port and address. */
@Override
public RPC.Server getServer(Class<?> protocolClass,
Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
}
/** An RPC Server. */
public static class Server extends RPC.Server {
/**
* Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*
* @deprecated Use #Server(Class, Object, Configuration, String, int)
*/
@Deprecated
public Server(Object instance, Configuration conf, String bindAddress,
int port) throws IOException {
this(null, instance, conf, bindAddress, port);
}
/** Construct an RPC server.
* @param protocolClass class
* @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port)
throws IOException {
this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
false, null, null);
}
/**
* Construct an RPC server.
* @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*
* @deprecated use Server#Server(Class, Object,
* Configuration, String, int, int, int, int, boolean, SecretManager)
*/
@Deprecated
public Server(Object protocolImpl, Configuration conf, String bindAddress,
int port, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this(null, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose,
secretManager, null);
}
/**
* Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig);
this.verbose = verbose;
Class<?>[] protocols;
if (protocolClass == null) { // derive protocol from impl
/*
* In order to remain compatible with the old usage where a single
* target protocolImpl is suppled for all protocol interfaces, and
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
throw new IOException("protocolClass "+ protocolClass +
" is not implemented by protocolImpl which is of class " +
protocolImpl.getClass());
}
// register protocol class and its super interfaces
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}
}
private static void log(String value) {
if (value!= null && value.length() > 55)
value = value.substring(0, 55)+"...";
LOG.info(value);
}
static class WritableRpcInvoker implements RpcInvoker {
@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime)
throws IOException, RPC.VersionMismatch {
Invocation call = (Invocation)rpcRequest;
if (server.verbose) log("Call: " + call);
// Verify writable rpc version
if (call.getRpcVersion() != writableRpcVersion) {
// Client is using a different version of WritableRpc
throw new RpcServerException(
"WritableRpc version mismatch, client side version="
+ call.getRpcVersion() + ", server side version="
+ writableRpcVersion);
}
long clientVersion = call.getProtocolVersion();
final String protoName;
ProtoClassProtoImpl protocolImpl;
if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
// VersionProtocol methods are often used by client to figure out
// which version of protocol to use.
//
// Versioned protocol methods should go the protocolName protocol
// rather than the declaring class of the method since the
// the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protocolName);
}
protocolImpl = highest.protocolTarget;
} else {
protoName = call.declaringClassProtocolName;
// Find the right impl for the protocol based on client version.
ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl =
server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
protoName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protoName);
} else { // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
}
}
// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
exception = (IOException)target;
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
exception = ioe;
throw ioe;
}
} catch (Throwable e) {
if (!(e instanceof IOException)) {
LOG.error("Unexpected throwable object ", e);
}
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
exception = ioe;
throw ioe;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime + " procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.updateMetrics(detailedMetricsName, qTime, processingTime);
}
}
}
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
}

View File

@ -689,7 +689,7 @@ public static UserGroupInformation getBestUGI(
* *
* @param user The principal name to load from the ticket * @param user The principal name to load from the ticket
* cache * cache
* @param ticketCache the path to the ticket cache file * @param ticketCachePath the path to the ticket cache file
* *
* @throws IOException if the kerberos login fails * @throws IOException if the kerberos login fails
*/ */
@ -749,7 +749,7 @@ public static UserGroupInformation getUGIFromTicketCache(
/** /**
* Create a UserGroupInformation from a Subject with Kerberos principal. * Create a UserGroupInformation from a Subject with Kerberos principal.
* *
* @param subject The KerberosPrincipal to use in UGI * @param user The KerberosPrincipal to use in UGI
* *
* @throws IOException if the kerberos login fails * @throws IOException if the kerberos login fails
*/ */

View File

@ -146,6 +146,7 @@ public static UserGroupInformation getUgi(UserInformationProto userInfo) {
static RpcKindProto convert(RPC.RpcKind kind) { static RpcKindProto convert(RPC.RpcKind kind) {
switch (kind) { switch (kind) {
case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN;
case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER;
} }
return null; return null;
@ -155,6 +156,7 @@ static RpcKindProto convert(RPC.RpcKind kind) {
public static RPC.RpcKind convert( RpcKindProto kind) { public static RPC.RpcKind convert( RpcKindProto kind) {
switch (kind) { switch (kind) {
case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN;
case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER;
} }
return null; return null;

View File

@ -44,10 +44,10 @@ package hadoop.common;
/** /**
* RpcKind determine the rpcEngine and the serialization of the rpc request * RpcKind determine the rpcEngine and the serialization of the rpc request
* Note: 1 for RPC_WRITABLE, WritableRpcEngine, obsolete and removed
*/ */
enum RpcKindProto { enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests RPC_BUILTIN = 0; // Used for built in calls by tests
RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
} }

View File

@ -17,8 +17,13 @@
*/ */
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import com.google.common.base.Joiner; import java.io.IOException;
import com.google.protobuf.BlockingService; import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
@ -29,6 +34,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@ -39,12 +45,8 @@
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import java.io.IOException; import com.google.common.base.Joiner;
import java.lang.management.ManagementFactory; import com.google.protobuf.BlockingService;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* Benchmark for protobuf RPC. * Benchmark for protobuf RPC.
@ -66,7 +68,7 @@ private static class MyOptions {
public int secondsToRun = 15; public int secondsToRun = 15;
private int msgSize = 1024; private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine = public Class<? extends RpcEngine> rpcEngine =
ProtobufRpcEngine.class; WritableRpcEngine.class;
private MyOptions(String args[]) { private MyOptions(String args[]) {
try { try {
@ -133,7 +135,7 @@ private Options buildOptions() {
opts.addOption( opts.addOption(
OptionBuilder.withLongOpt("engine").hasArg(true) OptionBuilder.withLongOpt("engine").hasArg(true)
.withArgName("protobuf") .withArgName("writable|protobuf")
.withDescription("engine to use") .withDescription("engine to use")
.create('e')); .create('e'));
@ -182,6 +184,8 @@ private void processOptions(CommandLine line, Options opts)
String eng = line.getOptionValue('e'); String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) { if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class; rpcEngine = ProtobufRpcEngine.class;
} else if ("writable".equals(eng)) {
rpcEngine = WritableRpcEngine.class;
} else { } else {
throw new ParseException("invalid engine: " + eng); throw new ParseException("invalid engine: " + eng);
} }
@ -233,6 +237,11 @@ private Server startServer(MyOptions opts) throws IOException {
server = new RPC.Builder(conf).setProtocol(TestRpcService.class) server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(opts.host).setPort(opts.getPort()) .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort())
.setNumHandlers(opts.serverThreads).setVerbose(false).build(); .setNumHandlers(opts.serverThreads).setVerbose(false).build();
} else if (opts.rpcEngine == WritableRpcEngine.class) {
server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
.setPort(opts.getPort()).setNumHandlers(opts.serverThreads)
.setVerbose(false).build();
} else { } else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine); throw new RuntimeException("Bad engine: " + opts.rpcEngine);
} }
@ -390,6 +399,15 @@ public String doEcho(String msg) throws Exception {
return responseProto.getMessage(); return responseProto.getMessage();
} }
}; };
} else if (opts.rpcEngine == WritableRpcEngine.class) {
final TestProtocol proxy = RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
return new RpcServiceWrapper() {
@Override
public String doEcho(String msg) throws Exception {
return proxy.echo(msg);
}
};
} else { } else {
throw new RuntimeException("unsupported engine: " + opts.rpcEngine); throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
} }

View File

@ -17,28 +17,252 @@
*/ */
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.After; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import com.google.protobuf.BlockingService;
public class TestMultipleProtocolServer extends TestRpcBase { public class TestMultipleProtocolServer extends TestRpcBase {
private static InetSocketAddress addr;
private static RPC.Server server; private static RPC.Server server;
@Before private static Configuration conf = new Configuration();
public void setUp() throws Exception {
super.setupConf();
@ProtocolInfo(protocolName="Foo")
server = setupTestServer(conf, 2); interface Foo0 extends VersionedProtocol {
public static final long versionID = 0L;
String ping() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface Foo1 extends VersionedProtocol {
public static final long versionID = 1L;
String ping() throws IOException;
String ping2() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface FooUnimplemented extends VersionedProtocol {
public static final long versionID = 2L;
String ping() throws IOException;
}
interface Mixin extends VersionedProtocol{
public static final long versionID = 0L;
void hello() throws IOException;
} }
interface Bar extends Mixin {
public static final long versionID = 0L;
int echo(int i) throws IOException;
}
class Foo0Impl implements Foo0 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo0.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo0";
}
}
class Foo1Impl implements Foo1 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo1.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo1";
}
@Override
public String ping2() {
return "Foo1";
}
}
class BarImpl implements Bar {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Bar.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public int echo(int i) {
return i;
}
@Override
public void hello() {
}
}
@Before
public void setUp() throws Exception {
// create a server with two handlers
server = new RPC.Builder(conf).setProtocol(Foo0.class)
.setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
// Add Protobuf server
// Create server side implementation
PBServerImpl pbServerImpl = new PBServerImpl();
BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(pbServerImpl);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
service);
server.start();
addr = NetUtils.getConnectAddress(server);
}
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
server.stop(); server.stop();
} }
@Test
public void test1() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
Foo0 foo0 = (Foo0)proxy.getProxy();
Assert.assertEquals("Foo0", foo0.ping());
proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
Foo1 foo1 = (Foo1)proxy.getProxy();
Assert.assertEquals("Foo1", foo1.ping());
Assert.assertEquals("Foo1", foo1.ping());
proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
Bar bar = (Bar)proxy.getProxy();
Assert.assertEquals(99, bar.echo(99));
// Now test Mixin class method
Mixin mixin = bar;
mixin.hello();
}
// Server does not implement the FooUnimplemented version of protocol Foo.
// See that calls to it fail.
@Test(expected=IOException.class)
public void testNonExistingProtocol() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
foo.ping();
}
/**
* getProtocolVersion of an unimplemented version should return highest version
* Similarly getProtocolSignature should work.
* @throws IOException
*/
@Test
public void testNonExistingProtocol2() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
Assert.assertEquals(Foo1.versionID,
foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID));
foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID, 0);
}
@Test(expected=IOException.class)
public void testIncorrectServerCreation() throws IOException {
new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
.build();
}
// Now test a PB service - a server hosts both PB and Writable Rpcs. // Now test a PB service - a server hosts both PB and Writable Rpcs.
@Test @Test
public void testPBService() throws Exception { public void testPBService() throws Exception {

View File

@ -25,6 +25,19 @@
public class TestRPCCallBenchmark { public class TestRPCCallBenchmark {
@Test(timeout=20000)
public void testBenchmarkWithWritable() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(),
new String[] {
"--clientThreads", "30",
"--serverThreads", "30",
"--time", "5",
"--serverReaderThreads", "4",
"--messageSize", "1024",
"--engine", "writable"});
assertEquals(0, rc);
}
@Test(timeout=20000) @Test(timeout=20000)
public void testBenchmarkWithProto() throws Exception { public void testBenchmarkWithProto() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(), int rc = ToolRunner.run(new RPCCallBenchmark(),

View File

@ -18,19 +18,27 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.commons.logging.Log; import static org.junit.Assert.assertEquals;
import org.apache.commons.logging.LogFactory; import static org.junit.Assert.assertFalse;
import org.apache.hadoop.conf.Configuration; import static org.junit.Assert.fail;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import static org.junit.Assert.assertEquals; import org.junit.Assert;
import static org.junit.Assert.assertFalse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/** Unit test for supporting method-name based compatible RPCs. */ /** Unit test for supporting method-name based compatible RPCs. */
public class TestRPCCompatibility { public class TestRPCCompatibility {
@ -41,7 +49,7 @@ public class TestRPCCompatibility {
public static final Log LOG = public static final Log LOG =
LogFactory.getLog(TestRPCCompatibility.class); LogFactory.getLog(TestRPCCompatibility.class);
private static Configuration conf = new Configuration(); private static Configuration conf = new Configuration();
public interface TestProtocol0 extends VersionedProtocol { public interface TestProtocol0 extends VersionedProtocol {
@ -112,21 +120,6 @@ public long getProtocolVersion(String protocol,
@Before @Before
public void setUp() { public void setUp() {
ProtocolSignature.resetCache(); ProtocolSignature.resetCache();
RPC.setProtocolEngine(conf,
TestProtocol0.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol1.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol2.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol3.class, ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf,
TestProtocol4.class, ProtobufRpcEngine.class);
} }
@After @After
@ -140,7 +133,117 @@ public void tearDown() {
server = null; server = null;
} }
} }
@Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol0.class, TestProtocol0.versionID, addr, conf);
TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
proxy0.ping();
}
@Test // old client vs new server
public void testVersion1ClientVersion0Server() throws Exception {
// create a server with two handlers
server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
.setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol1.class, TestProtocol1.versionID, addr, conf);
TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
proxy1.ping();
try {
proxy1.echo("hello");
fail("Echo should fail");
} catch(IOException e) {
}
}
private class Version2Client {
private TestProtocol2 proxy2;
private ProtocolProxy<TestProtocol2> serverInfo;
private Version2Client() throws IOException {
serverInfo = RPC.getProtocolProxy(
TestProtocol2.class, TestProtocol2.versionID, addr, conf);
proxy2 = serverInfo.getProxy();
}
public int echo(int value) throws IOException, NumberFormatException {
if (serverInfo.isMethodSupported("echo", int.class)) {
System.out.println("echo int is supported");
return -value; // use version 3 echo long
} else { // server is version 2
System.out.println("echo int is NOT supported");
return Integer.parseInt(proxy2.echo(String.valueOf(value)));
}
}
public String echo(String value) throws IOException {
return proxy2.echo(value);
}
public void ping() throws IOException {
proxy2.ping();
}
}
@Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// echo(int) is not supported by server, so returning 3
// This verifies that echo(int) and echo(String)'s hash codes are different
assertEquals(3, client.echo(3));
}
@Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception {
// create a server with two handlers
TestImpl2 impl = new TestImpl2();
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// now that echo(int) is supported by the server, echo(int) should return -3
assertEquals(-3, client.echo(3));
}
public interface TestProtocol3 { public interface TestProtocol3 {
int echo(String value); int echo(String value);
int echo(int value); int echo(int value);
@ -194,4 +297,97 @@ public interface TestProtocol4 extends TestProtocol2 {
@Override @Override
int echo(int value) throws IOException; int echo(int value) throws IOException;
} }
@Test
public void testVersionMismatch() throws IOException {
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
TestProtocol4.versionID, addr, conf);
try {
proxy.echo(21);
fail("The call must throw VersionMismatch exception");
} catch (RemoteException ex) {
Assert.assertEquals(RPC.VersionMismatch.class.getName(),
ex.getClassName());
Assert.assertTrue(ex.getErrorCode().equals(
RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
} catch (IOException ex) {
fail("Expected version mismatch but got " + ex);
}
}
@Test
public void testIsMethodSupported() throws IOException {
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
TestProtocol2.versionID, addr, conf);
boolean supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertTrue(supported);
supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertFalse(supported);
}
/**
* Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
* the server registry to extract protocol signatures and versions.
*/
@Test
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1();
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(server);
GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RPC.RpcKind.RPC_PROTOCOL_BUFFER));
//No signatures should be found
Assert.assertEquals(0, resp.getProtocolSignatureCount());
resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RPC.RpcKind.RPC_WRITABLE));
Assert.assertEquals(1, resp.getProtocolSignatureCount());
ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
boolean found = false;
int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
.getMethod("echo", String.class));
for (int m : sig.getMethodsList()) {
if (expected == m) {
found = true;
break;
}
}
Assert.assertTrue(found);
}
private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
Class<?> protocol, RPC.RpcKind rpcKind) {
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
return builder.build();
}
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -28,13 +30,11 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
/** /**
* tests that the proxy can be interrupted * tests that the proxy can be interrupted
*/ */
public class TestRPCWaitForProxy extends TestRpcBase { public class TestRPCWaitForProxy extends Assert {
private static final String ADDRESS = "0.0.0.0";
private static final Logger private static final Logger
LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class); LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase {
* *
* @throws Throwable any exception other than that which was expected * @throws Throwable any exception other than that which was expected
*/ */
@Test(timeout = 50000) @Test(timeout = 10000)
public void testWaitForProxy() throws Throwable { public void testWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(0); RpcThread worker = new RpcThread(0);
worker.start(); worker.start();
worker.join(); worker.join();
Throwable caught = worker.getCaught(); Throwable caught = worker.getCaught();
Throwable cause = caught.getCause(); assertNotNull("No exception was raised", caught);
Assert.assertNotNull("No exception was raised", cause); if (!(caught instanceof ConnectException)) {
if (!(cause instanceof ConnectException)) {
throw caught; throw caught;
} }
} }
@ -70,11 +69,11 @@ public void testInterruptedWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(100); RpcThread worker = new RpcThread(100);
worker.start(); worker.start();
Thread.sleep(1000); Thread.sleep(1000);
Assert.assertTrue("worker hasn't started", worker.waitStarted); assertTrue("worker hasn't started", worker.waitStarted);
worker.interrupt(); worker.interrupt();
worker.join(); worker.join();
Throwable caught = worker.getCaught(); Throwable caught = worker.getCaught();
Assert.assertNotNull("No exception was raised", caught); assertNotNull("No exception was raised", caught);
// looking for the root cause here, which can be wrapped // looking for the root cause here, which can be wrapped
// as part of the NetUtils work. Having this test look // as part of the NetUtils work. Having this test look
// a the type of exception there would be brittle to improvements // a the type of exception there would be brittle to improvements
@ -83,8 +82,6 @@ public void testInterruptedWaitForProxy() throws Throwable {
if (cause == null) { if (cause == null) {
// no inner cause, use outer exception as root cause. // no inner cause, use outer exception as root cause.
cause = caught; cause = caught;
} else if (cause.getCause() != null) {
cause = cause.getCause();
} }
if (!(cause instanceof InterruptedIOException) if (!(cause instanceof InterruptedIOException)
&& !(cause instanceof ClosedByInterruptException)) { && !(cause instanceof ClosedByInterruptException)) {
@ -115,16 +112,12 @@ public void run() {
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
connectRetries); connectRetries);
waitStarted = true; waitStarted = true;
TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
short invalidPort = 20; TestProtocol.versionID,
InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS, new InetSocketAddress(ADDRESS, 20),
invalidPort); config,
TestRpcBase.TestRpcService proxy = RPC.getProxy( 15000L);
TestRpcBase.TestRpcService.class, proxy.echo("");
1L, invalidAddress, conf);
// Test echo method
proxy.echo(null, newEchoRequest("hello"));
} catch (Throwable throwable) { } catch (Throwable throwable) {
caught = throwable; caught = throwable;
} }

View File

@ -112,8 +112,7 @@ protected static RPC.Server setupTestServer(Configuration serverConf,
return setupTestServer(builder); return setupTestServer(builder);
} }
protected static RPC.Server setupTestServer( protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
RPC.Builder builder) throws IOException {
RPC.Server server = builder.build(); RPC.Server server = builder.build();
server.start(); server.start();
@ -176,21 +175,17 @@ public static class TestTokenIdentifier extends TokenIdentifier {
public TestTokenIdentifier() { public TestTokenIdentifier() {
this(new Text(), new Text()); this(new Text(), new Text());
} }
public TestTokenIdentifier(Text tokenid) { public TestTokenIdentifier(Text tokenid) {
this(tokenid, new Text()); this(tokenid, new Text());
} }
public TestTokenIdentifier(Text tokenid, Text realUser) { public TestTokenIdentifier(Text tokenid, Text realUser) {
this.tokenid = tokenid == null ? new Text() : tokenid; this.tokenid = tokenid == null ? new Text() : tokenid;
this.realUser = realUser == null ? new Text() : realUser; this.realUser = realUser == null ? new Text() : realUser;
} }
@Override @Override
public Text getKind() { public Text getKind() {
return KIND_NAME; return KIND_NAME;
} }
@Override @Override
public UserGroupInformation getUser() { public UserGroupInformation getUser() {
if (realUser.toString().isEmpty()) { if (realUser.toString().isEmpty()) {
@ -208,7 +203,6 @@ public void readFields(DataInput in) throws IOException {
tokenid.readFields(in); tokenid.readFields(in);
realUser.readFields(in); realUser.readFields(in);
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
tokenid.write(out); tokenid.write(out);
@ -240,7 +234,7 @@ public static class TestTokenSelector implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Token<TestTokenIdentifier> selectToken(Text service, public Token<TestTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) { Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) { if (service == null) {
return null; return null;
} }
@ -394,17 +388,19 @@ public TestProtos.AuthMethodResponseProto getAuthMethod(
} }
@Override @Override
public TestProtos.UserResponseProto getAuthUser( public TestProtos.AuthUserResponseProto getAuthUser(
RpcController controller, TestProtos.EmptyRequestProto request) RpcController controller, TestProtos.EmptyRequestProto request)
throws ServiceException { throws ServiceException {
UserGroupInformation authUser; UserGroupInformation authUser = null;
try { try {
authUser = UserGroupInformation.getCurrentUser(); authUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
return newUserResponse(authUser.getUserName()); return TestProtos.AuthUserResponseProto.newBuilder()
.setAuthUser(authUser.getUserName())
.build();
} }
@Override @Override
@ -436,34 +432,6 @@ public TestProtos.EmptyResponseProto sendPostponed(
return TestProtos.EmptyResponseProto.newBuilder().build(); return TestProtos.EmptyResponseProto.newBuilder().build();
} }
@Override
public TestProtos.UserResponseProto getCurrentUser(
RpcController controller,
TestProtos.EmptyRequestProto request) throws ServiceException {
String user;
try {
user = UserGroupInformation.getCurrentUser().toString();
} catch (IOException e) {
throw new ServiceException("Failed to get current user", e);
}
return newUserResponse(user);
}
@Override
public TestProtos.UserResponseProto getServerRemoteUser(
RpcController controller,
TestProtos.EmptyRequestProto request) throws ServiceException {
String serverRemoteUser = Server.getRemoteUser().toString();
return newUserResponse(serverRemoteUser);
}
private TestProtos.UserResponseProto newUserResponse(String user) {
return TestProtos.UserResponseProto.newBuilder()
.setUser(user)
.build();
}
} }
protected static TestProtos.EmptyRequestProto newEmptyRequest() { protected static TestProtos.EmptyRequestProto newEmptyRequest() {
@ -510,4 +478,8 @@ protected static AuthMethod convert(
} }
return null; return null;
} }
protected static String convert(TestProtos.AuthUserResponseProto response) {
return response.getAuthUser();
}
} }

View File

@ -29,25 +29,12 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.*;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslPlainServer;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.TestUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.*;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -57,55 +44,30 @@
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import javax.security.auth.callback.Callback; import javax.security.auth.callback.*;
import javax.security.auth.callback.CallbackHandler; import javax.security.sasl.*;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.io.IOException; import java.io.IOException;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.security.Security; import java.security.Security;
import java.util.ArrayList; import java.util.*;
import java.util.Collection; import java.util.concurrent.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; import static org.junit.Assert.*;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Unit tests for using Sasl over RPC. */ /** Unit tests for using Sasl over RPC. */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestSaslRPC extends TestRpcBase { public class TestSaslRPC extends TestRpcBase {
@Parameters @Parameters
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<>(); Collection<Object[]> params = new ArrayList<Object[]>();
for (QualityOfProtection qop : QualityOfProtection.values()) { for (QualityOfProtection qop : QualityOfProtection.values()) {
params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null }); params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null });
} }
@ -151,7 +113,7 @@ enum UseToken {
NONE(), NONE(),
VALID(), VALID(),
INVALID(), INVALID(),
OTHER() OTHER();
} }
@BeforeClass @BeforeClass
@ -267,7 +229,7 @@ public void testDigestRpcWithoutAnnotation() throws Exception {
final Server server = setupTestServer(conf, 5, sm); final Server server = setupTestServer(conf, 5, sm);
doDigestRpc(server, sm); doDigestRpc(server, sm);
} finally { } finally {
SecurityUtil.setSecurityInfoProviders(); SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
} }
} }
@ -296,7 +258,7 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm)
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName())); .getUserName()));
Token<TestTokenIdentifier> token = new Token<>(tokenId, sm); Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId, sm);
SecurityUtil.setTokenService(token, addr); SecurityUtil.setTokenService(token, addr);
current.addToken(token); current.addToken(token);
@ -324,8 +286,8 @@ public void testPingInterval() throws Exception {
// set doPing to true // set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
ConnectionId remoteId = ConnectionId.getConnectionId( ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf); TestRpcService.class, null, 0, null, newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval()); remoteId.getPingInterval());
// set doPing to false // set doPing to false
@ -834,13 +796,13 @@ private String internalGetAuthMethod(
final TestTokenSecretManager sm = new TestTokenSecretManager(); final TestTokenSecretManager sm = new TestTokenSecretManager();
boolean useSecretManager = (serverAuth != SIMPLE); boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) { if (enableSecretManager != null) {
useSecretManager &= enableSecretManager; useSecretManager &= enableSecretManager.booleanValue();
} }
if (forceSecretManager != null) { if (forceSecretManager != null) {
useSecretManager |= forceSecretManager; useSecretManager |= forceSecretManager.booleanValue();
} }
final SecretManager<?> serverSm = useSecretManager ? sm : null; final SecretManager<?> serverSm = useSecretManager ? sm : null;
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() { Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
@Override @Override
public Server run() throws IOException { public Server run() throws IOException {
@ -895,13 +857,13 @@ public String run() throws IOException {
proxy.ping(null, newEmptyRequest()); proxy.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!! // make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(), assertEquals(clientUgi.getUserName(),
proxy.getAuthUser(null, newEmptyRequest()).getUser()); convert(proxy.getAuthUser(null, newEmptyRequest())));
AuthMethod authMethod = AuthMethod authMethod =
convert(proxy.getAuthMethod(null, newEmptyRequest())); convert(proxy.getAuthMethod(null, newEmptyRequest()));
// verify sasl completed with correct QOP // verify sasl completed with correct QOP
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
RPC.getConnectionIdForProxy(proxy).getSaslQop()); RPC.getConnectionIdForProxy(proxy).getSaslQop());
return authMethod != null ? authMethod.toString() : null; return authMethod.toString();
} catch (ServiceException se) { } catch (ServiceException se) {
if (se.getCause() instanceof RemoteException) { if (se.getCause() instanceof RemoteException) {
throw (RemoteException) se.getCause(); throw (RemoteException) se.getCause();
@ -926,18 +888,21 @@ private static void assertAuthEquals(AuthMethod expect,
String actual) { String actual) {
assertEquals(expect.toString(), actual); assertEquals(expect.toString(), actual);
} }
private static void assertAuthEquals(Pattern expect, String actual) { private static void assertAuthEquals(Pattern expect,
String actual) {
// this allows us to see the regexp and the value it didn't match // this allows us to see the regexp and the value it didn't match
if (!expect.matcher(actual).matches()) { if (!expect.matcher(actual).matches()) {
fail(); // it failed assertEquals(expect, actual); // it failed
} else {
assertTrue(true); // it matched
} }
} }
/* /*
* Class used to test overriding QOP values using SaslPropertiesResolver * Class used to test overriding QOP values using SaslPropertiesResolver
*/ */
static class AuthSaslPropertiesResolver extends SaslPropertiesResolver { static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{
@Override @Override
public Map<String, String> getServerProperties(InetAddress address) { public Map<String, String> getServerProperties(InetAddress address) {
@ -946,7 +911,7 @@ public Map<String, String> getServerProperties(InetAddress address) {
return newPropertes; return newPropertes;
} }
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
System.out.println("Testing Kerberos authentication over RPC"); System.out.println("Testing Kerberos authentication over RPC");
if (args.length != 2) { if (args.length != 2) {

View File

@ -17,35 +17,40 @@
*/ */
package org.apache.hadoop.security; package org.apache.hadoop.security;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
/** /**
* Test do as effective user. *
*/ */
public class TestDoAsEffectiveUser extends TestRpcBase { public class TestDoAsEffectiveUser {
final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG"; final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG";
final private static String REAL_USER_SHORT_NAME = "realUser1"; final private static String REAL_USER_SHORT_NAME = "realUser1";
final private static String PROXY_USER_NAME = "proxyUser"; final private static String PROXY_USER_NAME = "proxyUser";
@ -53,8 +58,8 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
final private static String GROUP2_NAME = "group2"; final private static String GROUP2_NAME = "group2";
final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME, final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME,
GROUP2_NAME }; GROUP2_NAME };
private static final String ADDRESS = "0.0.0.0";
private TestRpcService client; private TestProtocol proxy;
private static final Configuration masterConf = new Configuration(); private static final Configuration masterConf = new Configuration();
@ -77,7 +82,7 @@ public void setMasterConf() throws IOException {
private void configureSuperUserIPAddresses(Configuration conf, private void configureSuperUserIPAddresses(Configuration conf,
String superUserShortName) throws IOException { String superUserShortName) throws IOException {
ArrayList<String> ipList = new ArrayList<>(); ArrayList<String> ipList = new ArrayList<String>();
Enumeration<NetworkInterface> netInterfaceList = NetworkInterface Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
.getNetworkInterfaces(); .getNetworkInterfaces();
while (netInterfaceList.hasMoreElements()) { while (netInterfaceList.hasMoreElements()) {
@ -125,19 +130,50 @@ public UserGroupInformation run() throws IOException {
curUGI.toString()); curUGI.toString());
} }
private void checkRemoteUgi(final UserGroupInformation ugi, @TokenInfo(TestTokenSelector.class)
final Configuration conf) throws Exception { public interface TestProtocol extends VersionedProtocol {
public static final long versionID = 1L;
String aMethod() throws IOException;
String getServerRemoteUser() throws IOException;
}
public class TestImpl implements TestProtocol {
@Override
public String aMethod() throws IOException {
return UserGroupInformation.getCurrentUser().toString();
}
@Override
public String getServerRemoteUser() throws IOException {
return Server.getRemoteUser().toString();
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return TestProtocol.versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(TestProtocol.versionID, null);
}
}
private void checkRemoteUgi(final Server server,
final UserGroupInformation ugi, final Configuration conf)
throws Exception {
ugi.doAs(new PrivilegedExceptionAction<Void>() { ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override @Override
public Void run() throws ServiceException { public Void run() throws IOException {
client = getClient(addr, conf); proxy = RPC.getProxy(
String currentUser = client.getCurrentUser(null, TestProtocol.class, TestProtocol.versionID,
newEmptyRequest()).getUser(); NetUtils.getConnectAddress(server), conf);
String serverRemoteUser = client.getServerRemoteUser(null, Assert.assertEquals(ugi.toString(), proxy.aMethod());
newEmptyRequest()).getUser(); Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser());
Assert.assertEquals(ugi.toString(), currentUser);
Assert.assertEquals(ugi.toString(), serverRemoteUser);
return null; return null;
} }
}); });
@ -149,27 +185,29 @@ public void testRealUserSetup() throws IOException {
conf.setStrings(DefaultImpersonationProvider.getTestProvider(). conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
// Set RPC engine to protobuf RPC engine Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
RPC.setProtocolEngine(conf, TestRpcService.class, .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
ProtobufRpcEngine.class); .setNumHandlers(5).setVerbose(true).build();
UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5);
refreshConf(conf); refreshConf(conf);
try { try {
server.start();
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
checkRemoteUgi(realUserUgi, conf); checkRemoteUgi(server, realUserUgi, conf);
UserGroupInformation proxyUserUgi = UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
UserGroupInformation.createProxyUserForTesting(
PROXY_USER_NAME, realUserUgi, GROUP_NAMES); PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
checkRemoteUgi(proxyUserUgi, conf); checkRemoteUgi(server, proxyUserUgi, conf);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Assert.fail(); Assert.fail();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -180,25 +218,29 @@ public void testRealUserAuthorizationSuccess() throws IOException {
conf.setStrings(DefaultImpersonationProvider.getTestProvider(). conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1"); "group1");
RPC.setProtocolEngine(conf, TestRpcService.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ProtobufRpcEngine.class); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
UserGroupInformation.setConfiguration(conf); .setNumHandlers(2).setVerbose(false).build();
final Server server = setupTestServer(conf, 5);
refreshConf(conf); refreshConf(conf);
try { try {
server.start();
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
checkRemoteUgi(realUserUgi, conf); checkRemoteUgi(server, realUserUgi, conf);
UserGroupInformation proxyUserUgi = UserGroupInformation UserGroupInformation proxyUserUgi = UserGroupInformation
.createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES); .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
checkRemoteUgi(proxyUserUgi, conf); checkRemoteUgi(server, proxyUserUgi, conf);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Assert.fail(); Assert.fail();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -214,14 +256,17 @@ public void testRealUserIPAuthorizationFailure() throws IOException {
conf.setStrings(DefaultImpersonationProvider.getTestProvider(). conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1"); "group1");
RPC.setProtocolEngine(conf, TestRpcService.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ProtobufRpcEngine.class); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
UserGroupInformation.setConfiguration(conf); .setNumHandlers(2).setVerbose(false).build();
final Server server = setupTestServer(conf, 5);
refreshConf(conf); refreshConf(conf);
try { try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
@ -230,10 +275,11 @@ public void testRealUserIPAuthorizationFailure() throws IOException {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws ServiceException { public String run() throws IOException {
client = getClient(addr, conf); proxy = RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, conf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} }
}); });
@ -241,7 +287,10 @@ public String run() throws ServiceException {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -250,14 +299,17 @@ public void testRealUserIPNotSpecified() throws IOException {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
conf.setStrings(DefaultImpersonationProvider.getTestProvider(). conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
RPC.setProtocolEngine(conf, TestRpcService.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ProtobufRpcEngine.class); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
UserGroupInformation.setConfiguration(conf); .setNumHandlers(2).setVerbose(false).build();
final Server server = setupTestServer(conf, 2);
refreshConf(conf); refreshConf(conf);
try { try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
@ -266,10 +318,11 @@ public void testRealUserIPNotSpecified() throws IOException {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws ServiceException { public String run() throws IOException {
client = getClient(addr, conf); proxy = RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, conf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} }
}); });
@ -277,7 +330,10 @@ public String run() throws ServiceException {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -285,12 +341,15 @@ public String run() throws ServiceException {
public void testRealUserGroupNotSpecified() throws IOException { public void testRealUserGroupNotSpecified() throws IOException {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
RPC.setProtocolEngine(conf, TestRpcService.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ProtobufRpcEngine.class); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
UserGroupInformation.setConfiguration(conf); .setNumHandlers(2).setVerbose(false).build();
final Server server = setupTestServer(conf, 2);
try { try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
@ -299,10 +358,11 @@ public void testRealUserGroupNotSpecified() throws IOException {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws ServiceException { public String run() throws IOException {
client = getClient(addr, conf); proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, conf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} }
}); });
@ -310,7 +370,10 @@ public String run() throws ServiceException {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -321,14 +384,17 @@ public void testRealUserGroupAuthorizationFailure() throws IOException {
conf.setStrings(DefaultImpersonationProvider.getTestProvider(). conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3"); "group3");
RPC.setProtocolEngine(conf, TestRpcService.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ProtobufRpcEngine.class); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
UserGroupInformation.setConfiguration(conf); .setNumHandlers(2).setVerbose(false).build();
final Server server = setupTestServer(conf, 2);
refreshConf(conf); refreshConf(conf);
try { try {
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
UserGroupInformation realUserUgi = UserGroupInformation UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
@ -337,10 +403,11 @@ public void testRealUserGroupAuthorizationFailure() throws IOException {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws ServiceException { public String run() throws IOException {
client = getClient(addr, conf); proxy = RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, conf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} }
}); });
@ -348,7 +415,10 @@ public String run() throws ServiceException {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
@ -362,17 +432,20 @@ public void testProxyWithToken() throws Exception {
final Configuration conf = new Configuration(masterConf); final Configuration conf = new Configuration(masterConf);
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
final Server server = setupTestServer(conf, 5, sm); final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
server.start();
final UserGroupInformation current = UserGroupInformation final UserGroupInformation current = UserGroupInformation
.createRemoteUser(REAL_USER_NAME); .createRemoteUser(REAL_USER_NAME);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser")); .getUserName()), new Text("SomeSuperUser"));
Token<TestTokenIdentifier> token = new Token<>(tokenId, Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
sm); sm);
SecurityUtil.setTokenService(token, addr); SecurityUtil.setTokenService(token, addr);
UserGroupInformation proxyUserUgi = UserGroupInformation UserGroupInformation proxyUserUgi = UserGroupInformation
@ -380,19 +453,23 @@ public void testProxyWithToken() throws Exception {
proxyUserUgi.addToken(token); proxyUserUgi.addToken(token);
refreshConf(conf); refreshConf(conf);
String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() { String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws Exception { public String run() throws Exception {
try { try {
client = getClient(addr, conf); proxy = RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, conf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw e; throw e;
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
}); });
@ -409,34 +486,42 @@ public void testTokenBySuperUser() throws Exception {
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
final Configuration newConf = new Configuration(masterConf); final Configuration newConf = new Configuration(masterConf);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(newConf, TestRpcService.class,
ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(newConf); UserGroupInformation.setConfiguration(newConf);
final Server server = setupTestServer(newConf, 5, sm); final Server server = new RPC.Builder(newConf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start();
final UserGroupInformation current = UserGroupInformation final UserGroupInformation current = UserGroupInformation
.createUserForTesting(REAL_USER_NAME, GROUP_NAMES); .createUserForTesting(REAL_USER_NAME, GROUP_NAMES);
refreshConf(newConf); refreshConf(newConf);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser")); .getUserName()), new Text("SomeSuperUser"));
Token<TestTokenIdentifier> token = new Token<>(tokenId, sm); Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
sm);
SecurityUtil.setTokenService(token, addr); SecurityUtil.setTokenService(token, addr);
current.addToken(token); current.addToken(token);
String retVal = current.doAs(new PrivilegedExceptionAction<String>() { String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
@Override @Override
public String run() throws Exception { public String run() throws Exception {
try { try {
client = getClient(addr, newConf); proxy = RPC.getProxy(TestProtocol.class,
return client.getCurrentUser(null, TestProtocol.versionID, addr, newConf);
newEmptyRequest()).getUser(); String ret = proxy.aMethod();
return ret;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw e; throw e;
} finally { } finally {
stop(server, client); server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
} }
} }
}); });

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -29,11 +28,7 @@
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.After; import org.junit.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.security.auth.Subject; import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.kerberos.KerberosPrincipal;
@ -53,22 +48,9 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS; import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.ipc.TestSaslRPC.*;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; import static org.junit.Assert.*;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -125,7 +107,7 @@ public void resetUgi() {
UserGroupInformation.setLoginUser(null); UserGroupInformation.setLoginUser(null);
} }
@Test(timeout = 30000) @Test (timeout = 30000)
public void testSimpleLogin() throws IOException { public void testSimpleLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true); tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true);
} }

View File

@ -88,6 +88,6 @@ message AuthMethodResponseProto {
required string mechanismName = 2; required string mechanismName = 2;
} }
message UserResponseProto { message AuthUserResponseProto {
required string user = 1; required string authUser = 1;
} }

View File

@ -40,11 +40,9 @@ service TestProtobufRpcProto {
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto); rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
} }
service TestProtobufRpc2Proto { service TestProtobufRpc2Proto {

View File

@ -168,6 +168,7 @@
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
@ -316,6 +317,8 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
new TraceAdminProtocolServerSideTranslatorPB(this); new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator); .newReflectiveBlockingService(traceAdminXlator);
WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) { if (serviceRpcAddr != null) {

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.security;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
/** Unit tests for using Delegation Token over RPC. */
public class TestClientProtocolWithDelegationToken {
private static final String ADDRESS = "0.0.0.0";
public static final Log LOG = LogFactory
.getLog(TestClientProtocolWithDelegationToken.class);
private static final Configuration conf;
static {
conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
}
static {
GenericTestUtils.setLogLevel(Client.LOG, Level.ALL);
GenericTestUtils.setLogLevel(Server.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
}
@Test
public void testDelegationTokenRpc() throws Exception {
ClientProtocol mockNN = mock(ClientProtocol.class);
FSNamesystem mockNameSys = mock(FSNamesystem.class);
DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
3600000, mockNameSys);
sm.startThreads();
final Server server = new RPC.Builder(conf)
.setProtocol(ClientProtocol.class).setInstance(mockNN)
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
String user = current.getUserName();
Text owner = new Text(user);
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
dtId, sm);
SecurityUtil.setTokenService(token, addr);
LOG.info("Service for token is " + token.getService());
current.addToken(token);
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
ClientProtocol proxy = null;
try {
proxy = RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, addr, conf);
proxy.getServerDefaults();
} finally {
server.stop();
if (proxy != null) {
RPC.stopProxy(proxy);
}
}
return null;
}
});
}
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
@ -97,6 +98,8 @@ public void serviceInit(Configuration conf) throws Exception {
BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
.newReflectiveBlockingService(refreshHSAdminProtocolXlator); .newReflectiveBlockingService(refreshHSAdminProtocolXlator);
WritableRpcEngine.ensureInitialized();
clientRpcAddress = conf.getSocketAddr( clientRpcAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.JHS_ADMIN_ADDRESS, JHAdminConfig.JHS_ADMIN_ADDRESS,