Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-20 10:34:45 -07:00
commit 09dab88d3e
59 changed files with 1557 additions and 287 deletions

View File

@ -197,6 +197,8 @@ public class ZKSignerSecretProvider extends RolloverSignerSecretProvider {
client = (CuratorFramework) curatorClientObj;
} else {
client = createCuratorClient(config);
servletContext.setAttribute(
ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE, client);
}
this.tokenValidity = tokenValidity;
shouldDisconnect = Boolean.parseBoolean(

View File

@ -535,6 +535,11 @@ Release 2.6.0 - UNRELEASED
HADOOP-11106. Document considerations of HAR and Encryption. (clamb via wang)
HADOOP-10970. Cleanup KMS configuration keys. (wang)
HADOOP-11017. KMS delegation token secret manager should be able to use
zookeeper as store. (asuresh via tucu)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -589,6 +594,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10833. Remove unused cache in UserProvider. (Benoy Antony)
HADOOP-11112. TestKMSWithZK does not use KEY_PROVIDER_URI. (tucu via wang)
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry
@ -838,6 +845,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11109. Site build is broken. (Jian He via atm)
HADOOP-10946. Fix a bunch of typos in log messages (Ray Chiang via aw)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -218,6 +218,19 @@
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>

View File

@ -41,7 +41,7 @@ public class VersionMismatchException extends IOException {
/** Returns a string representation of this object. */
@Override
public String toString(){
return "A record version mismatch occured. Expecting v"
return "A record version mismatch occurred. Expecting v"
+ expectedVersion + ", found v" + foundVersion;
}
}

View File

@ -687,7 +687,8 @@ public class Client {
* a header to the server and starts
* the connection thread that waits for responses.
*/
private synchronized void setupIOstreams() {
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@ -738,11 +739,18 @@ public class Client {
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
} else if (UserGroupInformation.isSecurityEnabled() &&
!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE " +
"auth, but this client is configured to only allow secure " +
"connections.");
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(false);
}
} else if (UserGroupInformation.isSecurityEnabled()) {
if (!fallbackAllowed) {
throw new IOException("Server asks us to fall back to SIMPLE " +
"auth, but this client is configured to only allow secure " +
"connections.");
}
if (fallbackToSimpleAuth != null) {
fallbackToSimpleAuth.set(true);
}
}
}
@ -1375,6 +1383,26 @@ public class Client {
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
@ -1386,8 +1414,29 @@ public class Client {
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass);
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
@ -1444,7 +1493,8 @@ public class Client {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
Call call, int serviceClass) throws IOException {
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
@ -1468,7 +1518,7 @@ public class Client {
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams();
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}

View File

@ -27,6 +27,7 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@ -84,14 +85,23 @@ public class ProtobufRpcEngine implements RpcEngine {
}
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
}
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy);
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
@ -115,13 +125,16 @@ public class ProtobufRpcEngine implements RpcEngine {
private final Client client;
private final long clientProtocolVersion;
private final String protocolName;
private AtomicBoolean fallbackToSimpleAuth;
private Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
/**
@ -217,7 +230,8 @@ public class ProtobufRpcEngine implements RpcEngine {
final RpcResponseWrapper val;
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {

View File

@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@ -524,6 +525,7 @@ public class RPC {
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @return the proxy
* @throws IOException if any error occurs
*/
@ -535,11 +537,43 @@ public class RPC {
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, ticket,
conf, factory, rpcTimeout, connectionRetryPolicy, null);
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}
/**

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@ -43,6 +44,14 @@ public interface RpcEngine {
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException;
/** Construct a client-side proxy object. */
<T> ProtocolProxy<T> getProxy(Class<T> protocol,
long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException;
/**
* Construct a server for a protocol implementation instance.
*

View File

@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.io.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@ -212,14 +213,17 @@ public class WritableRpcEngine implements RpcEngine {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth;
public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
@Override
@ -238,7 +242,8 @@ public class WritableRpcEngine implements RpcEngine {
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
@ -275,11 +280,25 @@ public class WritableRpcEngine implements RpcEngine {
* talking to a server at the named address.
* @param <T>*/
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy)
throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
* @param <T>*/
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (connectionRetryPolicy != null) {
@ -289,7 +308,7 @@ public class WritableRpcEngine implements RpcEngine {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout));
factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy<T>(protocol, proxy, true);
}

View File

