Merge from trunk to branch pt 2
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1612941 13f79535-47bb-0310-9956-ffa450edef68
commit 22ba00de60
@@ -396,6 +396,12 @@ Trunk (Unreleased)
HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File
System. (Shanyu Zhao via cnauroth)

HADOOP-10826. Iteration on KeyProviderFactory.serviceLoader is
thread-unsafe. (benoyantony via tucu)

HADOOP-10881. Clarify usage of encryption and encrypted encryption
key in KeyProviderCryptoExtension. (wang)

OPTIMIZATIONS

HADOOP-7761. Improve the performance of raw comparisons. (todd)

@@ -441,6 +447,11 @@ Release 2.6.0 - UNRELEASED
HADOOP-10817. ProxyUsers configuration should support configurable
prefixes. (tucu)

HADOOP-10755. Support negative caching of user-group mapping.
(Lei Xu via wang)

HADOOP-10855. Allow Text to be read with a known Length. (todd)

OPTIMIZATIONS

BUG FIXES

@@ -465,6 +476,13 @@ Release 2.6.0 - UNRELEASED
HADOOP-10857. Native Libraries Guide doesn't mention a dependency on
openssl-development package (ozawa via cmccabe)

HADOOP-10866. RawLocalFileSystem fails to read symlink targets via the stat
command when the format of the stat command uses non-curly quotes (yzhang
via cmccabe)

HADOOP-10830. Missing lock in JavaKeyStoreProvider.createCredentialEntry.
(Benoy Antony via umamahesh)

Release 2.5.0 - UNRELEASED

INCOMPATIBLE CHANGES

@@ -787,6 +805,12 @@ Release 2.5.0 - UNRELEASED
HADOOP-10710. hadoop.auth cookie is not properly constructed according to
RFC2109. (Juan Yu via tucu)

HADOOP-10864. Tool documentation is broken. (Akira Ajisaka
via Arpit Agarwal)

HADOOP-10872. TestPathData fails intermittently with "Mkdirs failed
to create d1". (Yongjun Zhang via Arpit Agarwal)

Release 2.4.1 - 2014-06-23

INCOMPATIBLE CHANGES

@@ -21,7 +21,6 @@ package org.apache.hadoop.crypto.key;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

@@ -30,51 +29,109 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;

/**
* A KeyProvider with Cytographic Extensions specifically for generating
* Encrypted Keys as well as decrypting them
* A KeyProvider with Cryptographic Extensions specifically for generating
* and decrypting encrypted encryption keys.
*
*/
@InterfaceAudience.Private
public class KeyProviderCryptoExtension extends
KeyProviderExtension<KeyProviderCryptoExtension.CryptoExtension> {

/**
* Designates an encrypted encryption key, or EEK.
*/
public static final String EEK = "EEK";
/**
* Designates a decrypted encrypted encryption key, that is, an encryption key
* (EK).
*/
public static final String EK = "EK";

/**
* This is a holder class whose instance contains the keyVersionName, iv
* used to generate the encrypted Key and the encrypted KeyVersion
* An encrypted encryption key (EEK) and related information. An EEK must be
* decrypted using the key's encryption key before it can be used.
*/
public static class EncryptedKeyVersion {
private String keyName;
private String keyVersionName;
private byte[] iv;
private KeyVersion encryptedKey;
private String encryptionKeyName;
private String encryptionKeyVersionName;
private byte[] encryptedKeyIv;
private KeyVersion encryptedKeyVersion;

protected EncryptedKeyVersion(String keyName, String keyVersionName,
byte[] iv, KeyVersion encryptedKey) {
this.keyName = keyName;
this.keyVersionName = keyVersionName;
this.iv = iv;
this.encryptedKey = encryptedKey;
/**
* Create a new EncryptedKeyVersion.
*
* @param keyName Name of the encryption key used to
* encrypt the encrypted key.
* @param encryptionKeyVersionName Version name of the encryption key used
* to encrypt the encrypted key.
* @param encryptedKeyIv Initialization vector of the encrypted
* key. The IV of the encryption key used to
* encrypt the encrypted key is derived from
* this IV.
* @param encryptedKeyVersion The encrypted encryption key version.
*/
protected EncryptedKeyVersion(String keyName,
String encryptionKeyVersionName, byte[] encryptedKeyIv,
KeyVersion encryptedKeyVersion) {
this.encryptionKeyName = keyName;
this.encryptionKeyVersionName = encryptionKeyVersionName;
this.encryptedKeyIv = encryptedKeyIv;
this.encryptedKeyVersion = encryptedKeyVersion;
}

public String getKeyName() {
return keyName;
/**
* @return Name of the encryption key used to encrypt the encrypted key.
*/
public String getEncryptionKeyName() {
return encryptionKeyName;
}

public String getKeyVersionName() {
return keyVersionName;
/**
* @return Version name of the encryption key used to encrypt the encrypted
* key.
*/
public String getEncryptionKeyVersionName() {
return encryptionKeyVersionName;
}

public byte[] getIv() {
return iv;
/**
* @return Initialization vector of the encrypted key. The IV of the
* encryption key used to encrypt the encrypted key is derived from this
* IV.
*/
public byte[] getEncryptedKeyIv() {
return encryptedKeyIv;
}

public KeyVersion getEncryptedKey() {
return encryptedKey;
/**
* @return The encrypted encryption key version.
*/
public KeyVersion getEncryptedKeyVersion() {
return encryptedKeyVersion;
}

/**
* Derive the initialization vector (IV) for the encryption key from the IV
* of the encrypted key. This derived IV is used with the encryption key to
* decrypt the encrypted key.
* <p/>
* The alternative to this is using the same IV for both the encryption key
* and the encrypted key. Even a simple symmetric transformation like this
* improves security by avoiding IV re-use. IVs will also be fairly unique
* among different EEKs.
*
* @param encryptedKeyIV of the encrypted key (i.e. {@link
* #getEncryptedKeyIv()})
* @return IV for the encryption key
*/
protected static byte[] deriveIV(byte[] encryptedKeyIV) {
byte[] rIv = new byte[encryptedKeyIV.length];
// Do a simple XOR transformation to flip all the bits
for (int i = 0; i < encryptedKeyIV.length; i++) {
rIv[i] = (byte) (encryptedKeyIV[i] ^ 0xff);
}
return rIv;
}
}

/**
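
The deriveIV transformation above is its own inverse: XOR-ing every byte with 0xff twice returns the original IV, so the same helper serves both the encrypt and decrypt paths. A minimal standalone sketch of that property (class and method names here are illustrative, not part of the patch):

    import java.util.Arrays;

    public class DeriveIvDemo {
      // Same transformation as EncryptedKeyVersion.deriveIV: flip every bit.
      static byte[] deriveIV(byte[] encryptedKeyIV) {
        byte[] rIv = new byte[encryptedKeyIV.length];
        for (int i = 0; i < encryptedKeyIV.length; i++) {
          rIv[i] = (byte) (encryptedKeyIV[i] ^ 0xff);
        }
        return rIv;
      }

      public static void main(String[] args) {
        byte[] iv = {0x00, 0x7f, (byte) 0xff, 0x10};
        byte[] derived = deriveIV(iv);
        // Applying the transformation twice yields the original IV.
        System.out.println(Arrays.equals(iv, deriveIV(derived))); // true
      }
    }
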
@@ -141,53 +198,56 @@ public class KeyProviderCryptoExtension extends
this.keyProvider = keyProvider;
}

// the IV used to encrypt a EK typically will be the same IV used to
// encrypt data with the EK. To avoid any chance of weakening the
// encryption because the same IV is used, we simply XOR the IV thus we
// are not using the same IV for 2 different encryptions (even if they
// are done using different keys)
private byte[] flipIV(byte[] iv) {
byte[] rIv = new byte[iv.length];
for (int i = 0; i < iv.length; i++) {
rIv[i] = (byte) (iv[i] ^ 0xff);
}
return rIv;
}

@Override
public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
throws IOException, GeneralSecurityException {
KeyVersion keyVer = keyProvider.getCurrentKey(encryptionKeyName);
Preconditions.checkNotNull(keyVer, "No KeyVersion exists for key '%s' ",
encryptionKeyName);
byte[] newKey = new byte[keyVer.getMaterial().length];
SecureRandom.getInstance("SHA1PRNG").nextBytes(newKey);
// Fetch the encryption key
KeyVersion encryptionKey = keyProvider.getCurrentKey(encryptionKeyName);
Preconditions.checkNotNull(encryptionKey,
"No KeyVersion exists for key '%s' ", encryptionKeyName);
// Generate random bytes for new key and IV
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
byte[] iv = SecureRandom.getSeed(cipher.getBlockSize());
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyVer.getMaterial(),
"AES"), new IvParameterSpec(flipIV(iv)));
byte[] ek = cipher.doFinal(newKey);
SecureRandom random = SecureRandom.getInstance("SHA1PRNG");
final byte[] newKey = new byte[encryptionKey.getMaterial().length];
random.nextBytes(newKey);
final byte[] iv = random.generateSeed(cipher.getBlockSize());
// Encryption key IV is derived from new key's IV
final byte[] encryptionIV = EncryptedKeyVersion.deriveIV(iv);
// Encrypt the new key
cipher.init(Cipher.ENCRYPT_MODE,
new SecretKeySpec(encryptionKey.getMaterial(), "AES"),
new IvParameterSpec(encryptionIV));
final byte[] encryptedKey = cipher.doFinal(newKey);
return new EncryptedKeyVersion(encryptionKeyName,
keyVer.getVersionName(), iv,
new KeyVersion(keyVer.getName(), EEK, ek));
encryptionKey.getVersionName(), iv,
new KeyVersion(encryptionKey.getName(), EEK, encryptedKey));
}

@Override
public KeyVersion decryptEncryptedKey(
EncryptedKeyVersion encryptedKeyVersion) throws IOException,
GeneralSecurityException {
KeyVersion keyVer =
keyProvider.getKeyVersion(encryptedKeyVersion.getKeyVersionName());
Preconditions.checkNotNull(keyVer, "KeyVersion name '%s' does not exist",
encryptedKeyVersion.getKeyVersionName());
KeyVersion keyVersion = encryptedKeyVersion.getEncryptedKey();
// Fetch the encryption key material
final String encryptionKeyVersionName =
encryptedKeyVersion.getEncryptionKeyVersionName();
final KeyVersion encryptionKey =
keyProvider.getKeyVersion(encryptionKeyVersionName);
Preconditions.checkNotNull(encryptionKey,
"KeyVersion name '%s' does not exist", encryptionKeyVersionName);
final byte[] encryptionKeyMaterial = encryptionKey.getMaterial();
// Encryption key IV is determined from encrypted key's IV
final byte[] encryptionIV =
EncryptedKeyVersion.deriveIV(encryptedKeyVersion.getEncryptedKeyIv());
// Init the cipher with encryption key parameters
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
cipher.init(Cipher.DECRYPT_MODE,
new SecretKeySpec(keyVersion.getMaterial(), "AES"),
new IvParameterSpec(flipIV(encryptedKeyVersion.getIv())));
byte[] ek =
cipher.doFinal(encryptedKeyVersion.getEncryptedKey().getMaterial());
return new KeyVersion(keyVer.getName(), EK, ek);
new SecretKeySpec(encryptionKeyMaterial, "AES"),
new IvParameterSpec(encryptionIV));
// Decrypt the encrypted key
final KeyVersion encryptedKV =
encryptedKeyVersion.getEncryptedKeyVersion();
final byte[] decryptedKey = cipher.doFinal(encryptedKV.getMaterial());
return new KeyVersion(encryptionKey.getName(), EK, decryptedKey);
}

@Override
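
Taken together, the rewritten generateEncryptedKey and decryptEncryptedKey reduce to one AES/CTR round trip keyed by the encryption key material and the derived IV. A self-contained sketch of that flow, assuming 16-byte key material (all names are local to this example, not Hadoop APIs):

    import java.security.SecureRandom;
    import java.util.Arrays;
    import javax.crypto.Cipher;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class EekRoundTrip {
      static byte[] deriveIV(byte[] iv) {
        byte[] r = new byte[iv.length];
        for (int i = 0; i < iv.length; i++) {
          r[i] = (byte) (iv[i] ^ 0xff);
        }
        return r;
      }

      public static void main(String[] args) throws Exception {
        SecureRandom random = SecureRandom.getInstance("SHA1PRNG");
        byte[] encryptionKey = new byte[16]; // stands in for the provider's key material
        random.nextBytes(encryptionKey);

        Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
        byte[] newKey = new byte[16];        // the EK being wrapped
        random.nextBytes(newKey);
        byte[] iv = random.generateSeed(cipher.getBlockSize());

        // Encrypt side: wrap the new key under the derived IV.
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(encryptionKey, "AES"),
            new IvParameterSpec(deriveIV(iv)));
        byte[] eek = cipher.doFinal(newKey);

        // Decrypt side: deriving the IV again recovers the original key.
        cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(encryptionKey, "AES"),
            new IvParameterSpec(deriveIV(iv)));
        byte[] ek = cipher.doFinal(eek);
        System.out.println(Arrays.equals(newKey, ek)); // true
      }
    }
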
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

@@ -47,6 +48,15 @@ public abstract class KeyProviderFactory {
private static final ServiceLoader<KeyProviderFactory> serviceLoader =
ServiceLoader.load(KeyProviderFactory.class);

// Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases.
static {
Iterator<KeyProviderFactory> iterServices = serviceLoader.iterator();
while (iterServices.hasNext()) {
iterServices.next();
}
}

public static List<KeyProvider> getProviders(Configuration conf
) throws IOException {
List<KeyProvider> result = new ArrayList<KeyProvider>();
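
The static block above is the HADOOP-10826 fix noted in CHANGES.txt: java.util.ServiceLoader instantiates providers lazily inside its iterator, which is not thread-safe, so draining the iterator once at class-initialization time (which the JVM serializes) avoids hitting that lazy path concurrently later. The same idiom in isolation, sketched against Runnable as a stand-in service interface:

    import java.util.ServiceLoader;

    public class EagerServices {
      private static final ServiceLoader<Runnable> LOADER =
          ServiceLoader.load(Runnable.class); // Runnable stands in for a real SPI

      // Touch every provider during class initialization so that no caller
      // ever reaches the lazy, unsynchronized loading path concurrently.
      static {
        for (Runnable provider : LOADER) {
          // instantiation is the side effect we want; nothing to do here
        }
      }

      public static Iterable<Runnable> providers() {
        return LOADER;
      }
    }
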
@@ -646,25 +646,28 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension {
public KeyVersion decryptEncryptedKey(
EncryptedKeyVersion encryptedKeyVersion) throws IOException,
GeneralSecurityException {
checkNotNull(encryptedKeyVersion.getKeyVersionName(), "versionName");
checkNotNull(encryptedKeyVersion.getIv(), "iv");
Preconditions.checkArgument(encryptedKeyVersion.getEncryptedKey()
.getVersionName().equals(KeyProviderCryptoExtension.EEK),
checkNotNull(encryptedKeyVersion.getEncryptionKeyVersionName(),
"versionName");
checkNotNull(encryptedKeyVersion.getEncryptedKeyIv(), "iv");
Preconditions.checkArgument(
encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
.equals(KeyProviderCryptoExtension.EEK),
"encryptedKey version name must be '%s', is '%s'",
KeyProviderCryptoExtension.EK, encryptedKeyVersion.getEncryptedKey()
.getVersionName());
checkNotNull(encryptedKeyVersion.getEncryptedKey(), "encryptedKey");
KeyProviderCryptoExtension.EK,
encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
);
checkNotNull(encryptedKeyVersion.getEncryptedKeyVersion(), "encryptedKey");
Map<String, String> params = new HashMap<String, String>();
params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT);
Map<String, Object> jsonPayload = new HashMap<String, Object>();
jsonPayload.put(KMSRESTConstants.NAME_FIELD,
encryptedKeyVersion.getKeyName());
encryptedKeyVersion.getEncryptionKeyName());
jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String(
encryptedKeyVersion.getIv()));
encryptedKeyVersion.getEncryptedKeyIv()));
jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String(
encryptedKeyVersion.getEncryptedKey().getMaterial()));
encryptedKeyVersion.getEncryptedKeyVersion().getMaterial()));
URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE,
encryptedKeyVersion.getKeyVersionName(),
encryptedKeyVersion.getEncryptionKeyVersionName(),
KMSRESTConstants.EEK_SUB_RESOURCE, params);
HttpURLConnection conn = createConnection(url, HTTP_POST);
conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);
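
For reference, the renamed accessors feed the small JSON map that the client POSTs to the KMS. A hedged sketch of just the payload construction, with the KMSRESTConstants field names written out as literals and the byte arrays fabricated for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.codec.binary.Base64;

    public class EekDecryptPayloadDemo {
      public static void main(String[] args) {
        byte[] encryptedKeyIv = {1, 2, 3, 4}; // fabricated bytes
        byte[] eekMaterial = {9, 8, 7};       // fabricated bytes
        Map<String, Object> jsonPayload = new HashMap<String, Object>();
        jsonPayload.put("name", "fooKey");    // KMSRESTConstants.NAME_FIELD
        jsonPayload.put("iv", Base64.encodeBase64String(encryptedKeyIv));
        jsonPayload.put("material", Base64.encodeBase64String(eekMaterial));
        System.out.println(jsonPayload);
      }
    }
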
@@ -437,7 +437,9 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent);
throw new IOException("Mkdirs failed to create " + parent
+ " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
+ ")");
}
}
final FSDataOutputStream out;
@@ -250,6 +250,12 @@ public class CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT =
300;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS =
"hadoop.security.groups.negative-cache.secs";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final long HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT =
30;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS =
"hadoop.security.groups.cache.warn.after.ms";
public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT =
@@ -128,6 +128,8 @@ public class Stat extends Shell {
" link " + original);
}
// 6,symbolic link,6,1373584236,1373584236,lrwxrwxrwx,andrew,andrew,`link' -> `target'
// OR
// 6,symbolic link,6,1373584236,1373584236,lrwxrwxrwx,andrew,andrew,'link' -> 'target'
StringTokenizer tokens = new StringTokenizer(line, ",");
try {
long length = Long.parseLong(tokens.nextToken());

@@ -147,18 +149,17 @@ public class Stat extends Shell {
String group = tokens.nextToken();
String symStr = tokens.nextToken();
// 'notalink'
// 'link' -> `target'
// `link' -> `target' OR 'link' -> 'target'
// '' -> ''
Path symlink = null;
StringTokenizer symTokens = new StringTokenizer(symStr, "`");
symTokens.nextToken();
String parts[] = symStr.split(" -> ");
try {
String target = symTokens.nextToken();
target = target.substring(0, target.length()-1);
String target = parts[1];
target = target.substring(1, target.length()-1);
if (!target.isEmpty()) {
symlink = new Path(target);
}
} catch (NoSuchElementException e) {
} catch (ArrayIndexOutOfBoundsException e) {
// null if not a symlink
}
// Set stat
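
This is the HADOOP-10866 fix: the old parser tokenized on backticks, so stat output quoted as 'link' -> 'target' (coreutils builds that emit non-curly quotes) failed to parse. Splitting on " -> " and trimming one quote character from each end of the target accepts both styles. The new logic in isolation (simplified, not the full Stat parser):

    public class SymlinkFieldDemo {
      // Returns the symlink target, or null when the field is not a link.
      static String parseTarget(String symStr) {
        String[] parts = symStr.split(" -> ");
        try {
          String target = parts[1];
          // Strip one leading and one trailing quote, ` or ', from the target.
          target = target.substring(1, target.length() - 1);
          return target.isEmpty() ? null : target;
        } catch (ArrayIndexOutOfBoundsException e) {
          return null; // not a symlink
        }
      }

      public static void main(String[] args) {
        System.out.println(parseTarget("`link' -> `target'")); // target
        System.out.println(parseTarget("'link' -> 'target'")); // target
        System.out.println(parseTarget("'notalink'"));         // null
      }
    }
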
@@ -288,9 +288,7 @@ public class Text extends BinaryComparable
@Override
public void readFields(DataInput in) throws IOException {
int newLength = WritableUtils.readVInt(in);
setCapacity(newLength, false);
in.readFully(bytes, 0, newLength);
length = newLength;
readWithKnownLength(in, newLength);
}

public void readFields(DataInput in, int maxLength) throws IOException {

@@ -302,9 +300,7 @@ public class Text extends BinaryComparable
throw new IOException("tried to deserialize " + newLength +
" bytes of data, but maxLength = " + maxLength);
}
setCapacity(newLength, false);
in.readFully(bytes, 0, newLength);
length = newLength;
readWithKnownLength(in, newLength);
}

/** Skips over one Text in the input. */

@@ -313,6 +309,17 @@ public class Text extends BinaryComparable
WritableUtils.skipFully(in, length);
}

/**
* Read a Text object whose length is already known.
* This allows creating Text from a stream which uses a different serialization
* format.
*/
public void readWithKnownLength(DataInput in, int len) throws IOException {
setCapacity(len, false);
in.readFully(bytes, 0, len);
length = len;
}

/** serialize
* write this object to out
* length uses zero-compressed encoding
@@ -883,8 +883,8 @@ public class NetworkTopology {
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
*/
public void sortByDistance(Node reader, Node[] nodes,
int activeLen, long seed) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
for (int i=0; i<activeLen; i++) {

@@ -906,8 +906,11 @@ public class NetworkTopology {
// Seed is normally the block id
// This means we use the same pseudo-random order for each block, for
// potentially better page cache usage.
// Seed is not used if we want to randomize block location for every block
Random rand = getRandom();
rand.setSeed(seed);
if (!randomizeBlockLocationsPerBlock) {
rand.setSeed(seed);
}
int idx = 0;
for (List<Node> list: tree.values()) {
if (list != null) {
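
The new randomizeBlockLocationsPerBlock parameter above only controls whether the shuffle is seeded: with a fixed seed (normally the block id) every call produces the same pseudo-random order, which favors page cache reuse, while leaving the generator unseeded gives each call an independent order. A small sketch of that difference (hypothetical helper, not the NetworkTopology code):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    public class SeededShuffleDemo {
      static List<String> shuffled(long seed, boolean randomizePerBlock) {
        List<String> nodes = Arrays.asList("dn1", "dn2", "dn3", "dn4");
        Random rand = new Random();
        if (!randomizePerBlock) {
          rand.setSeed(seed); // same block id => same order across calls
        }
        Collections.shuffle(nodes, rand);
        return nodes;
      }

      public static void main(String[] args) {
        long blockId = 0xDEADBEEFL;
        // Deterministic: both calls produce the identical ordering.
        System.out.println(
            shuffled(blockId, false).equals(shuffled(blockId, false))); // true
        // Randomized: orderings are independent across calls.
        System.out.println(shuffled(blockId, true));
      }
    }
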
@@ -279,8 +279,8 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
* set of nodes at each network distance.
*/
@Override
public void sortByDistance( Node reader, Node[] nodes,
int activeLen, long seed) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
// If reader is not a datanode (not in NetworkTopology tree), we need to
// replace this reader with a sibling leaf node in tree.
if (reader != null && !this.contains(reader)) {

@@ -293,7 +293,8 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
return;
}
}
super.sortByDistance(reader, nodes, nodes.length, seed);
super.sortByDistance(reader, nodes, nodes.length, seed,
randomizeBlockLocationsPerBlock);
}

/** InnerNodeWithNodeGroup represents a switch/router of a data center, rack
@@ -33,7 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -58,24 +58,35 @@ public class Groups {
private final Map<String, List<String>> staticUserToGroupsMap =
new HashMap<String, List<String>>();
private final long cacheTimeout;
private final long negativeCacheTimeout;
private final long warningDeltaMs;
private final Timer timer;

public Groups(Configuration conf) {
this(conf, new Timer());
}

public Groups(Configuration conf, Timer timer) {
impl =
ReflectionUtils.newInstance(
conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
ShellBasedUnixGroupsMapping.class,
GroupMappingServiceProvider.class),
conf);

cacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
negativeCacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT) * 1000;
warningDeltaMs =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
parseStaticMapping(conf);

this.timer = timer;

if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
"; cacheTimeout=" + cacheTimeout + "; warningDeltaMs=" +

@@ -111,7 +122,29 @@ public class Groups {
staticUserToGroupsMap.put(user, groups);
}
}

/**
* Determine whether the CachedGroups is expired.
* @param groups cached groups for one user.
* @return true if groups is expired from userToGroupsMap.
*/
private boolean hasExpired(CachedGroups groups, long startMs) {
if (groups == null) {
return true;
}
long timeout = cacheTimeout;
if (isNegativeCacheEnabled() && groups.getGroups().isEmpty()) {
// This CachedGroups is in the negative cache, thus it should expire
// sooner.
timeout = negativeCacheTimeout;
}
return groups.getTimestamp() + timeout <= startMs;
}

private boolean isNegativeCacheEnabled() {
return negativeCacheTimeout > 0;
}

/**
* Get the group memberships of a given user.
* @param user User's name

@@ -126,18 +159,22 @@ public class Groups {
}
// Return cached value if available
CachedGroups groups = userToGroupsMap.get(user);
long startMs = Time.monotonicNow();
// if cache has a value and it hasn't expired
if (groups != null && (groups.getTimestamp() + cacheTimeout > startMs)) {
long startMs = timer.monotonicNow();
if (!hasExpired(groups, startMs)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Returning cached groups for '" + user + "'");
}
if (groups.getGroups().isEmpty()) {
// Even with enabling negative cache, getGroups() has the same behavior
// that throws IOException if the groups for the user is empty.
throw new IOException("No groups found for user " + user);
}
return groups.getGroups();
}

// Create and cache user's groups
List<String> groupList = impl.getGroups(user);
long endMs = Time.monotonicNow();
long endMs = timer.monotonicNow();
long deltaMs = endMs - startMs;
UserGroupInformation.metrics.addGetGroups(deltaMs);
if (deltaMs > warningDeltaMs) {

@@ -146,6 +183,9 @@ public class Groups {
}
groups = new CachedGroups(groupList, endMs);
if (groups.getGroups().isEmpty()) {
if (isNegativeCacheEnabled()) {
userToGroupsMap.put(user, groups);
}
throw new IOException("No groups found for user " + user);
}
userToGroupsMap.put(user, groups);
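
The hasExpired/isNegativeCacheEnabled pair above is the core of HADOOP-10755: an empty group list is cached like any other entry but expires on the much shorter negative timeout, so repeated lookups of an invalid user stop hammering the backend without locking a legitimate user out for long after a transient failure. The expiry rule in isolation (constants and types local to this sketch):

    import java.util.Collections;
    import java.util.List;

    public class NegativeCacheDemo {
      static final long CACHE_TIMEOUT_MS = 300 * 1000;   // groups cache, 300s default
      static final long NEGATIVE_TIMEOUT_MS = 30 * 1000; // negative cache, 30s default

      // Mirrors Groups.hasExpired: empty (negative) entries use the shorter timeout.
      static boolean hasExpired(List<String> groups, long cachedAtMs, long nowMs) {
        if (groups == null) {
          return true;
        }
        long timeout = (NEGATIVE_TIMEOUT_MS > 0 && groups.isEmpty())
            ? NEGATIVE_TIMEOUT_MS : CACHE_TIMEOUT_MS;
        return cachedAtMs + timeout <= nowMs;
      }

      public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        List<String> some = Collections.singletonList("staff");
        System.out.println(hasExpired(none, 0, 31 * 1000)); // true: negative entry expired
        System.out.println(hasExpired(some, 0, 31 * 1000)); // false: real entry still cached
      }
    }
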
@@ -201,7 +201,8 @@ public class LdapGroupsMapping
} catch (CommunicationException e) {
LOG.warn("Connection is closed, will try to reconnect");
} catch (NamingException e) {
LOG.warn("Exception trying to get groups for user " + user, e);
LOG.warn("Exception trying to get groups for user " + user + ": "
+ e.getMessage());
return emptyResults;
}

@@ -215,7 +216,8 @@ public class LdapGroupsMapping
} catch (CommunicationException e) {
LOG.warn("Connection being closed, reconnecting failed, retryCount = " + retryCount);
} catch (NamingException e) {
LOG.warn("Exception trying to get groups for user " + user, e);
LOG.warn("Exception trying to get groups for user " + user + ":"
+ e.getMessage());
return emptyResults;
}
}
@@ -84,7 +84,8 @@ public class ShellBasedUnixGroupsMapping
result = Shell.execCommand(Shell.getGroupsForUserCommand(user));
} catch (ExitCodeException e) {
// if we didn't get the group - just return empty list;
LOG.warn("got exception trying to get groups for user " + user, e);
LOG.warn("got exception trying to get groups for user " + user + ": "
+ e.getMessage());
return new LinkedList<String>();
}

@@ -194,15 +194,18 @@ public class JavaKeyStoreProvider extends CredentialProvider {
@Override
public CredentialEntry createCredentialEntry(String alias, char[] credential)
throws IOException {
writeLock.lock();
try {
if (keyStore.containsAlias(alias) || cache.containsKey(alias)) {
throw new IOException("Credential " + alias + " already exists in " + this);
}
return innerSetCredential(alias, credential);
} catch (KeyStoreException e) {
throw new IOException("Problem looking up credential " + alias + " in " + this,
e);
} finally {
writeLock.unlock();
}
return innerSetCredential(alias, credential);
}

@Override
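
HADOOP-10830 puts the duplicate-alias check and the write under one write lock; without it, two threads can both pass the containsAlias test and race on the store. The general check-then-act idiom with ReentrantReadWriteLock, sketched standalone (GuardedStore is illustrative, not the provider class):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class GuardedStore {
      private final Map<String, char[]> store = new HashMap<String, char[]>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
      private final Lock writeLock = lock.writeLock();

      // Check-then-act must happen under one lock, or two threads can both
      // pass the containsKey test and clobber each other's entry.
      public void create(String alias, char[] credential) throws IOException {
        writeLock.lock();
        try {
          if (store.containsKey(alias)) {
            throw new IOException("Credential " + alias + " already exists");
          }
          store.put(alias, credential.clone());
        } finally {
          writeLock.unlock();
        }
      }
    }
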
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Utility methods for getting the time and computing intervals.
*
* It has the same behavior as {{@link Time}}, with the exception that its
* functions can be overridden for dependency injection purposes.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Timer {
/**
* Current system time. Do not use this to calculate a duration or interval
* to sleep, because it will be broken by settimeofday. Instead, use
* monotonicNow.
* @return current time in msec.
*/
public long now() {
return Time.now();
}

/**
* Current time from some arbitrary time base in the past, counting in
* milliseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
* @return a monotonic clock that counts in milliseconds.
*/
public long monotonicNow() { return Time.monotonicNow(); }
}
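
Timer exists purely as an injection seam: production code calls it exactly like the static Time methods, while tests substitute the FakeTimer added later in this patch to drive the clock by hand. A hedged usage sketch, assuming org.apache.hadoop.util.Timer is on the classpath (Deadline is illustrative):

    import org.apache.hadoop.util.Timer;

    public class Deadline {
      private final Timer timer;      // tests inject a FakeTimer here
      private final long expiresAtMs;

      public Deadline(Timer timer, long ttlMs) {
        this.timer = timer;
        this.expiresAtMs = timer.monotonicNow() + ttlMs;
      }

      public boolean expired() {
        // monotonicNow() is immune to settimeofday, unlike now()
        return timer.monotonicNow() >= expiresAtMs;
      }
    }
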
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurable;
*
* <p><code>Tool</code>, is the standard for any Map-Reduce tool/application.
* The tool/application should delegate the handling of
* <a href="{@docRoot}/org/apache/hadoop/util/GenericOptionsParser.html#GenericOptions">
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/CommandsManual.html#Generic_Options">
* standard command-line options</a> to {@link ToolRunner#run(Tool, String[])}
* and only handle its custom arguments.</p>
*
@@ -197,6 +197,20 @@ for ldap providers in the same way as above does.
</description>
</property>

<property>
<name>hadoop.security.groups.negative-cache.secs</name>
<value>30</value>
<description>
Expiration time for entries in the negative user-to-group mapping
cache, in seconds. This is useful when invalid users are retrying
frequently. It is suggested to set a small value for this expiration, since
a transient error in group lookup could temporarily lock out a legitimate
user.

Set this to zero or a negative value to disable negative user-to-group caching.
</description>
</property>

<property>
<name>hadoop.security.groups.cache.warn.after.ms</name>
<value>5000</value>
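
Code reads this property through the constants added to CommonConfigurationKeysPublic above; note that Groups multiplies the configured seconds by 1000 and treats any non-positive value as disabled. A minimal sketch, assuming a Hadoop Configuration on the classpath:

    import org.apache.hadoop.conf.Configuration;

    public class NegativeCacheConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Seconds in the config, milliseconds in Groups; <= 0 disables the cache.
        long negativeCacheMs = conf.getLong(
            "hadoop.security.groups.negative-cache.secs", 30) * 1000;
        System.out.println("negative cache enabled: " + (negativeCacheMs > 0));
      }
    }
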
@@ -17,51 +17,112 @@
*/
package org.apache.hadoop.crypto.key;

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

import java.net.URI;
import java.security.SecureRandom;
import java.util.Arrays;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

public class TestKeyProviderCryptoExtension {

private static final String CIPHER = "AES";
private static final String ENCRYPTION_KEY_NAME = "fooKey";

private static Configuration conf;
private static KeyProvider kp;
private static KeyProviderCryptoExtension kpExt;
private static KeyProvider.Options options;
private static KeyVersion encryptionKey;

@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
kp = new UserProvider.Factory().createProvider(new URI("user:///"), conf);
kpExt = KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
options = new KeyProvider.Options(conf);
options.setCipher(CIPHER);
options.setBitLength(128);
encryptionKey =
kp.createKey(ENCRYPTION_KEY_NAME, SecureRandom.getSeed(16), options);
}

@Test
public void testGenerateEncryptedKey() throws Exception {
Configuration conf = new Configuration();
KeyProvider kp =
new UserProvider.Factory().createProvider(new URI("user:///"), conf);
KeyProvider.Options options = new KeyProvider.Options(conf);
options.setCipher(CIPHER);
options.setBitLength(128);
KeyProvider.KeyVersion kv = kp.createKey("foo", SecureRandom.getSeed(16),
options);
KeyProviderCryptoExtension kpExt =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);

// Generate a new EEK and check it
KeyProviderCryptoExtension.EncryptedKeyVersion ek1 =
kpExt.generateEncryptedKey(kv.getName());
Assert.assertEquals(KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKey().getVersionName());
Assert.assertEquals("foo", ek1.getKeyName());
Assert.assertNotNull(ek1.getEncryptedKey().getMaterial());
Assert.assertEquals(kv.getMaterial().length,
ek1.getEncryptedKey().getMaterial().length);
KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
Assert.assertArrayEquals(k1.getMaterial(), k1a.getMaterial());
Assert.assertEquals(kv.getMaterial().length, k1.getMaterial().length);
kpExt.generateEncryptedKey(encryptionKey.getName());
assertEquals("Version name of EEK should be EEK",
KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKeyVersion().getVersionName());
assertEquals("Name of EEK should be encryption key name",
ENCRYPTION_KEY_NAME, ek1.getEncryptionKeyName());
assertNotNull("Expected encrypted key material",
ek1.getEncryptedKeyVersion().getMaterial());
assertEquals("Length of encryption key material and EEK material should "
+ "be the same", encryptionKey.getMaterial().length,
ek1.getEncryptedKeyVersion().getMaterial().length
);

KeyProviderCryptoExtension.EncryptedKeyVersion ek2 =
kpExt.generateEncryptedKey(kv.getName());
KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
boolean eq = true;
for (int i = 0; eq && i < ek2.getEncryptedKey().getMaterial().length; i++) {
eq = k2.getMaterial()[i] == k1.getMaterial()[i];
// Decrypt EEK into an EK and check it
KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
assertEquals(encryptionKey.getMaterial().length, k1.getMaterial().length);
if (Arrays.equals(k1.getMaterial(), encryptionKey.getMaterial())) {
fail("Encrypted key material should not equal encryption key material");
}
Assert.assertFalse(eq);
if (Arrays.equals(ek1.getEncryptedKeyVersion().getMaterial(),
encryptionKey.getMaterial())) {
fail("Encrypted key material should not equal decrypted key material");
}
// Decrypt it again and it should be the same
KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
assertArrayEquals(k1.getMaterial(), k1a.getMaterial());

// Generate another EEK and make sure it's different from the first
KeyProviderCryptoExtension.EncryptedKeyVersion ek2 =
kpExt.generateEncryptedKey(encryptionKey.getName());
KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
if (Arrays.equals(k1.getMaterial(), k2.getMaterial())) {
fail("Generated EEKs should have different material!");
}
if (Arrays.equals(ek1.getEncryptedKeyIv(), ek2.getEncryptedKeyIv())) {
fail("Generated EEKs should have different IVs!");
}
}

@Test
public void testEncryptDecrypt() throws Exception {
// Get an EEK
KeyProviderCryptoExtension.EncryptedKeyVersion eek =
kpExt.generateEncryptedKey(encryptionKey.getName());
final byte[] encryptedKeyIv = eek.getEncryptedKeyIv();
final byte[] encryptedKeyMaterial = eek.getEncryptedKeyVersion()
.getMaterial();
// Decrypt it manually
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
cipher.init(Cipher.DECRYPT_MODE,
new SecretKeySpec(encryptionKey.getMaterial(), "AES"),
new IvParameterSpec(KeyProviderCryptoExtension.EncryptedKeyVersion
.deriveIV(encryptedKeyIv)));
final byte[] manualMaterial = cipher.doFinal(encryptedKeyMaterial);
// Decrypt it with the API
KeyVersion decryptedKey = kpExt.decryptEncryptedKey(eek);
final byte[] apiMaterial = decryptedKey.getMaterial();

assertArrayEquals("Wrong key material from decryptEncryptedKey",
manualMaterial, apiMaterial);
}
}

@@ -45,15 +45,15 @@ public class TestStat extends FileSystemTestHelper {
final String doesNotExist;
final String directory;
final String file;
final String symlink;
final String[] symlinks;
final String stickydir;

StatOutput(String doesNotExist, String directory, String file,
String symlink, String stickydir) {
String[] symlinks, String stickydir) {
this.doesNotExist = doesNotExist;
this.directory = directory;
this.file = file;
this.symlink = symlink;
this.symlinks = symlinks;
this.stickydir = stickydir;
}

@@ -78,10 +78,12 @@ public class TestStat extends FileSystemTestHelper {
status = stat.getFileStatusForTesting();
assertTrue(status.isFile());

br = new BufferedReader(new StringReader(symlink));
stat.parseExecResult(br);
status = stat.getFileStatusForTesting();
assertTrue(status.isSymlink());
for (String symlink : symlinks) {
br = new BufferedReader(new StringReader(symlink));
stat.parseExecResult(br);
status = stat.getFileStatusForTesting();
assertTrue(status.isSymlink());
}

br = new BufferedReader(new StringReader(stickydir));
stat.parseExecResult(br);

@@ -93,22 +95,30 @@ public class TestStat extends FileSystemTestHelper {

@Test(timeout=10000)
public void testStatLinux() throws Exception {
String[] symlinks = new String[] {
"6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'",
"6,symbolic link,1373584236,1373584236,777,andrew,andrew,'link' -> 'target'"
};
StatOutput linux = new StatOutput(
"stat: cannot stat `watermelon': No such file or directory",
"4096,directory,1373584236,1373586485,755,andrew,root,`.'",
"0,regular empty file,1373584228,1373584228,644,andrew,andrew,`target'",
"6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'",
symlinks,
"4096,directory,1374622334,1375124212,1755,andrew,andrew,`stickydir'");
linux.test();
}

@Test(timeout=10000)
public void testStatFreeBSD() throws Exception {
String[] symlinks = new String[] {
"6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'"
};

StatOutput freebsd = new StatOutput(
"stat: symtest/link: stat: No such file or directory",
"512,Directory,1373583695,1373583669,40755,awang,awang,`link' -> `'",
"0,Regular File,1373508937,1373508937,100644,awang,awang,`link' -> `'",
"6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'",
symlinks,
"512,Directory,1375139537,1375139537,41755,awang,awang,`link' -> `'");
freebsd.test();
}

@@ -35,19 +35,22 @@ import org.junit.Before;
import org.junit.Test;

public class TestPathData {
private static final String TEST_ROOT_DIR =
System.getProperty("test.build.data","build/test/data") + "/testPD";
protected Configuration conf;
protected FileSystem fs;
protected Path testDir;

@Before
public void initialize() throws Exception {
conf = new Configuration();
fs = FileSystem.getLocal(conf);
testDir = new Path(
System.getProperty("test.build.data", "build/test/data") + "/testPD"
);
testDir = new Path(TEST_ROOT_DIR);

// don't want scheme on the path, just an absolute path
testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
fs.mkdirs(testDir);

FileSystem.setDefaultUri(conf, fs.getUri());
fs.setWorkingDirectory(testDir);
fs.mkdirs(new Path("d1"));

@@ -60,6 +63,7 @@ public class TestPathData {

@After
public void cleanup() throws Exception {
fs.delete(testDir, true);
fs.close();
}

@@ -24,6 +24,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Random;
import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;

/** Unit tests for LargeUTF8. */

@@ -363,6 +364,27 @@ public class TestText extends TestCase {
fail("testReadWriteOperations error !!!");
}
}

public void testReadWithKnownLength() throws IOException {
String line = "hello world";
byte[] inputBytes = line.getBytes(Charsets.UTF_8);
DataInputBuffer in = new DataInputBuffer();
Text text = new Text();

in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 5);
assertEquals("hello", text.toString());

// Read longer length, make sure it lengthens
in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 7);
assertEquals("hello w", text.toString());

// Read shorter length, make sure it shortens
in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 2);
assertEquals("he", text.toString());
}

/**
* test {@code Text.bytesToCodePoint(bytes) }
@@ -105,7 +105,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[2] = dataNodes[3];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);

@@ -117,7 +117,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[2] = dataNodes[1];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);

@@ -127,7 +127,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);

@@ -137,7 +137,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(computeNode, testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
}

@@ -26,8 +26,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

@@ -94,6 +97,9 @@ public class TestGroupsCaching {

@Test
public void testGroupsCaching() throws Exception {
// Disable negative cache.
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0);
Groups groups = new Groups(conf);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();

@@ -163,4 +169,54 @@ public class TestGroupsCaching {
FakeunPrivilegedGroupMapping.invoked);

}

@Test
public void testNegativeGroupCaching() throws Exception {
final String user = "negcache";
final String failMessage = "Did not throw IOException: ";
conf.setLong(
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 2);
FakeTimer timer = new FakeTimer();
Groups groups = new Groups(conf, timer);
groups.cacheGroupsAdd(Arrays.asList(myGroups));
groups.refresh();
FakeGroupMapping.addToBlackList(user);

// In the first attempt, the user will be put in the negative cache.
try {
groups.getGroups(user);
fail(failMessage + "Failed to obtain groups from FakeGroupMapping.");
} catch (IOException e) {
// Expects to raise exception for the first time. But the user will be
// put into the negative cache
GenericTestUtils.assertExceptionContains("No groups found for user", e);
}

// The second time, the user is in the negative cache.
try {
groups.getGroups(user);
fail(failMessage + "The user is in the negative cache.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No groups found for user", e);
}

// Brings back the backend user-group mapping service.
FakeGroupMapping.clearBlackList();

// It should still get groups from the negative cache.
try {
groups.getGroups(user);
fail(failMessage + "The user is still in the negative cache, even " +
"FakeGroupMapping has resumed.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No groups found for user", e);
}

// Let the elements in the negative cache expire.
timer.advance(4 * 1000);

// The groups for the user is expired in the negative cache, a new copy of
// groups for the user is fetched.
assertEquals(Arrays.asList(myGroups), groups.getGroups(user));
}
}

@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* FakeTimer can be used for test purposes to control the return values
* from {{@link Timer}}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FakeTimer extends Timer {
private long nowMillis;

/** Constructs a FakeTimer with a non-zero value */
public FakeTimer() {
nowMillis = 1000; // Initialize with a non-trivial value.
}

@Override
public long now() {
return nowMillis;
}

@Override
public long monotonicNow() {
return nowMillis;
}

/** Increases the time by milliseconds */
public void advance(long advMillis) {
nowMillis += advMillis;
}
}
@@ -64,12 +64,12 @@ public class KMSServerJSONUtils {
Map json = new LinkedHashMap();
if (encryptedKeyVersion != null) {
json.put(KMSRESTConstants.VERSION_NAME_FIELD,
encryptedKeyVersion.getKeyVersionName());
encryptedKeyVersion.getEncryptionKeyVersionName());
json.put(KMSRESTConstants.IV_FIELD,
Base64.encodeBase64URLSafeString(
encryptedKeyVersion.getIv()));
encryptedKeyVersion.getEncryptedKeyIv()));
json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD,
toJSON(encryptedKeyVersion.getEncryptedKey()));
toJSON(encryptedKeyVersion.getEncryptedKeyVersion()));
}
return json;
}

@@ -485,10 +485,10 @@ public class TestKMS {

EncryptedKeyVersion ek1 = kpExt.generateEncryptedKey(kv.getName());
Assert.assertEquals(KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKey().getVersionName());
Assert.assertNotNull(ek1.getEncryptedKey().getMaterial());
ek1.getEncryptedKeyVersion().getVersionName());
Assert.assertNotNull(ek1.getEncryptedKeyVersion().getMaterial());
Assert.assertEquals(kv.getMaterial().length,
ek1.getEncryptedKey().getMaterial().length);
ek1.getEncryptedKeyVersion().getMaterial().length);
KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);

@@ -498,8 +498,8 @@ public class TestKMS {
EncryptedKeyVersion ek2 = kpExt.generateEncryptedKey(kv.getName());
KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
boolean isEq = true;
for (int i = 0; isEq && i < ek2.getEncryptedKey().getMaterial().length;
i++) {
for (int i = 0; isEq && i < ek2.getEncryptedKeyVersion()
.getMaterial().length; i++) {
isEq = k2.getMaterial()[i] == k1.getMaterial()[i];
}
Assert.assertFalse(isEq);

@@ -1051,8 +1051,12 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {

@Override
public REMOVE3Response remove(XDR xdr, RpcInfo info) {
return remove(xdr, getSecurityHandler(info), info.remoteAddress());
}

@VisibleForTesting
REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) {
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
SecurityHandler securityHandler = getSecurityHandler(info);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);

@@ -1083,17 +1087,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
}

WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
}

String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
if (fstat.isDir()) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, errWcc);
}

boolean result = dfsClient.delete(fileIdPath, false);
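
The remove() refactor above is the standard seam-for-testing move: the public RPC entry point stays a one-line delegator, and a package-private @VisibleForTesting overload takes the SecurityHandler and remote address explicitly, so TestClientAccessPrivilege below can pass a Mockito mock and a synthetic address. The shape in miniature (Service and Request are illustrative, not the NFS code):

    import com.google.common.annotations.VisibleForTesting;

    public class Service {
      // Production entry point: derives collaborators from the request context.
      public String handle(Request request) {
        return handle(request.payload(), request.callerId());
      }

      // Test seam: collaborators passed explicitly so tests can inject fakes.
      @VisibleForTesting
      String handle(String payload, String callerId) {
        return callerId + ":" + payload;
      }

      interface Request {
        String payload();
        String callerId();
      }
    }
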
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.response.REMOVE3Response;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

public class TestClientAccessPrivilege {
static MiniDFSCluster cluster = null;
static NfsConfiguration config = new NfsConfiguration();
static DistributedFileSystem hdfs;
static NameNode nn;
static String testdir = "/tmp";
static SecurityHandler securityHandler;

@BeforeClass
public static void setup() throws Exception {

String currentUser = System.getProperty("user.name");
config.set(DefaultImpersonationProvider.getTestProvider()
.getProxySuperuserGroupConfKey(currentUser), "*");
config.set(DefaultImpersonationProvider.getTestProvider()
.getProxySuperuserIpConfKey(currentUser), "*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
nn = cluster.getNameNode();

// Use ephemeral port in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);

securityHandler = Mockito.mock(SecurityHandler.class);
Mockito.when(securityHandler.getUser()).thenReturn(
System.getProperty("user.name"));
}

@AfterClass
public static void shutdown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}

@Before
public void createFiles() throws IllegalArgumentException, IOException {
hdfs.delete(new Path(testdir), true);
hdfs.mkdirs(new Path(testdir));
DFSTestUtil.createFile(hdfs, new Path(testdir + "/f1"), 0, (short) 1, 0);
}

@Test(timeout = 60000)
public void testClientAccessPrivilegeForRemove() throws Exception {
// Configure ro access for nfs1 service
config.set("dfs.nfs.exports.allowed.hosts", "* ro");

// Start nfs
Nfs3 nfs = new Nfs3(config);
nfs.startServiceInternal(false);

RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs.getRpcProgram();

// Create a remove request
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
long dirId = status.getFileId();

XDR xdr_req = new XDR();
FileHandle handle = new FileHandle(dirId);
handle.serialize(xdr_req);
xdr_req.writeString("f1");

// Remove operation
REMOVE3Response response = nfsd.remove(xdr_req.asReadOnlyWrap(),
securityHandler, new InetSocketAddress("localhost", 1234));

// Assert on return code
assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
response.getStatus());

}

}
@@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
    MAPREDUCE-5971. Move the default options for distcp -p to
    DistCpOptionSwitch. (clamb via wang)

+    MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
+    compatible/incompatible changes (Junping Du via jlowe)
+
  OPTIMIZATIONS

  BUG FIXES
@@ -82,10 +82,13 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;

@@ -125,6 +128,7 @@ import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;

+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;

@@ -146,8 +150,9 @@ public class ShuffleHandler extends AuxiliaryService {
      Pattern.CASE_INSENSITIVE);

  private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
- private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
- private static final String STATE_DB_SCHEMA_VERSION = "1.0";
+ private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
+ protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
+     NMDBSchemaVersion.newInstance(1, 0);

  private int port;
  private ChannelFactory selector;

@@ -466,18 +471,15 @@ public class ShuffleHandler extends AuxiliaryService {
    Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
    LOG.info("Using state database at " + dbPath + " for recovery");
    File dbfile = new File(dbPath.toString());
-   byte[] schemaVersionData;
    try {
      stateDb = JniDBFactory.factory.open(dbfile, options);
-     schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
    } catch (NativeDB.DBException e) {
      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
        LOG.info("Creating state database at " + dbfile);
        options.createIfMissing(true);
        try {
          stateDb = JniDBFactory.factory.open(dbfile, options);
-         schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
-         stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
+         storeVersion();
        } catch (DBException dbExc) {
          throw new IOException("Unable to create state store", dbExc);
        }

@@ -485,15 +487,69 @@ public class ShuffleHandler extends AuxiliaryService {
        throw e;
      }
    }
-   if (schemaVersionData != null) {
-     String schemaVersion = asString(schemaVersionData);
-     // only support exact schema matches for now
-     if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
-       throw new IOException("Incompatible state database schema, found "
-           + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
-     }
-   } else {
-     throw new IOException("State database schema version not found");
-   }
+   checkVersion();
  }

+ @VisibleForTesting
+ NMDBSchemaVersion loadVersion() throws IOException {
+   byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+   // if version is not stored previously, treat it as 1.0.
+   if (data == null || data.length == 0) {
+     return NMDBSchemaVersion.newInstance(1, 0);
+   }
+   NMDBSchemaVersion version =
+       new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+   return version;
+ }
+
+ private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
+   String key = STATE_DB_SCHEMA_VERSION_KEY;
+   byte[] data =
+       ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
+   try {
+     stateDb.put(bytes(key), data);
+   } catch (DBException e) {
+     throw new IOException(e.getMessage(), e);
+   }
+ }
+
+ private void storeVersion() throws IOException {
+   storeSchemaVersion(CURRENT_VERSION_INFO);
+ }
+
+ // Only used for test
+ @VisibleForTesting
+ void storeVersion(NMDBSchemaVersion version) throws IOException {
+   storeSchemaVersion(version);
+ }
+
+ protected NMDBSchemaVersion getCurrentVersion() {
+   return CURRENT_VERSION_INFO;
+ }
+
+ /**
+  * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+  * 2) Any incompatible change of DB schema is a major upgrade, and any
+  *    compatible change of DB schema is a minor upgrade.
+  * 3) Within a minor upgrade, say 1.1 to 1.2:
+  *    overwrite the version info and proceed as normal.
+  * 4) Within a major upgrade, say 1.2 to 2.0:
+  *    throw exception and indicate user to use a separate upgrade tool to
+  *    upgrade shuffle info or remove incompatible old state.
+  */
+ private void checkVersion() throws IOException {
+   NMDBSchemaVersion loadedVersion = loadVersion();
+   LOG.info("Loaded state DB schema version info " + loadedVersion);
+   if (loadedVersion.equals(getCurrentVersion())) {
+     return;
+   }
+   if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+     LOG.info("Storing state DB schema version info " + getCurrentVersion());
+     storeVersion();
+   } else {
+     throw new IOException(
+         "Incompatible version for state DB schema: expecting DB schema version "
+             + getCurrentVersion() + ", but loading version " + loadedVersion);
+   }
+ }
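The javadoc on checkVersion() above reduces to one rule: a minor bump (say 1.1 to 1.2) is compatible and simply overwrites the stored version, while a major bump (say 1.2 to 2.0) aborts startup. A minimal sketch of that rule, assuming NMDBSchemaVersion exposes a getMajorVersion() accessor (an assumption; this hunk only shows newInstance() and isCompatibleTo()):

  // Hypothetical helper mirroring the major.minor scheme described above:
  // 1.1 -> 1.2 is compatible (overwrite version info and proceed),
  // 1.2 -> 2.0 is incompatible (refuse to start until state is upgraded).
  static boolean isCompatible(NMDBSchemaVersion loaded,
      NMDBSchemaVersion current) {
    // getMajorVersion() is assumed here, not confirmed by this diff.
    return loaded.getMajorVersion() == current.getMajorVersion();
  }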
@@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;

@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;

@@ -718,6 +720,94 @@ public class TestShuffleHandler {
      FileUtil.fullyDelete(tmpDir);
    }
  }

+ @Test
+ public void testRecoveryFromOtherVersions() throws IOException {
+   final String user = "someuser";
+   final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+   final File tmpDir = new File(System.getProperty("test.build.data",
+       System.getProperty("java.io.tmpdir")),
+       TestShuffleHandler.class.getName());
+   Configuration conf = new Configuration();
+   conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+   conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+   ShuffleHandler shuffle = new ShuffleHandler();
+   // emulate aux services startup with recovery enabled
+   shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+   tmpDir.mkdirs();
+   try {
+     shuffle.init(conf);
+     shuffle.start();
+
+     // setup a shuffle token for an application
+     DataOutputBuffer outputBuffer = new DataOutputBuffer();
+     outputBuffer.reset();
+     Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+         "identifier".getBytes(), "password".getBytes(), new Text(user),
+         new Text("shuffleService"));
+     jt.write(outputBuffer);
+     shuffle.initializeApplication(new ApplicationInitializationContext(user,
+         appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+             outputBuffer.getLength())));
+
+     // verify we are authorized to shuffle
+     int rc = getShuffleResponseCode(shuffle, jt);
+     Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+     // emulate shuffle handler restart
+     shuffle.close();
+     shuffle = new ShuffleHandler();
+     shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+     shuffle.init(conf);
+     shuffle.start();
+
+     // verify we are still authorized to shuffle to the old application
+     rc = getShuffleResponseCode(shuffle, jt);
+     Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+     NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
+     Assert.assertEquals(version, shuffle.getCurrentVersion());
+
+     // emulate shuffle handler restart with compatible version
+     NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
+     // update version info before closing the shuffle handler
+     shuffle.storeVersion(version11);
+     Assert.assertEquals(version11, shuffle.loadVersion());
+     shuffle.close();
+     shuffle = new ShuffleHandler();
+     shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+     shuffle.init(conf);
+     shuffle.start();
+     // the stored shuffle version is overwritten by CURRENT_VERSION_INFO
+     // after a successful restart
+     Assert.assertEquals(version, shuffle.loadVersion());
+     // verify we are still authorized to shuffle to the old application
+     rc = getShuffleResponseCode(shuffle, jt);
+     Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+     // emulate shuffle handler restart with incompatible version
+     NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
+     shuffle.storeVersion(version21);
+     Assert.assertEquals(version21, shuffle.loadVersion());
+     shuffle.close();
+     shuffle = new ShuffleHandler();
+     shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+     shuffle.init(conf);
+
+     try {
+       shuffle.start();
+       Assert.fail("Incompatible version, should expect fail here.");
+     } catch (ServiceStateException e) {
+       Assert.assertTrue("Exception message mismatch",
+           e.getMessage().contains("Incompatible version for state DB schema:"));
+     }
+
+   } finally {
+     if (shuffle != null) {
+       shuffle.close();
+     }
+     FileUtil.fullyDelete(tmpDir);
+   }
+ }

  private static int getShuffleResponseCode(ShuffleHandler shuffle,
      Token<JobTokenIdentifier> jt) throws IOException {
@@ -89,6 +89,7 @@
      <item name="HDFS NFS Gateway" href="hadoop-project-dist/hadoop-hdfs/HdfsNfsGateway.html"/>
      <item name="HDFS Rolling Upgrade" href="hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html"/>
      <item name="Extended Attributes" href="hadoop-project-dist/hadoop-hdfs/ExtendedAttributes.html"/>
+     <item name="HDFS Support for Multihoming" href="hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html"/>
    </menu>

    <menu name="MapReduce" inherit="top">
@@ -56,6 +56,12 @@ Release 2.6.0 - UNRELEASED

    YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe)

+    YARN-2013. The diagnostics is always the ExitCodeException stack when the container
+    crashes. (Tsuyoshi OZAWA via junping_du)
+
+    YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
+    (Li Lu via jianhe)
+
  OPTIMIZATIONS

  BUG FIXES

@@ -79,6 +85,15 @@ Release 2.6.0 - UNRELEASED
    YARN-2244. FairScheduler missing handling of containers for unknown
    application attempts. (Anubhav Dhoot via kasha)

+    YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement
+    (Leitao Guo via jlowe)
+
+    YARN-2273. NPE in ContinuousScheduling thread when we lose a node.
+    (Wei Yan via kasha)
+
+    YARN-2313. Livelock can occur in FairScheduler when there are lots of
+    running apps (Tsuyoshi Ozawa via Sandy Ryza)
+
Release 2.5.0 - UNRELEASED

  INCOMPATIBLE CHANGES

@@ -406,6 +421,12 @@ Release 2.5.0 - UNRELEASED

    YARN-2269. Remove external links from YARN UI. (Craig Welch via xgong)

+    YARN-2270. Made TestFSDownload#testDownloadPublicWithStatCache be skipped
+    when there’s no ancestor permissions. (Akira Ajisaka via zjshen)
+
+    YARN-2319. Made the MiniKdc instance start/close before/after the class of
+    TestRMWebServicesDelegationTokens. (Wenwu Peng via zjshen)
+
Release 2.4.1 - 2014-06-23

  INCOMPATIBLE CHANGES
@@ -61,23 +61,27 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
function print_usage(){
  echo "Usage: yarn [--config confdir] COMMAND"
  echo "where COMMAND is one of:"
- echo "  resourcemanager -format deletes the RMStateStore"
- echo "  resourcemanager         run the ResourceManager"
- echo "  nodemanager             run a nodemanager on each slave"
- echo "  timelineserver          run the timeline server"
- echo "  rmadmin                 admin tools"
- echo "  version                 print the version"
- echo "  jar <jar>               run a jar file"
- echo "  application             prints application(s) report/kill application"
- echo "  applicationattempt      prints applicationattempt(s) report"
- echo "  container               prints container(s) report"
- echo "  node                    prints node report(s)"
- echo "  logs                    dump container logs"
- echo "  classpath               prints the class path needed to get the"
- echo "                          Hadoop jar and the required libraries"
- echo "  daemonlog               get/set the log level for each daemon"
- echo "  CLASSNAME             run the class named CLASSNAME"
+ echo "  resourcemanager -format-state-store   deletes the RMStateStore"
+ echo "  resourcemanager                       run the ResourceManager"
+ echo "  nodemanager                           run a nodemanager on each slave"
+ echo "  timelineserver                        run the timeline server"
+ echo "  rmadmin                               admin tools"
+ echo "  version                               print the version"
+ echo "  jar <jar>                             run a jar file"
+ echo "  application                           prints application(s)"
+ echo "                                        report/kill application"
+ echo "  applicationattempt                    prints applicationattempt(s)"
+ echo "                                        report"
+ echo "  container                             prints container(s) report"
+ echo "  node                                  prints node report(s)"
+ echo "  logs                                  dump container logs"
+ echo "  classpath                             prints the class path needed to"
+ echo "                                        get the Hadoop jar and the"
+ echo "                                        required libraries"
+ echo "  daemonlog                             get/set the log level for each"
+ echo "                                        daemon"
  echo " or"
+ echo "  CLASSNAME                             run the class named CLASSNAME"
  echo "Most commands print help when invoked w/o parameters."
}
@@ -194,6 +194,12 @@
    <Field name="scheduleAsynchronously" />
    <Bug pattern="IS2_INCONSISTENT_SYNC" />
  </Match>
+ <!-- Inconsistent sync warning - updateInterval is only initialized once and never changed -->
+ <Match>
+   <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
+   <Field name="updateInterval" />
+   <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
  <!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
  <Match>
    <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

@@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;

import com.google.common.annotations.VisibleForTesting;

@@ -522,6 +522,8 @@ public class ApplicationMaster {
          + appAttemptID.toString(), e);
    }

+   // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
+   // are marked as LimitedPrivate
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();

@@ -900,11 +902,6 @@ public class ApplicationMaster {
    public void run() {
      LOG.info("Setting up container launch container for containerid="
          + container.getId());
-     ContainerLaunchContext ctx = Records
-         .newRecord(ContainerLaunchContext.class);
-
-     // Set the environment
-     ctx.setEnvironment(shellEnv);

      // Set the local resources
      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

@@ -935,16 +932,13 @@ public class ApplicationMaster {
          return;
        }

-       LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-       shellRsrc.setType(LocalResourceType.FILE);
-       shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+       URL yarnUrl = null;
        try {
-         shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
-             renamedScriptPath.toString())));
+         yarnUrl = ConverterUtils.getYarnUrlFromURI(
+             new URI(renamedScriptPath.toString()));
        } catch (URISyntaxException e) {
          LOG.error("Error when trying to use shell script path specified"
              + " in env, path=" + renamedScriptPath, e);
-
          // A failure scenario on bad input such as invalid shell script path
          // We know we cannot continue launching the container
          // so we should release it.

@@ -953,13 +947,13 @@ public class ApplicationMaster {
        numFailedContainers.incrementAndGet();
        return;
      }
-     shellRsrc.setTimestamp(shellScriptPathTimestamp);
-     shellRsrc.setSize(shellScriptPathLen);
+     LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
+         LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+         shellScriptPathLen, shellScriptPathTimestamp);
      localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
          ExecShellStringPath, shellRsrc);
      shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
    }
-   ctx.setLocalResources(localResources);

    // Set the necessary command to execute on the allocated container
    Vector<CharSequence> vargs = new Vector<CharSequence>(5);

@@ -986,16 +980,18 @@ public class ApplicationMaster {

      List<String> commands = new ArrayList<String>();
      commands.add(command.toString());
-     ctx.setCommands(commands);

-     // Set up tokens for the container too. Today, for normal shell commands,
-     // the container in distribute-shell doesn't need any tokens. We are
-     // populating them mainly for NodeManagers to be able to download any
-     // files in the distributed file-system. The tokens are otherwise also
-     // useful in cases, for e.g., when one is running a "hadoop dfs" command
-     // inside the distributed shell.
-     ctx.setTokens(allTokens.duplicate());
+     // Set up ContainerLaunchContext, setting local resource, environment,
+     // command and token for constructor.
+
+     // Note for tokens: Set up tokens for the container too. Today, for normal
+     // shell commands, the container in distribute-shell doesn't need any
+     // tokens. We are populating them mainly for NodeManagers to be able to
+     // download any files in the distributed file-system. The tokens are
+     // otherwise also useful in cases, for e.g., when one is running a
+     // "hadoop dfs" command inside the distributed shell.
+     ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+         localResources, shellEnv, commands, null, allTokens.duplicate(), null);

      containerListener.addContainer(container.getId(), container);
      nmClientAsync.startContainerAsync(container, ctx);
    }

@@ -1024,15 +1020,13 @@ public class ApplicationMaster {
    // setup requirements for hosts
    // using * as any host will do for the distributed shell app
    // set the priority for the request
-   Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
-   pri.setPriority(requestPriority);
+   Priority pri = Priority.newInstance(requestPriority);

    // Set up resource type requirements
    // For now, memory and CPU are supported so we set memory and cpu requirements
-   Resource capability = Records.newRecord(Resource.class);
-   capability.setMemory(containerMemory);
-   capability.setVirtualCores(containerVirtualCores);
+   Resource capability = Resource.newInstance(containerMemory,
+       containerVirtualCores);

    ContainerRequest request = new ContainerRequest(capability, null, null,
        pri);
@@ -456,9 +456,6 @@ public class Client {
    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);

-   // Set up the container launch context for the application master
-   ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-
    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources

@@ -508,8 +505,6 @@ public class Client {
      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
          localResources, StringUtils.join(shellArgs, " "));
    }
-   // Set local resource info into app master container launch context
-   amContainer.setLocalResources(localResources);

    // Set the necessary security tokens as needed
    //amContainer.setContainerTokens(containerToken);

@@ -550,8 +545,6 @@ public class Client {

    env.put("CLASSPATH", classPathEnv.toString());

-   amContainer.setEnvironment(env);
-
    // Set the necessary command to execute the application master
    Vector<CharSequence> vargs = new Vector<CharSequence>(30);

@@ -587,14 +580,15 @@ public class Client {
    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
-   amContainer.setCommands(commands);
+
+   // Set up the container launch context for the application master
+   ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+       localResources, env, commands, null, null, null);

    // Set up resource type requirements
    // For now, both memory and vcores are supported, so we set memory and
    // vcores requirements
-   Resource capability = Records.newRecord(Resource.class);
-   capability.setMemory(amMemory);
-   capability.setVirtualCores(amVCores);
+   Resource capability = Resource.newInstance(amMemory, amVCores);
    appContext.setResource(capability);

    // Service data is a binary blob that can be passed to the application

@@ -603,6 +597,7 @@ public class Client {

    // Setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
+     // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
      Credentials credentials = new Credentials();
      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
      if (tokenRenewer == null || tokenRenewer.length() == 0) {

@@ -627,9 +622,8 @@ public class Client {
    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
-   Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
-   pri.setPriority(amPriority);
+   Priority pri = Priority.newInstance(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
@@ -177,9 +177,10 @@ public class FSDownload implements Callable<Path> {
  /**
   * Returns true if all ancestors of the specified path have the 'execute'
   * permission set for all users (i.e. that other users can traverse
-  * the directory heirarchy to the given path)
+  * the directory hierarchy to the given path)
   */
- private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+ @VisibleForTesting
+ static boolean ancestorsHaveExecutePermissions(FileSystem fs,
      Path path, LoadingCache<Path,Future<FileStatus>> statCache)
      throws IOException {
    Path current = path;
@@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.FileOutputStream;

@@ -66,6 +67,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;

@@ -308,6 +310,11 @@ public class TestFSDownload {
    FileContext files = FileContext.getLocalFSFileContext(conf);
    Path basedir = files.makeQualified(new Path("target",
        TestFSDownload.class.getSimpleName()));
+
+   // if test directory doesn't have ancestor permission, skip this test
+   FileSystem f = basedir.getFileSystem(conf);
+   assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null));
+
    files.mkdir(basedir, null, true);
    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
@@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.nodemanager;

+import com.google.common.base.Optional;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;

@@ -212,10 +213,21 @@ public class DefaultContainerExecutor extends ContainerExecutor {
          && exitCode != ExitCode.TERMINATED.getExitCode()) {
        LOG.warn("Exception from container-launch with container ID: "
            + containerId + " and exit code: " + exitCode , e);
-       logOutput(shExec.getOutput());
-       String diagnostics = "Exception from container-launch: "
-           + e + "\n"
-           + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+       StringBuilder builder = new StringBuilder();
+       builder.append("Exception from container-launch.\n");
+       builder.append("Container id: " + containerId + "\n");
+       builder.append("Exit code: " + exitCode + "\n");
+       if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+         builder.append("Exception message: " + e.getMessage() + "\n");
+       }
+       builder.append("Stack trace: "
+           + StringUtils.stringifyException(e) + "\n");
+       if (!shExec.getOutput().isEmpty()) {
+         builder.append("Shell output: " + shExec.getOutput() + "\n");
+       }
+       String diagnostics = builder.toString();
+       logOutput(diagnostics);
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            diagnostics));
      } else {
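One detail worth calling out in the builder above is the Guava idiom Optional.fromNullable(e.getMessage()).or(""): it collapses a possibly-null exception message into an empty string so a single isEmpty() test decides whether the "Exception message:" line is emitted. A self-contained sketch of the same pattern (hypothetical values, not from this patch):

  import com.google.common.base.Optional;

  public class OptionalDemo {
    public static void main(String[] args) {
      String msg = null; // e.g. an exception whose getMessage() returned null
      // Null-safe default: yields "" when msg is null, msg otherwise.
      String safe = Optional.fromNullable(msg).or("");
      if (!safe.isEmpty()) {
        System.out.println("Exception message: " + safe);
      }
    }
  }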
@@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.nodemanager;

+import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;

@@ -296,9 +297,21 @@ public class LinuxContainerExecutor extends ContainerExecutor {
          && exitCode != ExitCode.TERMINATED.getExitCode()) {
        LOG.warn("Exception from container-launch with container ID: "
            + containerId + " and exit code: " + exitCode , e);
-       logOutput(shExec.getOutput());
-       String diagnostics = "Exception from container-launch: \n"
-           + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+       StringBuilder builder = new StringBuilder();
+       builder.append("Exception from container-launch.\n");
+       builder.append("Container id: " + containerId + "\n");
+       builder.append("Exit code: " + exitCode + "\n");
+       if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+         builder.append("Exception message: " + e.getMessage() + "\n");
+       }
+       builder.append("Stack trace: "
+           + StringUtils.stringifyException(e) + "\n");
+       if (!shExec.getOutput().isEmpty()) {
+         builder.append("Shell output: " + shExec.getOutput() + "\n");
+       }
+       String diagnostics = builder.toString();
+       logOutput(diagnostics);
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            diagnostics));
      } else {
@@ -72,7 +72,7 @@ public class NodePage extends NMView {
          ._("Total Pmem allocated for Container",
              StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
          ._("Pmem enforcement enabled",
-             info.isVmemCheckEnabled())
+             info.isPmemCheckEnabled())
          ._("Total VCores allocated for Containers",
              String.valueOf(info.getTotalVCoresAllocated()))
          ._("NodeHealthyStatus",
@@ -18,16 +18,37 @@

package org.apache.hadoop.yarn.server.nodemanager;

+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileReader;
+import java.io.InputStream;
import java.io.IOException;
import java.io.LineNumberReader;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

+import org.junit.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;

@@ -45,15 +66,13 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;

-import static org.apache.hadoop.fs.CreateFlag.*;
-
-
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
-import static org.mockito.Mockito.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

public class TestDefaultContainerExecutor {

@@ -191,6 +210,92 @@ public class TestDefaultContainerExecutor {
    }
  }

+ @Test
+ public void testContainerLaunchError()
+     throws IOException, InterruptedException {
+
+   Path localDir = new Path(BASE_TMP_PATH, "localDir");
+   List<String> localDirs = new ArrayList<String>();
+   localDirs.add(localDir.toString());
+   List<String> logDirs = new ArrayList<String>();
+   Path logDir = new Path(BASE_TMP_PATH, "logDir");
+   logDirs.add(logDir.toString());
+
+   Configuration conf = new Configuration();
+   conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+   conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString());
+   conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
+
+   FileContext lfs = FileContext.getLocalFSFileContext(conf);
+   DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs));
+   mockExec.setConf(conf);
+   doAnswer(
+       new Answer() {
+         @Override
+         public Object answer(InvocationOnMock invocationOnMock)
+             throws Throwable {
+           String diagnostics = (String) invocationOnMock.getArguments()[0];
+           assertTrue("Invalid Diagnostics message: " + diagnostics,
+               diagnostics.contains("No such file or directory"));
+           return null;
+         }
+       }
+   ).when(mockExec).logOutput(any(String.class));
+
+   String appSubmitter = "nobody";
+   String appId = "APP_ID";
+   String containerId = "CONTAINER_ID";
+   Container container = mock(Container.class);
+   ContainerId cId = mock(ContainerId.class);
+   ContainerLaunchContext context = mock(ContainerLaunchContext.class);
+   HashMap<String, String> env = new HashMap<String, String>();
+
+   when(container.getContainerId()).thenReturn(cId);
+   when(container.getLaunchContext()).thenReturn(context);
+   try {
+     doAnswer(new Answer() {
+       @Override
+       public Object answer(InvocationOnMock invocationOnMock)
+           throws Throwable {
+         ContainerDiagnosticsUpdateEvent event =
+             (ContainerDiagnosticsUpdateEvent) invocationOnMock
+                 .getArguments()[0];
+         assertTrue("Invalid Diagnostics message: "
+                 + event.getDiagnosticsUpdate(),
+             event.getDiagnosticsUpdate().contains("No such file or directory"));
+         return null;
+       }
+     }).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));
+
+     when(cId.toString()).thenReturn(containerId);
+     when(cId.getApplicationAttemptId()).thenReturn(
+         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0));
+
+     when(context.getEnvironment()).thenReturn(env);
+
+     mockExec.createUserLocalDirs(localDirs, appSubmitter);
+     mockExec.createUserCacheDirs(localDirs, appSubmitter);
+     mockExec.createAppDirs(localDirs, appSubmitter, appId);
+     mockExec.createAppLogDirs(appId, logDirs);
+
+     Path scriptPath = new Path("file:///bin/echo");
+     Path tokensPath = new Path("file:///dev/null");
+     Path workDir = localDir;
+     Path pidFile = new Path(workDir, "pid.txt");
+
+     mockExec.init();
+     mockExec.activateContainer(cId, pidFile);
+     int ret = mockExec
+         .launchContainer(container, scriptPath, tokensPath, appSubmitter,
+             appId, workDir, localDirs, localDirs);
+     Assert.assertNotSame(0, ret);
+   } finally {
+     mockExec.deleteAsUser(appSubmitter, localDir);
+     mockExec.deleteAsUser(appSubmitter, logDir);
+   }
+ }

  // @Test
  // public void testInit() throws IOException, InterruptedException {
  //   Configuration conf = new Configuration();
@@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.nodemanager;

import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.File;

@@ -34,8 +38,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

@@ -46,9 +48,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.junit.Assert;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

public class TestLinuxContainerExecutorWithMocks {

@@ -216,7 +222,19 @@ public class TestLinuxContainerExecutorWithMocks {
    conf.set(YarnConfiguration.NM_LOCAL_DIRS, "file:///bin/echo");
    conf.set(YarnConfiguration.NM_LOG_DIRS, "file:///dev/null");

-   mockExec = new LinuxContainerExecutor();
+   mockExec = spy(new LinuxContainerExecutor());
+   doAnswer(
+       new Answer() {
+         @Override
+         public Object answer(InvocationOnMock invocationOnMock)
+             throws Throwable {
+           String diagnostics = (String) invocationOnMock.getArguments()[0];
+           assertTrue("Invalid Diagnostics message: " + diagnostics,
+               diagnostics.contains("badcommand"));
+           return null;
+         }
+       }
+   ).when(mockExec).logOutput(any(String.class));
    dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);
    mockExec.setConf(conf);

@@ -233,7 +251,22 @@ public class TestLinuxContainerExecutorWithMocks {

    when(container.getContainerId()).thenReturn(cId);
    when(container.getLaunchContext()).thenReturn(context);
+
+   doAnswer(
+       new Answer() {
+         @Override
+         public Object answer(InvocationOnMock invocationOnMock)
+             throws Throwable {
+           ContainerDiagnosticsUpdateEvent event =
+               (ContainerDiagnosticsUpdateEvent) invocationOnMock
+                   .getArguments()[0];
+           assertTrue("Invalid Diagnostics message: " +
+               event.getDiagnosticsUpdate(),
+               event.getDiagnosticsUpdate().contains("badcommand"));
+           return null;
+         }
+       }
+   ).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));

    when(cId.toString()).thenReturn(containerId);

    when(context.getEnvironment()).thenReturn(env);
@@ -1035,8 +1035,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
    StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
    try {
      Configuration conf = new YarnConfiguration();
-     // If -format, then delete RMStateStore; else startup normally
-     if (argv.length == 1 && argv[0].equals("-format")) {
+     // If -format-state-store, then delete RMStateStore; else startup normally
+     if (argv.length == 1 && argv[0].equals("-format-state-store")) {
        deleteRMStateStore(conf);
      } else {
        ResourceManager resourceManager = new ResourceManager();
@@ -1294,12 +1294,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
  private String getAMContainerCrashedDiagnostics(
      RMAppAttemptContainerFinishedEvent finishEvent) {
    ContainerStatus status = finishEvent.getContainerStatus();
-   String diagnostics =
-       "AM Container for " + finishEvent.getApplicationAttemptId()
-           + " exited with " + " exitCode: " + status.getExitStatus() + ". "
-           + "Check application tracking page: " + this.getTrackingUrl()
-           + " . Then, click on links to logs of each attempt for detailed output. ";
-   return diagnostics;
+   StringBuilder diagnosticsBuilder = new StringBuilder();
+   diagnosticsBuilder.append("AM Container for ").append(
+       finishEvent.getApplicationAttemptId()).append(
+       " exited with ").append(" exitCode: ").append(status.getExitStatus()).
+       append("\n");
+   if (this.getTrackingUrl() != null) {
+     diagnosticsBuilder.append("For more detailed output,").append(
+         " check application tracking page:").append(
+         this.getTrackingUrl()).append(
+         "Then, click on links to logs of each attempt.\n");
+   }
+   diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
+       .append("Failing this attempt");
+   return diagnosticsBuilder.toString();
  }

  private static class FinalTransition extends BaseFinalTransition {
@@ -135,7 +135,7 @@ public class FairScheduler extends
  public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);

  // How often fair shares are re-calculated (ms)
- protected long UPDATE_INTERVAL = 500;
+ protected long updateInterval;
  private final int UPDATE_DEBUG_FREQUENCY = 5;
  private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;

@@ -244,13 +244,13 @@ public class FairScheduler extends

  /**
   * A runnable which calls {@link FairScheduler#update()} every
-  * <code>UPDATE_INTERVAL</code> milliseconds.
+  * <code>updateInterval</code> milliseconds.
   */
  private class UpdateThread implements Runnable {
    public void run() {
      while (true) {
        try {
-         Thread.sleep(UPDATE_INTERVAL);
+         Thread.sleep(updateInterval);
          update();
          preemptTasksIfNecessary();
        } catch (Exception e) {

@@ -970,37 +970,27 @@ public class FairScheduler extends
    }
  }

- private void continuousScheduling() {
-   while (true) {
-     List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-     // Sort the nodes by space available on them, so that we offer
-     // containers on emptier nodes first, facilitating an even spread. This
-     // requires holding the scheduler lock, so that the space available on a
-     // node doesn't change during the sort.
-     synchronized (this) {
-       Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-     }
-
-     // iterate all nodes
-     for (NodeId nodeId : nodeIdList) {
-       if (nodes.containsKey(nodeId)) {
-         FSSchedulerNode node = getFSSchedulerNode(nodeId);
-         try {
-           if (Resources.fitsIn(minimumAllocation,
-               node.getAvailableResource())) {
-             attemptScheduling(node);
-           }
-         } catch (Throwable ex) {
-           LOG.warn("Error while attempting scheduling for node " + node +
-               ": " + ex.toString(), ex);
-         }
-       }
-     }
-     try {
-       Thread.sleep(getContinuousSchedulingSleepMs());
-     } catch (InterruptedException e) {
-       LOG.warn("Error while doing sleep in continuous scheduling: " +
-           e.toString(), e);
-     }
-   }
- }
+ void continuousSchedulingAttempt() {
+   List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+   // Sort the nodes by space available on them, so that we offer
+   // containers on emptier nodes first, facilitating an even spread. This
+   // requires holding the scheduler lock, so that the space available on a
+   // node doesn't change during the sort.
+   synchronized (this) {
+     Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+   }
+
+   // iterate all nodes
+   for (NodeId nodeId : nodeIdList) {
+     FSSchedulerNode node = getFSSchedulerNode(nodeId);
+     try {
+       if (node != null && Resources.fitsIn(minimumAllocation,
+           node.getAvailableResource())) {
+         attemptScheduling(node);
+       }
+     } catch (Throwable ex) {
+       LOG.error("Error while attempting scheduling for node " + node +
+           ": " + ex.toString(), ex);
+     }
+   }
+ }

@@ -1010,6 +1000,12 @@ public class FairScheduler extends

    @Override
    public int compare(NodeId n1, NodeId n2) {
+     if (!nodes.containsKey(n1)) {
+       return 1;
+     }
+     if (!nodes.containsKey(n2)) {
+       return -1;
+     }
      return RESOURCE_CALCULATOR.compare(clusterResource,
          nodes.get(n2).getAvailableResource(),
          nodes.get(n1).getAvailableResource());

@@ -1210,6 +1206,15 @@ public class FairScheduler extends
    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
    usePortForNodeName = this.conf.getUsePortForNodeName();

+   updateInterval = this.conf.getUpdateInterval();
+   if (updateInterval < 0) {
+     updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
+     LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+         + " is invalid, so using default value "
+         + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+         + " ms instead");
+   }
+
    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
    // This stores per-application scheduling information
    this.applications =

@@ -1234,7 +1239,16 @@ public class FairScheduler extends
          new Runnable() {
            @Override
            public void run() {
-             continuousScheduling();
+             while (!Thread.currentThread().isInterrupted()) {
+               try {
+                 continuousSchedulingAttempt();
+                 Thread.sleep(getContinuousSchedulingSleepMs());
+               } catch (InterruptedException e) {
+                 LOG.error("Continuous scheduling thread interrupted. Exiting. ",
+                     e);
+                 return;
+               }
+             }
            }
          }
      );
@@ -123,6 +123,11 @@ public class FairSchedulerConfiguration extends Configuration {
  protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
  protected static final int DEFAULT_MAX_ASSIGN = -1;

+ /** The update interval for calculating resources in FairScheduler. */
+ public static final String UPDATE_INTERVAL_MS =
+     CONF_PREFIX + "update-interval-ms";
+ public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
+
  public FairSchedulerConfiguration() {
    super();
  }

@@ -246,6 +251,10 @@ public class FairSchedulerConfiguration extends Configuration {
          "Error reading resource config", ex);
    }
  }

+ public long getUpdateInterval() {
+   return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
+ }
+
  private static int findResource(String val, String units)
      throws AllocationConfigurationException {
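A minimal sketch of how the new knob behaves, using only names introduced in this hunk (with CONF_PREFIX the property resolves to yarn.scheduler.fair.update-interval-ms; the 1000 ms override is an arbitrary example value):

  // Sketch: resolving the FairScheduler update interval from configuration.
  static long resolveUpdateInterval() {
    FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
    // Unset property -> DEFAULT_UPDATE_INTERVAL_MS (500 ms).
    long interval = fsConf.getUpdateInterval();
    // Explicit override, as a deployment's yarn-site.xml entry would do.
    fsConf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 1000L);
    return fsConf.getUpdateInterval(); // now 1000
  }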
@@ -823,7 +823,9 @@ public class TestRMAppAttemptTransitions {
        applicationAttempt.getAppAttemptState());
    verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
    verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
-   verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics());
+   boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null);
+   verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(),
+       exitCode, shouldCheckURL);
  }

  @Test

@@ -1241,11 +1243,18 @@ public class TestRMAppAttemptTransitions {
    verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
  }

- private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) {
-   assertTrue("Diagnostic information does not contain application proxy URL",
-       diagnostics.contains(applicationAttempt.getWebProxyBase()));
+ private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics,
+     int exitCode, boolean shouldCheckURL) {
+   assertTrue("Diagnostic information does not point the logs to the users",
+       diagnostics.contains("logs"));
+   assertTrue("Diagnostic information does not contain application attempt id",
+       diagnostics.contains(applicationAttempt.getAppAttemptId().toString()));
+   assertTrue("Diagnostic information does not contain application exit code",
+       diagnostics.contains("exitCode: " + exitCode));
+   if (shouldCheckURL) {
+     assertTrue("Diagnostic information does not contain application proxy URL",
+         diagnostics.contains(applicationAttempt.getWebProxyBase()));
+   }
  }

  private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
@@ -2763,7 +2763,43 @@ public class TestFairScheduler extends FairSchedulerTestBase {
    Assert.assertEquals(2, nodes.size());
  }

+ @Test
+ public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+   // Disable continuous scheduling; invoke continuous scheduling once manually
+   scheduler.init(conf);
+   scheduler.start();
+   Assert.assertTrue("Continuous scheduling should be disabled.",
+       !scheduler.isContinuousSchedulingEnabled());
+
+   // Add two nodes
+   RMNode node1 =
+       MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+           "127.0.0.1");
+   NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+   scheduler.handle(nodeEvent1);
+   RMNode node2 =
+       MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+           "127.0.0.2");
+   NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+   scheduler.handle(nodeEvent2);
+   Assert.assertEquals("We should have two alive nodes.",
+       2, scheduler.getNumClusterNodes());
+
+   // Remove one node
+   NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+   scheduler.handle(removeNode1);
+   Assert.assertEquals("We should only have one alive node.",
+       1, scheduler.getNumClusterNodes());
+
+   // Invoke the continuous scheduling once
+   try {
+     scheduler.continuousSchedulingAttempt();
+   } catch (Exception e) {
+     fail("Exception happened when doing continuous scheduling. " +
+         e.toString());
+   }
+ }
+
  @Test
  public void testDontAllowUndeclaredPools() throws Exception {
    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
@@ -94,7 +94,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    scheduler = (FairScheduler)resourceManager.getResourceScheduler();

    scheduler.setClock(clock);
-   scheduler.UPDATE_INTERVAL = 60 * 1000;
+   scheduler.updateInterval = 60 * 1000;
  }

  private void registerNodeAndSubmitApp(
@@ -60,7 +60,9 @@ import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@@ -90,28 +92,14 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
@RunWith(Parameterized.class)
public class TestRMWebServicesDelegationTokens extends JerseyTest {

- private static final File testRootDir = new File("target",
-     TestRMWebServicesDelegationTokens.class.getName() + "-root");
+ private static File testRootDir;
  private static File httpSpnegoKeytabFile = new File(
      KerberosTestUtils.getKeytabFile());
-
  private static String httpSpnegoPrincipal = KerberosTestUtils
      .getServerPrincipal();
-
- private static boolean miniKDCStarted = false;
  private static MiniKdc testMiniKDC;
- static {
-   try {
-     testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
-   } catch (Exception e) {
-     assertTrue("Couldn't create MiniKDC", false);
-   }
- }
-
  private static MockRM rm;
-
  private Injector injector;
-
  private boolean isKerberosAuth = false;

  // Make sure the test uses the published header string

@@ -237,7 +225,6 @@ public class TestRMWebServicesDelegationTokens extends JerseyTest {
        .contextListenerClass(GuiceServletConfig.class)
        .filterClass(com.google.inject.servlet.GuiceFilter.class)
        .contextPath("jersey-guice-filter").servletPath("/").build());
-   setupKDC();
    switch (run) {
    case 0:
    default:

@@ -249,17 +236,14 @@ public class TestRMWebServicesDelegationTokens extends JerseyTest {
    }
  }

- private void setupKDC() throws Exception {
-   if (miniKDCStarted == false) {
-     testMiniKDC.start();
-     getKdc().createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost",
-         "client", "client2", "client3");
-     miniKDCStarted = true;
-   }
- }
-
- private MiniKdc getKdc() {
-   return testMiniKDC;
+ @BeforeClass
+ public static void setupKDC() throws Exception {
+   testRootDir = new File("target",
+       TestRMWebServicesDelegationTokens.class.getName() + "-root");
+   testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
+   testMiniKDC.start();
+   testMiniKDC.createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost",
+       "client", "client2", "client3");
  }

  @Before

@@ -270,6 +254,13 @@ public class TestRMWebServicesDelegationTokens extends JerseyTest {
    testRootDir.deleteOnExit();
  }

+ @AfterClass
+ public static void shutdownKdc() {
+   if (testMiniKDC != null) {
+     testMiniKDC.stop();
+   }
+ }
+
  @After
  @Override
  public void tearDown() throws Exception {
@@ -205,6 +205,12 @@ Properties that can be placed in yarn-site.xml
    instead. Defaults to true. If a queue placement policy is given in the
    allocations file, this property is ignored.

+ * <<<yarn.scheduler.fair.update-interval-ms>>>
+
+   * The interval at which to lock the scheduler and recalculate fair shares,
+     recalculate demand, and check whether anything is due for preemption.
+     Defaults to 500 ms.
+
Allocation file format

  The allocation file must be in XML format. The format contains five types of
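For illustration, a programmatic equivalent of setting this property in yarn-site.xml (a hedged snippet, not from this patch; per the FairScheduler change earlier in this commit, a negative value is rejected at service init and the 500 ms default is used with a warning):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.yarn.conf.YarnConfiguration;

  // Recalculate fair shares once per second instead of every 500 ms.
  Configuration conf = new YarnConfiguration();
  conf.setLong("yarn.scheduler.fair.update-interval-ms", 1000L);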
@@ -157,15 +157,16 @@ Usage: yarn [--config confdir] COMMAND
  Start the ResourceManager

-------
- Usage: yarn resourcemanager [-format]
+ Usage: yarn resourcemanager [-format-state-store]
-------

*---------------+--------------+
|| COMMAND_OPTIONS || Description |
*---------------+--------------+
-| -format | Formats the RMStateStore. This will clear the RMStateStore and is
-| | useful if past applications are no longer needed. This should be run
-| | only when the ResourceManager is not running.
+| -format-state-store | Formats the RMStateStore. This will clear the
+| | RMStateStore and is useful if past applications are no
+| | longer needed. This should be run only when the
+| | ResourceManager is not running.
*---------------+--------------+

** nodemanager