@ -160,7 +160,7 @@ public abstract class MetricsDynamicMBeanBase implements DynamicMBean {
else if (attributeName.endsWith(MAX_TIME))
return or.getMaxTime();
else {
MetricsUtil.LOG.error("Unexpected attrubute suffix");
MetricsUtil.LOG.error("Unexpected attribute suffix");
throw new AttributeNotFoundException();
}
} else {

View File

@ -127,7 +127,7 @@ extends AbstractDelegationTokenIdentifier>
public synchronized void reset() {
currentId = 0;
allKeys.clear();
delegationTokenSequenceNumber = 0;
setDelegationTokenSeqNum(0);
currentTokens.clear();
}
@ -141,7 +141,7 @@ extends AbstractDelegationTokenIdentifier>
if (key.getKeyId() > currentId) {
currentId = key.getKeyId();
}
allKeys.put(key.getKeyId(), key);
storeDelegationKey(key);
}
public synchronized DelegationKey[] getAllKeys() {
@ -163,24 +163,108 @@ extends AbstractDelegationTokenIdentifier>
return;
}
// for ZK based secretManager
protected void updateMasterKey(DelegationKey key) throws IOException{
return;
}
// RM
protected void removeStoredMasterKey(DelegationKey key) {
return;
}
// RM
protected void storeNewToken(TokenIdent ident, long renewDate) {
protected void storeNewToken(TokenIdent ident, long renewDate) throws IOException{
return;
}
// RM
protected void removeStoredToken(TokenIdent ident) throws IOException {
}
// RM
protected void updateStoredToken(TokenIdent ident, long renewDate) {
protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOException {
return;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected int getDelegationTokenSeqNum() {
return delegationTokenSequenceNumber;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected int incrementDelegationTokenSeqNum() {
return ++delegationTokenSequenceNumber;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected void setDelegationTokenSeqNum(int seqNum) {
delegationTokenSequenceNumber = seqNum;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected DelegationKey getDelegationKey(int keyId) {
return allKeys.get(keyId);
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected void storeDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
storeNewMasterKey(key);
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected void updateDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
updateMasterKey(key);
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
return currentTokens.get(ident);
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
storeNewToken(ident, tokenInfo.getRenewDate());
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
updateStoredToken(ident, tokenInfo.getRenewDate());
}
/**
* This method is intended to be used for recovering persisted delegation
* tokens
@ -196,17 +280,18 @@ extends AbstractDelegationTokenIdentifier>
"Can't add persisted delegation token to a running SecretManager.");
}
int keyId = identifier.getMasterKeyId();
DelegationKey dKey = allKeys.get(keyId);
DelegationKey dKey = getDelegationKey(keyId);
if (dKey == null) {
LOG.warn("No KEY found for persisted identifier " + identifier.toString());
return;
}
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
int delegationTokenSeqNum = getDelegationTokenSeqNum();
if (identifier.getSequenceNumber() > delegationTokenSeqNum) {
setDelegationTokenSeqNum(identifier.getSequenceNumber());
}
if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
if (getTokenInfo(identifier) == null) {
storeToken(identifier, new DelegationTokenInformation(renewDate,
password, getTrackingIdIfEnabled(identifier)));
} else {
throw new IOException("Same delegation token being added twice.");
@ -234,7 +319,7 @@ extends AbstractDelegationTokenIdentifier>
synchronized (this) {
currentId = newKey.getKeyId();
currentKey = newKey;
allKeys.put(currentKey.getKeyId(), currentKey);
storeDelegationKey(currentKey);
}
}
@ -252,7 +337,7 @@ extends AbstractDelegationTokenIdentifier>
* updateMasterKey() isn't called at expected interval. Add it back to
* allKeys just in case.
*/
allKeys.put(currentKey.getKeyId(), currentKey);
updateDelegationKey(currentKey);
}
updateCurrentKey();
}
@ -276,19 +361,25 @@ extends AbstractDelegationTokenIdentifier>
protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
sequenceNum = ++delegationTokenSequenceNumber;
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentId);
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + identifier);
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
storeNewToken(identifier, now + tokenRenewInterval);
currentTokens.put(identifier, new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
storeToken(identifier, tokenInfo);
} catch (IOException ioe) {
LOG.error("Could not store token !!", ioe);
}
return password;
}
/**
* Find the DelegationTokenInformation for the given token id, and verify that
* if the token is expired. Note that this method should be called with
@ -297,7 +388,7 @@ extends AbstractDelegationTokenIdentifier>
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = currentTokens.get(identifier);
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
throw new InvalidToken("token (" + identifier.toString()
+ ") can't be found in cache");
@ -322,7 +413,7 @@ extends AbstractDelegationTokenIdentifier>
}
public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = currentTokens.get(identifier);
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
}
@ -373,7 +464,7 @@ extends AbstractDelegationTokenIdentifier>
throw new AccessControlException(renewer +
" tries to renew a token with renewer " + id.getRenewer());
}
DelegationKey key = allKeys.get(id.getMasterKeyId());
DelegationKey key = getDelegationKey(id.getMasterKeyId());
if (key == null) {
throw new InvalidToken("Unable to find master key for keyId="
+ id.getMasterKeyId()
@ -390,11 +481,10 @@ extends AbstractDelegationTokenIdentifier>
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password, trackingId);
if (currentTokens.get(id) == null) {
if (getTokenInfo(id) == null) {
throw new InvalidToken("Renewal request for unknown token");
}
currentTokens.put(id, info);
updateStoredToken(id, renewTime);
updateToken(id, info);
return renewTime;
}

View File

@ -0,0 +1,727 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* An implementation of {@link AbstractDelegationTokenSecretManager} that
* persists TokenIdentifiers and DelegationKeys in Zookeeper. This class can
* be used by HA (Highly available) services that consists of multiple nodes.
* This class ensures that Identifiers and Keys are replicated to all nodes of
* the service.
*/
@InterfaceAudience.Private
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> {
private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ "zkNumRetries";
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
+ "zkSessionTimeout";
public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+ "zkConnectionTimeout";
public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+ "znodeWorkingPath";
public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
+ "zkAuthType";
public static final String ZK_DTSM_ZK_CONNECTION_STRING = ZK_CONF_PREFIX
+ "zkConnectionString";
public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = ZK_CONF_PREFIX
+ "kerberos.keytab";
public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
+ "kerberos.principal";
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
private static Logger LOG = LoggerFactory
.getLogger(ZKDelegationTokenSecretManager.class);
private static final String JAAS_LOGIN_ENTRY_NAME =
"ZKDelegationTokenSecretManagerClient";
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
private static final String DELEGATION_KEY_PREFIX = "DK_";
private static final String DELEGATION_TOKEN_PREFIX = "DT_";
private static final ThreadLocal<CuratorFramework> CURATOR_TL =
new ThreadLocal<CuratorFramework>();
public static void setCurator(CuratorFramework curator) {
CURATOR_TL.set(curator);
}
private final boolean isExternalClient;
private final CuratorFramework zkClient;
private SharedCount seqCounter;
private PathChildrenCache keyCache;
private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool;
public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.MAX_LIFETIME,
DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
if (CURATOR_TL.get() != null) {
zkClient = CURATOR_TL.get();
isExternalClient = true;
} else {
String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
Preconditions.checkNotNull(connString,
"Zookeeper connection string cannot be null");
String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
// AuthType has to be explicitly set to 'none' or 'sasl'
Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!");
Preconditions.checkArgument(
authType.equals("sasl") || authType.equals("none"),
"Zookeeper authType must be one of [none, sasl]");
Builder builder = null;
try {
ACLProvider aclProvider = null;
if (authType.equals("sasl")) {
LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ "and using 'sasl' ACLs");
String principal = setJaasConfiguration(conf);
System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
JAAS_LOGIN_ENTRY_NAME);
System.setProperty("zookeeper.authProvider.1",
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
aclProvider = new SASLOwnerACLProvider(principal);
} else { // "none"
LOG.info("Connecting to ZooKeeper without authentication");
aclProvider = new DefaultACLProvider(); // open to everyone
}
int sessionT =
conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT,
ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
int numRetries =
conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
builder =
CuratorFrameworkFactory
.builder()
.aclProvider(aclProvider)
.namespace(
conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
+ "/"
+ ZK_DTSM_NAMESPACE
)
.sessionTimeoutMs(sessionT)
.connectionTimeoutMs(
conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT,
ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT)
)
.retryPolicy(
new RetryNTimes(numRetries, sessionT / numRetries));
} catch (Exception ex) {
throw new RuntimeException("Could not Load ZK acls or auth");
}
zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString))
.build();
isExternalClient = false;
}
listenerThreadPool = Executors.newFixedThreadPool(2);
}
private String setJaasConfiguration(Configuration config) throws Exception {
String keytabFile =
config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
if (keytabFile == null || keytabFile.length() == 0) {
throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB
+ " must be specified");
}
String principal =
config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
if (principal == null || principal.length() == 0) {
throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL
+ " must be specified");
}
JaasConfiguration jConf =
new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
javax.security.auth.login.Configuration.setConfiguration(jConf);
return principal.split("[/@]")[0];
}
/**
* Creates a programmatic version of a jaas.conf file. This can be used
* instead of writing a jaas.conf file and setting the system property,
* "java.security.auth.login.config", to point to that file. It is meant to be
* used for connecting to ZooKeeper.
*/
@InterfaceAudience.Private
public static class JaasConfiguration extends
javax.security.auth.login.Configuration {
private static AppConfigurationEntry[] entry;
private String entryName;
/**
* Add an entry to the jaas configuration with the passed in name,
* principal, and keytab. The other necessary options will be set for you.
*
* @param entryName
* The name of the entry (e.g. "Client")
* @param principal
* The principal of the user
* @param keytab
* The location of the keytab
*/
public JaasConfiguration(String entryName, String principal, String keytab) {
this.entryName = entryName;
Map<String, String> options = new HashMap<String, String>();
options.put("keyTab", keytab);
options.put("principal", principal);
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("useTicketCache", "false");
options.put("refreshKrb5Config", "true");
String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
options.put("debug", "true");
}
entry = new AppConfigurationEntry[] {
new AppConfigurationEntry(getKrb5LoginModuleName(),
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
options) };
}
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return (entryName.equals(name)) ? entry : null;
}
private String getKrb5LoginModuleName() {
String krb5LoginModuleName;
if (System.getProperty("java.vendor").contains("IBM")) {
krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
} else {
krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
}
return krb5LoginModuleName;
}
}
@Override
public void startThreads() throws IOException {
if (!isExternalClient) {
try {
zkClient.start();
} catch (Exception e) {
throw new IOException("Could not start Curator Framework", e);
}
}
try {
seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (seqCounter != null) {
seqCounter.start();
}
} catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e);
}
try {
createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
createPersistentNode(ZK_DTSM_TOKENS_ROOT);
} catch (Exception e) {
throw new RuntimeException("Could not create ZK paths");
}
try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
if (keyCache != null) {
keyCache.start(StartMode.POST_INITIALIZED_EVENT);
keyCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processKeyRemoved(event.getData().getPath());
break;
default:
break;
}
}
}, listenerThreadPool);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e);
}
try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
if (tokenCache != null) {
tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processTokenRemoved(event.getData().getData());
break;
default:
break;
}
}
}, listenerThreadPool);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
}
super.startThreads();
}
private void processKeyAddOrUpdate(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
allKeys.put(key.getKeyId(), key);
}
private void processKeyRemoved(String path) {
int i = path.lastIndexOf('/');
if (i > 0) {
String tokSeg = path.substring(i + 1);
int j = tokSeg.indexOf('_');
if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
allKeys.remove(keyId);
}
}
}
private void processTokenAddOrUpdate(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
long renewDate = din.readLong();
int pwdLen = din.readInt();
byte[] password = new byte[pwdLen];
int numRead = din.read(password, 0, pwdLen);
if (numRead > -1) {
DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password);
currentTokens.put(ident, tokenInfo);
}
}
private void processTokenRemoved(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier();
ident.readFields(din);
currentTokens.remove(ident);
}
@Override
public void stopThreads() {
try {
if (!isExternalClient && (zkClient != null)) {
zkClient.close();
}
if (seqCounter != null) {
seqCounter.close();
}
if (keyCache != null) {
keyCache.close();
}
if (tokenCache != null) {
tokenCache.close();
}
} catch (Exception e) {
LOG.error("Could not stop Curator Framework", e);
// Ignore
}
super.stopThreads();
}
private void createPersistentNode(String nodePath) throws Exception {
try {
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
} catch (KeeperException.NodeExistsException ne) {
LOG.debug(nodePath + " znode already exists !!");
} catch (Exception e) {
throw new IOException(nodePath + " znode could not be created !!", e);
}
}
@Override
protected int getDelegationTokenSeqNum() {
return seqCounter.getCount();
}
@Override
protected int incrementDelegationTokenSeqNum() {
try {
while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
}
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
return seqCounter.getCount();
}
@Override
protected void setDelegationTokenSeqNum(int seqNum) {
delegationTokenSequenceNumber = seqNum;
}
@Override
protected DelegationKey getDelegationKey(int keyId) {
// First check if its I already have this key
DelegationKey key = allKeys.get(keyId);
// Then query ZK
if (key == null) {
try {
key = getKeyFromZK(keyId);
if (key != null) {
allKeys.put(keyId, key);
}
} catch (IOException e) {
LOG.error("Error retrieving key [" + keyId + "] from ZK", e);
}
}
return key;
}
private DelegationKey getKeyFromZK(int keyId) throws IOException {
String nodePath =
getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId);
try {
byte[] data = zkClient.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) {
return null;
}
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
return key;
} catch (KeeperException.NoNodeException e) {
LOG.error("No node in path [" + nodePath + "]");
} catch (Exception ex) {
throw new IOException(ex);
}
return null;
}
@Override
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
// First check if I have this..
DelegationTokenInformation tokenInfo = currentTokens.get(ident);
// Then query ZK
if (tokenInfo == null) {
try {
tokenInfo = getTokenInfoFromZK(ident);
if (tokenInfo != null) {
currentTokens.put(ident, tokenInfo);
}
} catch (IOException e) {
LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber()
+ "] from ZK", e);
}
}
return tokenInfo;
}
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException {
String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
try {
byte[] data = zkClient.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) {
return null;
}
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
createIdentifier().readFields(din);
long renewDate = din.readLong();
int pwdLen = din.readInt();
byte[] password = new byte[pwdLen];
int numRead = din.read(password, 0, pwdLen);
if (numRead > -1) {
DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password);
return tokenInfo;
}
} catch (KeeperException.NoNodeException e) {
LOG.error("No node in path [" + nodePath + "]");
} catch (Exception ex) {
throw new IOException(ex);
}
return null;
}
@Override
protected void storeDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, false);
}
@Override
protected void updateDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, true);
}
private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate)
throws IOException {
String nodeCreatePath =
getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
DELEGATION_KEY_PREFIX + key.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId());
}
key.write(fsOut);
try {
if (zkClient.checkExists().forPath(nodeCreatePath) != null) {
zkClient.setData().forPath(nodeCreatePath, os.toByteArray())
.setVersion(-1);
if (!isUpdate) {
LOG.debug("Key with path [" + nodeCreatePath
+ "] already exists.. Updating !!");
}
} else {
zkClient.create().withMode(CreateMode.PERSISTENT)
.forPath(nodeCreatePath, os.toByteArray());
if (isUpdate) {
LOG.debug("Updating non existent Key path [" + nodeCreatePath
+ "].. Adding new !!");
}
}
} catch (KeeperException.NodeExistsException ne) {
LOG.debug(nodeCreatePath + " znode already exists !!");
} catch (Exception ex) {
throw new IOException(ex);
} finally {
os.close();
}
}
@Override
protected void removeStoredMasterKey(DelegationKey key) {
String nodeRemovePath =
getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
DELEGATION_KEY_PREFIX + key.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId());
}
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
zkClient.delete().forPath(nodeRemovePath);
} else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
} catch (Exception e) {
LOG.debug(nodeRemovePath + " znode could not be removed!!");
}
}
@Override
protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
try {
addOrUpdateToken(ident, tokenInfo, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
addOrUpdateToken(ident, tokenInfo, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
addOrUpdateToken(ident, tokenInfo, true);
}
} catch (Exception e) {
throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
+ ident.getSequenceNumber(), e);
}
}
@Override
protected void removeStoredToken(TokenIdent ident)
throws IOException {
String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationToken_"
+ ident.getSequenceNumber());
}
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
} else {
zkClient.delete().forPath(nodeRemovePath);
}
} catch (Exception e) {
throw new RuntimeException(
"Could not remove Stored Token ZKDTSMDelegationToken_"
+ ident.getSequenceNumber(), e);
}
}
private void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber());
ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
DataOutputStream tokenOut = new DataOutputStream(tokenOs);
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
try {
ident.write(tokenOut);
tokenOut.writeLong(info.getRenewDate());
tokenOut.writeInt(info.getPassword().length);
tokenOut.write(info.getPassword());
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ")
+ "ZKDTSMDelegationToken_" +
ident.getSequenceNumber());
}
if (isUpdate) {
zkClient.setData().forPath(nodeCreatePath, tokenOs.toByteArray())
.setVersion(-1);
} else {
zkClient.create().withMode(CreateMode.PERSISTENT)
.forPath(nodeCreatePath, tokenOs.toByteArray());
}
} finally {
seqOs.close();
}
}
/**
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
* that gives all permissions only to a single principal.
*/
private static class SASLOwnerACLProvider implements ACLProvider {
private final List<ACL> saslACL;
private SASLOwnerACLProvider(String principal) {
this.saslACL = Collections.singletonList(
new ACL(Perms.ALL, new Id("sasl", principal)));
}
@Override
public List<ACL> getDefaultAcl() {
return saslACL;
}
@Override
public List<ACL> getAclForPath(String path) {
return saslACL;
}
}
@VisibleForTesting
@Private
@Unstable
static String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.security.token.delegation.web;
import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -28,9 +29,11 @@ import org.apache.hadoop.security.authentication.server.AuthenticationHandler;
import org.apache.hadoop.security.authentication.server.AuthenticationToken;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
@ -153,7 +156,14 @@ public class DelegationTokenAuthenticationFilter
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// A single CuratorFramework should be used for a ZK cluster.
// If the ZKSignerSecretProvider has already created it, it has to
// be set here... to be used by the ZKDelegationTokenSecretManager
ZKDelegationTokenSecretManager.setCurator((CuratorFramework)
filterConfig.getServletContext().getAttribute(ZKSignerSecretProvider.
ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE));
super.init(filterConfig);
ZKDelegationTokenSecretManager.setCurator(null);
AuthenticationHandler handler = getAuthenticationHandler();
AbstractDelegationTokenSecretManager dtSecretManager =
(AbstractDelegationTokenSecretManager) filterConfig.getServletContext().

View File

@ -78,19 +78,6 @@ public abstract class DelegationTokenAuthenticationHandler
public static final String TOKEN_KIND = PREFIX + "token-kind";
public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
"removal-scan-interval.sec";
public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
private static final Set<String> DELEGATION_TOKEN_OPS = new HashSet<String>();
static final String DELEGATION_TOKEN_UGI_ATTRIBUTE =
@ -142,7 +129,6 @@ public abstract class DelegationTokenAuthenticationHandler
@VisibleForTesting
@SuppressWarnings("unchecked")
public void initTokenManager(Properties config) {
String configPrefix = authHandler.getType() + ".";
Configuration conf = new Configuration(false);
for (Map.Entry entry : config.entrySet()) {
conf.set((String) entry.getKey(), (String) entry.getValue());
@ -153,17 +139,7 @@ public abstract class DelegationTokenAuthenticationHandler
"The configuration does not define the token kind");
}
tokenKind = tokenKind.trim();
long updateInterval = conf.getLong(configPrefix + UPDATE_INTERVAL,
UPDATE_INTERVAL_DEFAULT);
long maxLifeTime = conf.getLong(configPrefix + MAX_LIFETIME,
MAX_LIFETIME_DEFAULT);
long renewInterval = conf.getLong(configPrefix + RENEW_INTERVAL,
RENEW_INTERVAL_DEFAULT);
long removalScanInterval = conf.getLong(
configPrefix + REMOVAL_SCAN_INTERVAL, REMOVAL_SCAN_INTERVAL_DEFAULT);
tokenManager = new DelegationTokenManager(new Text(tokenKind),
updateInterval * 1000, maxLifeTime * 1000, renewInterval * 1000,
removalScanInterval * 1000);
tokenManager = new DelegationTokenManager(conf, new Text(tokenKind));
tokenManager.init();
}

View File

@ -17,16 +17,20 @@
*/
package org.apache.hadoop.security.token.delegation.web;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
/**
* Delegation Token Manager used by the
@ -35,20 +39,36 @@ import java.io.IOException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class DelegationTokenManager {
public class DelegationTokenManager {
public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
public static final String PREFIX = "delegation-token.";
public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
"removal-scan-interval.sec";
public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
private static class DelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
private Text tokenKind;
public DelegationTokenSecretManager(Text tokenKind,
long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
public DelegationTokenSecretManager(Configuration conf, Text tokenKind) {
super(conf.getLong(UPDATE_INTERVAL, UPDATE_INTERVAL_DEFAULT) * 1000,
conf.getLong(MAX_LIFETIME, MAX_LIFETIME_DEFAULT) * 1000,
conf.getLong(RENEW_INTERVAL, RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(REMOVAL_SCAN_INTERVAL,
REMOVAL_SCAN_INTERVAL_DEFAULT * 1000));
this.tokenKind = tokenKind;
}
@ -56,21 +76,34 @@ class DelegationTokenManager {
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier(tokenKind);
}
}
private static class ZKSecretManager
extends ZKDelegationTokenSecretManager<DelegationTokenIdentifier> {
private Text tokenKind;
public ZKSecretManager(Configuration conf, Text tokenKind) {
super(conf);
this.tokenKind = tokenKind;
}
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier(tokenKind);
}
}
private AbstractDelegationTokenSecretManager secretManager = null;
private boolean managedSecretManager;
private Text tokenKind;
public DelegationTokenManager(Text tokenKind,
long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
this.secretManager = new DelegationTokenSecretManager(tokenKind,
delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
public DelegationTokenManager(Configuration conf, Text tokenKind) {
if (conf.getBoolean(ENABLE_ZK_KEY, false)) {
this.secretManager = new ZKSecretManager(conf, tokenKind);
} else {
this.secretManager = new DelegationTokenSecretManager(conf, tokenKind);
}
this.tokenKind = tokenKind;
managedSecretManager = true;
}
@ -150,4 +183,9 @@ class DelegationTokenManager {
return id.getUser();
}
@VisibleForTesting
@SuppressWarnings("rawtypes")
public AbstractDelegationTokenSecretManager getDelegationTokenSecretManager() {
return secretManager;
}
}

View File

@ -102,7 +102,7 @@ public class DiskChecker {
*/
public static void checkDir(File dir) throws DiskErrorException {
if (!mkdirsWithExistsCheck(dir)) {
throw new DiskErrorException("Can not create directory: "
throw new DiskErrorException("Cannot create directory: "
+ dir.toString());
}
checkDirAccess(dir);

View File

@ -276,12 +276,22 @@ public class TestRPC {
*/
private static class StoppedRpcEngine implements RpcEngine {
@SuppressWarnings("unchecked")
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, null);
}
@SuppressWarnings("unchecked")
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth
) throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new StoppedInvocationHandler());
return new ProtocolProxy<T>(protocol, proxy, false);

View File

@ -121,7 +121,7 @@ public class TestDelegationToken {
@Override
protected void storeNewToken(TestDelegationTokenIdentifier ident,
long renewDate) {
long renewDate) throws IOException {
super.storeNewToken(ident, renewDate);
isStoreNewTokenCalled = true;
}
@ -135,7 +135,7 @@ public class TestDelegationToken {
@Override
protected void updateStoredToken(TestDelegationTokenIdentifier ident,
long renewDate) {
long renewDate) throws IOException {
super.updateStoredToken(ident, renewDate);
isUpdateStoredTokenCalled = true;
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token.delegation;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.junit.Assert;
import org.junit.Test;
public class TestZKDelegationTokenSecretManager {
private static final long DAY_IN_SECS = 86400;
@Test
public void testZKDelTokSecretManager() throws Exception {
TestingServer zkServer = new TestingServer();
DelegationTokenManager tm1, tm2 = null;
zkServer.start();
try {
String connectString = zkServer.getConnectString();
Configuration conf = new Configuration();
conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
tm2 = new DelegationTokenManager(conf, new Text("foo"));
tm2.init();
Token<DelegationTokenIdentifier> token =
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm2.verifyToken(token);
token = tm2.createToken(UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
} finally {
zkServer.close();
}
}
}

View File

@ -17,27 +17,28 @@
*/
package org.apache.hadoop.security.token.delegation.web;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
public class TestDelegationTokenManager {
private static final long DAY_IN_SECS = 86400;
@Test
public void testDTManager() throws Exception {
DelegationTokenManager tm = new DelegationTokenManager(new Text("foo"),
DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS);
Configuration conf = new Configuration(false);
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
DelegationTokenManager tm =
new DelegationTokenManager(conf, new Text("foo"));
tm.init();
Token<DelegationTokenIdentifier> token =
tm.createToken(UserGroupInformation.getCurrentUser(), "foo");

View File

@ -23,7 +23,7 @@
<value>*</value>
<description>
ACL for create-key operations.
If the user does is not in the GET ACL, the key material is not returned
If the user is not in the GET ACL, the key material is not returned
as part of the response.
</description>
</property>
@ -58,7 +58,7 @@
<name>hadoop.kms.acl.GET_KEYS</name>
<value>*</value>
<description>
ACL for get-keys operation.
ACL for get-keys operations.
</description>
</property>
@ -66,7 +66,7 @@
<name>hadoop.kms.acl.GET_METADATA</name>
<value>*</value>
<description>
ACL for get-key-metadata an get-keys-metadata operations.
ACL for get-key-metadata and get-keys-metadata operations.
</description>
</property>
@ -74,7 +74,7 @@
<name>hadoop.kms.acl.SET_KEY_MATERIAL</name>
<value>*</value>
<description>
Complimentary ACL for CREATE and ROLLOVER operation to allow the client
Complementary ACL for CREATE and ROLLOVER operations to allow the client
to provide the key material when creating or rolling a key.
</description>
</property>
@ -83,7 +83,7 @@
<name>hadoop.kms.acl.GENERATE_EEK</name>
<value>*</value>
<description>
ACL for generateEncryptedKey CryptoExtension operations
ACL for generateEncryptedKey CryptoExtension operations.
</description>
</property>
@ -91,7 +91,7 @@
<name>hadoop.kms.acl.DECRYPT_EEK</name>
<value>*</value>
<description>
ACL for decrypt EncryptedKey CryptoExtension operations
ACL for decryptEncryptedKey CryptoExtension operations.
</description>
</property>

View File

@ -15,10 +15,12 @@
<configuration>
<!-- KMS Backend KeyProvider -->
<property>
<name>hadoop.kms.key.provider.uri</name>
<value>jceks://file@/${user.home}/kms.keystore</value>
<description>
URI of the backing KeyProvider for the KMS.
</description>
</property>
@ -26,14 +28,52 @@
<name>hadoop.security.keystore.JavaKeyStoreProvider.password</name>
<value>none</value>
<description>
If using the JavaKeyStoreProvider, the password for the keystore file.
</description>
</property>
<!-- KMS Cache -->
<property>
<name>hadoop.kms.cache.enable</name>
<value>true</value>
<description>
Whether the KMS will act as a cache for the backing KeyProvider.
When the cache is enabled, operations like getKeyVersion, getMetadata,
and getCurrentKey will sometimes return cached data without consulting
the backing KeyProvider. Cached values are flushed when keys are deleted
or modified.
</description>
</property>
<property>
<name>hadoop.kms.cache.timeout.ms</name>
<value>600000</value>
<description>
Expiry time for the KMS key version and key metadata cache, in
milliseconds. This affects getKeyVersion and getMetadata.
</description>
</property>
<property>
<name>hadoop.kms.current.key.cache.timeout.ms</name>
<value>30000</value>
<description>
Expiry time for the KMS current key cache, in milliseconds. This
affects getCurrentKey operations.
</description>
</property>
<!-- KMS Audit -->
<property>
<name>hadoop.kms.audit.aggregation.window.ms</name>
<value>10000</value>
<description>
Duplicate audit log events within the aggregation window (specified in
ms) are quashed to reduce log traffic. A single message for aggregated
events is printed at the end of the window, along with a count of the
number of aggregated events.
</description>
</property>
@ -43,7 +83,8 @@
<name>hadoop.kms.authentication.type</name>
<value>simple</value>
<description>
simple or kerberos
Authentication type for the KMS. Can be either &quot;simple&quot;
or &quot;kerberos&quot;.
</description>
</property>
@ -51,6 +92,7 @@
<name>hadoop.kms.authentication.kerberos.keytab</name>
<value>${user.home}/kms.keytab</value>
<description>
Path to the keytab with credentials for the configured Kerberos principal.
</description>
</property>
@ -58,6 +100,8 @@
<name>hadoop.kms.authentication.kerberos.principal</name>
<value>HTTP/localhost</value>
<description>
The Kerberos principal to use for the HTTP endpoint.
The principal must start with 'HTTP/' as per the Kerberos HTTP SPNEGO specification.
</description>
</property>
@ -65,6 +109,7 @@
<name>hadoop.kms.authentication.kerberos.name.rules</name>
<value>DEFAULT</value>
<description>
Rules used to resolve Kerberos principal names.
</description>
</property>

View File

@ -103,9 +103,17 @@ public class KMSAudit {
private static Logger AUDIT_LOG = LoggerFactory.getLogger(KMS_LOGGER_NAME);
KMSAudit(long delay) {
/**
* Create a new KMSAudit.
*
* @param windowMs Duplicate events within the aggregation window are quashed
* to reduce log traffic. A single message for aggregated
* events is printed at the end of the window, along with a
* count of the number of aggregated events.
*/
KMSAudit(long windowMs) {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(delay, TimeUnit.MILLISECONDS)
.expireAfterWrite(windowMs, TimeUnit.MILLISECONDS)
.removalListener(
new RemovalListener<String, AuditEvent>() {
@Override
@ -126,7 +134,7 @@ public class KMSAudit {
public void run() {
cache.cleanUp();
}
}, delay / 10, delay / 10, TimeUnit.MILLISECONDS);
}, windowMs / 10, windowMs / 10, TimeUnit.MILLISECONDS);
}
private void logEvent(AuditEvent event) {

View File

@ -54,8 +54,8 @@ public class KMSConfiguration {
public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
"current.key.cache.timeout.ms";
// Delay for Audit logs that need aggregation
public static final String KMS_AUDIT_AGGREGATION_DELAY = CONFIG_PREFIX +
"aggregation.delay.ms";
public static final String KMS_AUDIT_AGGREGATION_WINDOW = CONFIG_PREFIX +
"audit.aggregation.window.ms";
public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
// 10 mins
@ -63,7 +63,7 @@ public class KMSConfiguration {
// 30 secs
public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
// 10 secs
public static final long KMS_AUDIT_AGGREGATION_DELAY_DEFAULT = 10000;
public static final long KMS_AUDIT_AGGREGATION_WINDOW_DEFAULT = 10000;
// Property to Enable/Disable per Key authorization
public static final String KEY_AUTHORIZATION_ENABLE = CONFIG_PREFIX +

View File

@ -148,8 +148,8 @@ public class KMSWebApp implements ServletContextListener {
kmsAudit =
new KMSAudit(kmsConf.getLong(
KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY,
KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY_DEFAULT));
KMSConfiguration.KMS_AUDIT_AGGREGATION_WINDOW,
KMSConfiguration.KMS_AUDIT_AGGREGATION_WINDOW_DEFAULT));
// this is required for the the JMXJsonServlet to work properly.
// the JMXJsonServlet is behind the authentication filter,

View File

@ -76,7 +76,7 @@ public class TestKMSWithZK {
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false);
conf.set("hadoop.security.key.provider.path",
conf.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(),
"kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple");

View File

@ -782,6 +782,8 @@ Release 2.6.0 - UNRELEASED
HDFS-7096. Fix TestRpcProgramNfs3 to use DFS_ENCRYPTION_KEY_PROVIDER_URI
(clamb via cmccabe)
HDFS-7046. HA NN can NPE upon transition to active. (kihwal)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
@ -895,6 +897,12 @@ Release 2.6.0 - UNRELEASED
HDFS-7051. TestDataNodeRollingUpgrade#isBlockFileInPrevious assumes Unix file
path separator. (cnauroth)
HDFS-7105. Fix TestJournalNode#testFailToStartWithBadConfig to match log
output change. (Ray Chiang via cnauroth)
HDFS-7105. Allow falling back to a non-SASL connection on
DataTransferProtocol in several edge cases. (cnauroth)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -22,8 +22,6 @@ import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
.EncryptedKeyVersion;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@ -90,6 +88,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -616,13 +615,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
if (numResponseToDrop > 0) {
// This case is used for testing.
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ " is set to " + numResponseToDrop
+ ", this hacked client will proactively drop responses");
proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
nameNodeUri, ClientProtocol.class, numResponseToDrop);
nameNodeUri, ClientProtocol.class, numResponseToDrop,
nnFallbackToSimpleAuth);
}
if (proxyInfo != null) {
@ -637,7 +638,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class);
ClientProtocol.class, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
@ -675,10 +676,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
/**
@ -3113,4 +3111,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void setKeyProvider(KeyProviderCryptoExtension provider) {
this.provider = provider;
}
/**
* Returns the SaslDataTransferClient configured for this DFSClient.
*
* @return SaslDataTransferClient configured for this DFSClient
*/
public SaslDataTransferClient getSaslDataTransferClient() {
return saslClient;
}
}

View File

@ -603,6 +603,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
@ -717,4 +718,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
1000;
public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY =
"ignore.secure.ports.for.testing";
public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false;
}

View File

@ -244,7 +244,7 @@ public class HAUtil {
// Create the proxy provider. Actual proxy is not created.
AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
.createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
false);
false, null);
// No need to use logical URI since failover is not configured.
if (provider == null) {

View File

@ -36,6 +36,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -145,13 +146,37 @@ public class NameNodeProxies {
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
URI nameNodeUri, Class<T> xface) throws IOException {
return createProxy(conf, nameNodeUri, xface, null);
}
/**
* Creates the namenode proxy with the passed protocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
* if the provided URI is a configured logical URI.
*
* @param conf the configuration containing the required IPC
* properties, client failover configurations, etc.
* @param nameNodeUri the URI pointing either to a specific NameNode
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
**/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(conf, nameNodeUri, xface, true);
createFailoverProxyProvider(conf, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
// Non-HA case
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
UserGroupInformation.getCurrentUser(), true);
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
} else {
// HA case
Conf config = new Conf(conf);
@ -187,6 +212,8 @@ public class NameNodeProxies {
* or to a logical nameservice.
* @param xface the IPC interface which should be created
* @param numResponseToDrop The number of responses to drop for each RPC call
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to. Will return null of the
* given configuration does not support HA.
@ -195,10 +222,12 @@ public class NameNodeProxies {
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
Configuration config, URI nameNodeUri, Class<T> xface,
int numResponseToDrop) throws IOException {
int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
Preconditions.checkArgument(numResponseToDrop > 0);
AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
createFailoverProxyProvider(config, nameNodeUri, xface, true);
createFailoverProxyProvider(config, nameNodeUri, xface, true,
fallbackToSimpleAuth);
if (failoverProxyProvider != null) { // HA case
int delay = config.getInt(
@ -257,12 +286,35 @@ public class NameNodeProxies {
public static <T> ProxyAndInfo<T> createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries) throws IOException {
return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
}
/**
* Creates an explicitly non-HA-enabled proxy object. Most of the time you
* don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
*
* @param conf the configuration object
* @param nnAddr address of the remote NN to connect to
* @param xface the IPC interface which should be created
* @param ugi the user who is making the calls on the proxy object
* @param withRetries certain interfaces have a non-standard retry policy
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> ProxyAndInfo<T> createNonHAProxy(
Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
UserGroupInformation ugi, boolean withRetries,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
Text dtService = SecurityUtil.buildTokenService(nnAddr);
T proxy;
if (xface == ClientProtocol.class) {
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
withRetries);
withRetries, fallbackToSimpleAuth);
} else if (xface == JournalProtocol.class) {
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
} else if (xface == NamenodeProtocol.class) {
@ -351,7 +403,8 @@ public class NameNodeProxies {
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy =
@ -367,8 +420,8 @@ public class NameNodeProxies {
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
.getProxy();
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
if (withRetries) { // create the proxy with retries
@ -440,8 +493,8 @@ public class NameNodeProxies {
/** Creates the Failover proxy provider instance*/
@VisibleForTesting
public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort)
throws IOException {
Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
AbstractNNFailoverProxyProvider<T> providerNN;
Preconditions.checkArgument(
@ -490,6 +543,7 @@ public class NameNodeProxies {
+ " and does not use port information.");
}
}
providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
return providerNN;
}

View File

@ -28,6 +28,7 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@ -71,21 +72,38 @@ public class SaslDataTransferClient {
private static final Logger LOG = LoggerFactory.getLogger(
SaslDataTransferClient.class);
private final boolean fallbackToSimpleAuthAllowed;
private final AtomicBoolean fallbackToSimpleAuth;
private final SaslPropertiesResolver saslPropsResolver;
private final TrustedChannelResolver trustedChannelResolver;
/**
* Creates a new SaslDataTransferClient. This constructor is used in cases
* where it is not relevant to track if a secure client did a fallback to
* simple auth. For intra-cluster connections between data nodes in the same
* cluster, we can assume that all run under the same security configuration.
*
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
*/
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver) {
this(saslPropsResolver, trustedChannelResolver, null);
}
/**
* Creates a new SaslDataTransferClient.
*
* @param saslPropsResolver for determining properties of SASL negotiation
* @param trustedChannelResolver for identifying trusted connections that do
* not require SASL negotiation
* @param fallbackToSimpleAuth checked on each attempt at general SASL
* handshake, if true forces use of simple auth
*/
public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
TrustedChannelResolver trustedChannelResolver,
boolean fallbackToSimpleAuthAllowed) {
this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
AtomicBoolean fallbackToSimpleAuth) {
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.saslPropsResolver = saslPropsResolver;
this.trustedChannelResolver = trustedChannelResolver;
}
@ -221,22 +239,26 @@ public class SaslDataTransferClient {
"SASL client skipping handshake in secured configuration with "
+ "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else if (accessToken.getIdentifier().length == 0) {
if (!fallbackToSimpleAuthAllowed) {
throw new IOException(
"No block access token was provided (insecure cluster), but this " +
"client is configured to allow only secure connections.");
}
} else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
LOG.debug(
"SASL client skipping handshake in secured configuration with "
+ "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
return null;
} else {
} else if (saslPropsResolver != null) {
LOG.debug(
"SASL client doing general handshake for addr = {}, datanodeId = {}",
addr, datanodeId);
return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
datanodeId);
} else {
// It's a secured cluster using non-privileged ports, but no SASL. The
// only way this can happen is if the DataNode has
// ignore.secure.ports.for.testing configured, so this is a rare edge case.
LOG.debug(
"SASL client skipping handshake in secured configuration with no SASL "
+ "protection configured for addr = {}, datanodeId = {}",
addr, datanodeId);
return null;
}
}
@ -348,12 +370,6 @@ public class SaslDataTransferClient {
OutputStream underlyingOut, InputStream underlyingIn,
Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
throws IOException {
if (saslPropsResolver == null) {
throw new IOException(String.format("Cannot create a secured " +
"connection if DataNode listens on unprivileged port (%d) and no " +
"protection is defined in configuration property %s.",
datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
}
Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
String userName = buildUserName(accessToken);

View File

@ -112,11 +112,29 @@ public class SaslDataTransferServer {
"SASL server skipping handshake in unsecured configuration for "
+ "peer = {}, datanodeId = {}", peer, datanodeId);
return new IOStreamPair(underlyingIn, underlyingOut);
} else {
} else if (dnConf.getSaslPropsResolver() != null) {
LOG.debug(
"SASL server doing general handshake for peer = {}, datanodeId = {}",
peer, datanodeId);
return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
} else if (dnConf.getIgnoreSecurePortsForTesting()) {
// It's a secured cluster using non-privileged ports, but no SASL. The
// only way this can happen is if the DataNode has
// ignore.secure.ports.for.testing configured, so this is a rare edge case.
LOG.debug(
"SASL server skipping handshake in secured configuration with no SASL "
+ "protection configured for peer = {}, datanodeId = {}",
peer, datanodeId);
return new IOStreamPair(underlyingIn, underlyingOut);
} else {
// The error message here intentionally does not mention
// ignore.secure.ports.for.testing. That's intended for dev use only.
// This code path is not expected to execute ever, because DataNode startup
// checks for invalid configuration and aborts.
throw new IOException(String.format("Cannot create a secured " +
"connection if DataNode listens on unprivileged port (%d) and no " +
"protection is defined in configuration property %s.",
datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
}
}
@ -257,12 +275,6 @@ public class SaslDataTransferServer {
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
if (saslPropsResolver == null) {
throw new IOException(String.format("Cannot create a secured " +
"connection if DataNode listens on unprivileged port (%d) and no " +
"protection is defined in configuration property %s.",
datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
}
Map<String, String> saslProps = saslPropsResolver.getServerProperties(
getPeerAddress(peer));

View File

@ -48,8 +48,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.StorageType;
@ -787,12 +785,9 @@ public class Dispatcher {
: Executors.newFixedThreadPool(dispatcherThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
}
public DistributedFileSystem getDistributedFileSystem() {

View File

@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -101,6 +102,7 @@ public class NameNodeConnector implements Closeable {
private final NamenodeProtocol namenode;
private final ClientProtocol client;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
private final DistributedFileSystem fs;
private final Path idPath;
@ -120,7 +122,7 @@ public class NameNodeConnector implements Closeable {
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NamenodeProtocol.class).getProxy();
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class).getProxy();
ClientProtocol.class, fallbackToSimpleAuth).getProxy();
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();

View File

@ -48,6 +48,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEF
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -90,6 +92,7 @@ public class DNConf {
final String encryptionAlgorithm;
final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver;
private final boolean ignoreSecurePortsForTesting;
final long xceiverStopTimeout;
final long restartReplicaExpiry;
@ -173,6 +176,9 @@ public class DNConf {
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
conf);
this.ignoreSecurePortsForTesting = conf.getBoolean(
IGNORE_SECURE_PORTS_FOR_TESTING_KEY,
IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT);
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@ -238,4 +244,15 @@ public class DNConf {
public TrustedChannelResolver getTrustedChannelResolver() {
return trustedChannelResolver;
}
/**
* Returns true if configuration is set to skip checking for proper
* port configuration in a secured cluster. This is only intended for use in
* dev testing.
*
* @return true if configured to skip checking secured port configuration
*/
public boolean getIgnoreSecurePortsForTesting() {
return ignoreSecurePortsForTesting;
}
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
@ -46,9 +44,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
@ -170,6 +171,7 @@ import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -967,8 +969,6 @@ public class DataNode extends ReconfigurableBase
SecureResources resources
) throws IOException {
checkSecureConfig(conf, resources);
// settings global for all BPs in the Data Node
this.secureResources = resources;
synchronized (this) {
@ -976,6 +976,8 @@ public class DataNode extends ReconfigurableBase
}
this.conf = conf;
this.dnConf = new DNConf(conf);
checkSecureConfig(dnConf, conf, resources);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
if (dnConf.maxLockedMemory > 0) {
@ -1031,10 +1033,7 @@ public class DataNode extends ReconfigurableBase
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
dnConf.trustedChannelResolver,
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
@ -1054,23 +1053,24 @@ public class DataNode extends ReconfigurableBase
* must check if the target port is a privileged port, and if so, skip the
* SASL handshake.
*
* @param dnConf DNConf to check
* @param conf Configuration to check
* @param resources SecuredResources obtained for DataNode
* @throws RuntimeException if security enabled, but configuration is insecure
*/
private static void checkSecureConfig(Configuration conf,
private static void checkSecureConfig(DNConf dnConf, Configuration conf,
SecureResources resources) throws RuntimeException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
if (resources != null && dataTransferProtection == null) {
SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
if (resources != null && saslPropsResolver == null) {
return;
}
if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
if (dnConf.getIgnoreSecurePortsForTesting()) {
return;
}
if (dataTransferProtection != null &&
if (saslPropsResolver != null &&
DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
resources == null) {
return;

View File

@ -1184,8 +1184,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
cacheManager.startMonitorThread();
blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally {
writeUnlock();
startingActiveService = false;
checkSafeMode();
writeUnlock();
}
}
@ -5674,6 +5675,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Have to have write-lock since leaving safemode initializes
// repl queues, which requires write lock
assert hasWriteLock();
if (inTransitionToActive()) {
return;
}
// if smmthread is already running, the block threshold must have been
// reached before, there is no need to enter the safe mode again
if (smmthread == null && needEnter()) {

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@ -59,10 +56,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@ -161,7 +155,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
private List<String> snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy;
private final SaslDataTransferClient saslClient;
/**
* Filesystem checker.
@ -188,12 +181,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap());
this.saslClient = new SaslDataTransferClient(
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
conf.getBoolean(
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();
@ -594,7 +581,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
* bad. Both places should be refactored to provide a method to copy blocks
* around.
*/
private void copyBlock(DFSClient dfs, LocatedBlock lblock,
private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
OutputStream fos) throws Exception {
int failures = 0;
InetSocketAddress targetAddr = null;
@ -647,8 +634,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
NamenodeFsck.this, blockToken, datanodeId);
peer = TcpPeerServer.peerFromSocketAndKey(
dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
blockToken, datanodeId);
} finally {
if (peer == null) {
IOUtils.closeQuietly(s);

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
public abstract class AbstractNNFailoverProxyProvider<T> implements
FailoverProxyProvider <T> {
protected AtomicBoolean fallbackToSimpleAuth;
/**
* Inquire whether logical HA URI is used for the implementation. If it is
* used, a special token handling may be needed to make sure a token acquired
@ -32,4 +36,14 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
* @return true if logical HA URI is used. false, if not used.
*/
public abstract boolean useLogicalURI();
/**
* Set for tracking if a secure client falls back to simple auth.
*
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
*/
public void setFallbackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth) {
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
}

View File

@ -122,7 +122,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
if (current.namenode == null) {
try {
current.namenode = NameNodeProxies.createNonHAProxy(conf,
current.address, xface, ugi, false).getProxy();
current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.junit.Assert.*;
import java.io.IOException;
@ -29,11 +32,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
public class TestSaslDataTransfer extends SaslDataTransferTestCase {
@ -49,6 +54,9 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
@Rule
public ExpectedException exception = ExpectedException.none();
@Rule
public Timeout timeout = new Timeout(60000);
@After
public void shutdown() {
IOUtils.cleanup(null, fs);
@ -98,17 +106,6 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
doTest(clientConf);
}
@Test
public void testClientSaslNoServerSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
startCluster(clusterConf);
HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
exception.expect(IOException.class);
exception.expectMessage("could only be replicated to 0 nodes");
doTest(clientConf);
}
@Test
public void testServerSaslNoClientSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig(
@ -121,6 +118,32 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
doTest(clientConf);
}
@Test
public void testDataNodeAbortsIfNoSasl() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
exception.expect(RuntimeException.class);
exception.expectMessage("Cannot start secure DataNode");
startCluster(clusterConf);
}
@Test
public void testDataNodeAbortsIfNotHttpsOnly() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("authentication");
clusterConf.set(DFS_HTTP_POLICY_KEY,
HttpConfig.Policy.HTTP_AND_HTTPS.name());
exception.expect(RuntimeException.class);
exception.expectMessage("Cannot start secure DataNode");
startCluster(clusterConf);
}
@Test
public void testNoSaslAndSecurePortsIgnored() throws Exception {
HdfsConfiguration clusterConf = createSecureConfig("");
clusterConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
startCluster(clusterConf);
doTest(clusterConf);
}
/**
* Tests DataTransferProtocol with the given client configuration.
*

View File

@ -287,7 +287,7 @@ public class TestJournalNode {
// Directory which cannot be created
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
Shell.WINDOWS ? "\\\\cannotBeCreated" : "/proc/does-not-exist");
assertJNFailsToStart(conf, "Can not create directory");
assertJNFailsToStart(conf, "Cannot create directory");
}
private static void assertJNFailsToStart(Configuration conf,

View File

@ -194,7 +194,7 @@ public class TestRetryCacheWithHA {
URI nnUri = dfs.getUri();
FailoverProxyProvider<ClientProtocol> failoverProxyProvider =
NameNodeProxies.createFailoverProxyProvider(conf,
nnUri, ClientProtocol.class, true);
nnUri, ClientProtocol.class, true, null);
InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
failoverProxyProvider, RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,

View File

@ -356,6 +356,10 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6086. mapreduce.job.credentials.binary should allow all URIs.
(Zhihai Xu via kasha)
MAPREDUCE-6091. YARNRunner.getJobStatus() fails with
ApplicationNotFoundException if the job rolled off the RM view (Sangjin
Lee via jlowe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -152,7 +152,6 @@
<property><!--Loaded from job.xml--><name>mapreduce.map.speculative</name><value>false</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.acl-view-job</name><value> </value></property>
<property><!--Loaded from job.xml--><name>mapreduce.map.output.key.class</name><value>org.apache.hadoop.io.IntWritable</value></property>
<property><!--Loaded from job.xml--><name>yarn.ipc.serializer.type</name><value>protocolbuffers</value></property>
<property><!--Loaded from mapred-default.xml--><name>mapreduce.job.end-notification.max.retry.interval</name><value>5</value></property>
<property><!--Loaded from job.xml--><name>ftp.blocksize</name><value>67108864</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.tasktracker.http.threads</name><value>40</value></property>

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -150,6 +151,8 @@ public class ClientServiceDelegate {
ApplicationReport application = null;
try {
application = rm.getApplicationReport(appId);
} catch (ApplicationNotFoundException e) {
application = null;
} catch (YarnException e2) {
throw new IOException(e2);
}

View File

@ -31,8 +31,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
@ -56,8 +54,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -488,7 +488,9 @@ public class TestClientServiceDelegate {
private ResourceMgrDelegate getRMDelegate() throws IOException {
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
try {
when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
ApplicationId appId = jobId.getAppId();
when(rm.getApplicationReport(appId)).
thenThrow(new ApplicationNotFoundException(appId + " not found"));
} catch (YarnException e) {
throw new IOException(e);
}

View File

@ -90,13 +90,8 @@ public class TestNonExistentJob extends TestCase {
}
public void testGetInvalidJob() throws Exception {
try {
RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
fail("Exception is expected to thrown ahead!");
} catch (Exception e) {
assertTrue(e instanceof IOException);
assertTrue(e.getMessage().contains("ApplicationNotFoundException"));
}
RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
assertNull(runJob);
}
}

View File

@ -864,6 +864,16 @@
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>

View File

@ -1095,7 +1095,7 @@ public class NativeAzureFileSystem extends FileSystem {
if (dstKey.startsWith(srcKey + PATH_DELIMITER)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming directory to a itself is disallowed. src=" + src
LOG.debug("Renaming directory to itself is disallowed. src=" + src
+ " dest=" + dst);
}
return false;

View File

@ -681,7 +681,7 @@ public class Gridmix extends Configured implements Tool {
} catch (IOException e) {
LOG.warn("Failure killing " + job.getJobName(), e);
} catch (Exception e) {
LOG.error("Unexcpected exception", e);
LOG.error("Unexpected exception", e);
}
}
LOG.info("Done.");

View File

@ -4716,7 +4716,6 @@
"dfs.journalnode.http-address" : "0.0.0.0:8480",
"mapreduce.job.acl-view-job" : " ",
"mapreduce.reduce.shuffle.retry-delay.max.ms" : "60000",
"yarn.ipc.serializer.type" : "protocolbuffers",
"mapreduce.job.end-notification.max.retry.interval" : "5",
"ftp.blocksize" : "67108864",
"mapreduce.tasktracker.http.threads" : "80",
@ -4841,7 +4840,7 @@
"yarn.ipc.rpc.class" : "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC",
"mapreduce.job.name" : "TeraGen",
"kfs.blocksize" : "67108864",
"yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs" : "86400",
"yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs" : "86400",
"mapreduce.job.ubertask.maxmaps" : "9",
"yarn.scheduler.maximum-allocation-mb" : "8192",
"yarn.nodemanager.heartbeat.interval-ms" : "1000",
@ -9830,7 +9829,6 @@
"dfs.journalnode.http-address" : "0.0.0.0:8480",
"mapreduce.job.acl-view-job" : " ",
"mapreduce.reduce.shuffle.retry-delay.max.ms" : "60000",
"yarn.ipc.serializer.type" : "protocolbuffers",
"mapreduce.job.end-notification.max.retry.interval" : "5",
"ftp.blocksize" : "67108864",
"mapreduce.tasktracker.http.threads" : "80",
@ -9955,7 +9953,7 @@
"yarn.ipc.rpc.class" : "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC",
"mapreduce.job.name" : "TeraGen",
"kfs.blocksize" : "67108864",
"yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs" : "86400",
"yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs" : "86400",
"mapreduce.job.ubertask.maxmaps" : "9",
"yarn.scheduler.maximum-allocation-mb" : "8192",
"yarn.nodemanager.heartbeat.interval-ms" : "1000",

View File

@ -1016,7 +1016,7 @@ public class StreamJob implements Tool {
if (background_) {
LOG.info("Job is running in background.");
} else if (!jc_.monitorAndPrintJob(jobConf_, running_)) {
LOG.error("Job not Successful!");
LOG.error("Job not successful!");
return 1;
}
LOG.info("Output directory: " + output_);

View File

@ -400,6 +400,12 @@ Release 2.6.0 - UNRELEASED
YARN-2568. Fixed the potential test failures due to race conditions when RM
work-preserving recovery is enabled. (Jian He via zjshen)
YARN-2565. Fixed RM to not use FileSystemApplicationHistoryStore unless
explicitly set. (Zhijie Shen via jianhe)
YARN-2460. Remove obsolete entries from yarn-default.xml (Ray Chiang via
aw)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -30,22 +30,11 @@
<name>yarn.ipc.client.factory.class</name>
</property>
<property>
<description>Type of serialization to use.</description>
<name>yarn.ipc.serializer.type</name>
<value>protocolbuffers</value>
</property>
<property>
<description>Factory to create server IPC classes.</description>
<name>yarn.ipc.server.factory.class</name>
</property>
<property>
<description>Factory to create IPC exceptions.</description>
<name>yarn.ipc.exception.factory.class</name>
</property>
<property>
<description>Factory to create serializeable records.</description>
<name>yarn.ipc.record.factory.class</name>
@ -162,12 +151,6 @@
<value>1</value>
</property>
<property>
<description>How often should the RM check that the AM is still alive.</description>
<name>yarn.resourcemanager.amliveliness-monitor.interval-ms</name>
<value>1000</value>
</property>
<property>
<description>Maximum time to wait to establish connection to
ResourceManager.</description>
@ -220,12 +203,6 @@
<value>600000</value>
</property>
<property>
<description>How often to check that node managers are still alive.</description>
<name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
<value>1000</value>
</property>
<property>
<description>Path to file with nodes to include.</description>
<name>yarn.resourcemanager.nodes.include-path</name>
@ -580,7 +557,7 @@
<description>Interval for the roll over for the master key used to generate
application tokens
</description>
<name>yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs</name>
<name>yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs</name>
<value>86400</value>
</property>
@ -1090,20 +1067,6 @@
<value>2000</value>
</property>
<property>
<description>Max time, in seconds, to wait to establish a connection to RM when NM starts.
The NM will shutdown if it cannot connect to RM within the specified max time period.
If the value is set as -1, then NM will retry forever.</description>
<name>yarn.nodemanager.resourcemanager.connect.wait.secs</name>
<value>900</value>
</property>
<property>
<description>Time interval, in seconds, between each NM attempt to connect to RM.</description>
<name>yarn.nodemanager.resourcemanager.connect.retry_interval.secs</name>
<value>30</value>
</property>
<property>
<description>The minimum allowed version of a resourcemanager that a nodemanager will connect to.
The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is

View File

@ -182,7 +182,9 @@ public class ApplicationHistoryServer extends CompositeService {
// APPLICATION_HISTORY_STORE is neither null nor empty, it means that the
// user has enabled it explicitly.
if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0) {
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0 ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).equals(
NullApplicationHistoryStore.class.getName())) {
return new ApplicationHistoryManagerOnTimelineStore(
timelineDataManager, aclsManager);
} else {

View File

@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import com.google.common.annotations.VisibleForTesting;
/**
* <p>
* {@link ResourceManager} uses this class to write the information of
@ -71,8 +73,10 @@ public class RMApplicationHistoryWriter extends CompositeService {
.getLog(RMApplicationHistoryWriter.class);
private Dispatcher dispatcher;
private ApplicationHistoryWriter writer;
private boolean historyServiceEnabled;
@VisibleForTesting
ApplicationHistoryWriter writer;
@VisibleForTesting
boolean historyServiceEnabled;
public RMApplicationHistoryWriter() {
super(RMApplicationHistoryWriter.class.getName());
@ -80,13 +84,18 @@ public class RMApplicationHistoryWriter extends CompositeService {
@Override
protected synchronized void serviceInit(Configuration conf) throws Exception {
historyServiceEnabled =
conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED);
if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0 ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).equals(
NullApplicationHistoryStore.class.getName())) {
historyServiceEnabled = false;
}
// Only create the services when the history service is enabled, preventing
// wasting the system resources.
// Only create the services when the history service is enabled and not
// using the null store, preventing wasting the system resources.
if (historyServiceEnabled) {
writer = createApplicationHistoryStore(conf);
addIfService(writer);
@ -112,25 +121,19 @@ public class RMApplicationHistoryWriter extends CompositeService {
protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) {
// If the history writer is not enabled, a dummy store will be used to
// write nothing
if (historyServiceEnabled) {
try {
Class<? extends ApplicationHistoryStore> storeClass =
conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
FileSystemApplicationHistoryStore.class,
try {
Class<? extends ApplicationHistoryStore> storeClass =
conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
NullApplicationHistoryStore.class,
ApplicationHistoryStore.class);
return storeClass.newInstance();
} catch (Exception e) {
String msg =
"Could not instantiate ApplicationHistoryWriter: "
+ conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE,
FileSystemApplicationHistoryStore.class.getName());
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
} else {
return new NullApplicationHistoryStore();
return storeClass.newInstance();
} catch (Exception e) {
String msg =
"Could not instantiate ApplicationHistoryWriter: "
+ conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE,
NullApplicationHistoryStore.class.getName());
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
}

View File

@ -78,6 +78,8 @@ public class TestRMApplicationHistoryWriter {
store = new MemoryApplicationHistoryStore();
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
writer = new RMApplicationHistoryWriter() {
@Override
@ -174,6 +176,22 @@ public class TestRMApplicationHistoryWriter {
return container;
}
@Test
public void testDefaultStoreSetup() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
RMApplicationHistoryWriter writer = new RMApplicationHistoryWriter();
writer.init(conf);
writer.start();
try {
Assert.assertFalse(writer.historyServiceEnabled);
Assert.assertNull(writer.writer);
} finally {
writer.stop();
writer.close();
}
}
@Test
public void testWriteApplication() throws Exception {
RMApp app = createRMApp(ApplicationId.newInstance(0, 1));