Merge r1609845 through r1612880 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612883 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-07-23 17:30:06 +00:00
commit 48dc486672
41 changed files with 1035 additions and 284 deletions

View File

@ -396,6 +396,12 @@ Trunk (Unreleased)
HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File
System. (Shanyu Zhao via cnauroth)
HADOOP-10826. Iteration on KeyProviderFactory.serviceLoader is
thread-unsafe. (benoyantony viat tucu)
HADOOP-10881. Clarify usage of encryption and encrypted encryption
key in KeyProviderCryptoExtension. (wang)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -444,6 +450,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10755. Support negative caching of user-group mapping.
(Lei Xu via wang)
HADOOP-10855. Allow Text to be read with a known Length. (todd)
OPTIMIZATIONS
BUG FIXES
@ -794,6 +802,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10710. hadoop.auth cookie is not properly constructed according to
RFC2109. (Juan Yu via tucu)
HADOOP-10864. Tool documentenation is broken. (Akira Ajisaka
via Arpit Agarwal)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -21,7 +21,6 @@
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
@ -30,51 +29,109 @@
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A KeyProvider with Cytographic Extensions specifically for generating
* Encrypted Keys as well as decrypting them
* A KeyProvider with Cryptographic Extensions specifically for generating
* and decrypting encrypted encryption keys.
*
*/
@InterfaceAudience.Private
public class KeyProviderCryptoExtension extends
KeyProviderExtension<KeyProviderCryptoExtension.CryptoExtension> {
/**
* Designates an encrypted encryption key, or EEK.
*/
public static final String EEK = "EEK";
/**
* Designates a decrypted encrypted encryption key, that is, an encryption key
* (EK).
*/
public static final String EK = "EK";
/**
* This is a holder class whose instance contains the keyVersionName, iv
* used to generate the encrypted Key and the encrypted KeyVersion
* An encrypted encryption key (EEK) and related information. An EEK must be
* decrypted using the key's encryption key before it can be used.
*/
public static class EncryptedKeyVersion {
private String keyName;
private String keyVersionName;
private byte[] iv;
private KeyVersion encryptedKey;
private String encryptionKeyName;
private String encryptionKeyVersionName;
private byte[] encryptedKeyIv;
private KeyVersion encryptedKeyVersion;
protected EncryptedKeyVersion(String keyName, String keyVersionName,
byte[] iv, KeyVersion encryptedKey) {
this.keyName = keyName;
this.keyVersionName = keyVersionName;
this.iv = iv;
this.encryptedKey = encryptedKey;
/**
* Create a new EncryptedKeyVersion.
*
* @param keyName Name of the encryption key used to
* encrypt the encrypted key.
* @param encryptionKeyVersionName Version name of the encryption key used
* to encrypt the encrypted key.
* @param encryptedKeyIv Initialization vector of the encrypted
* key. The IV of the encryption key used to
* encrypt the encrypted key is derived from
* this IV.
* @param encryptedKeyVersion The encrypted encryption key version.
*/
protected EncryptedKeyVersion(String keyName,
String encryptionKeyVersionName, byte[] encryptedKeyIv,
KeyVersion encryptedKeyVersion) {
this.encryptionKeyName = keyName;
this.encryptionKeyVersionName = encryptionKeyVersionName;
this.encryptedKeyIv = encryptedKeyIv;
this.encryptedKeyVersion = encryptedKeyVersion;
}
public String getKeyName() {
return keyName;
/**
* @return Name of the encryption key used to encrypt the encrypted key.
*/
public String getEncryptionKeyName() {
return encryptionKeyName;
}
public String getKeyVersionName() {
return keyVersionName;
/**
* @return Version name of the encryption key used to encrypt the encrypted
* key.
*/
public String getEncryptionKeyVersionName() {
return encryptionKeyVersionName;
}
public byte[] getIv() {
return iv;
/**
* @return Initialization vector of the encrypted key. The IV of the
* encryption key used to encrypt the encrypted key is derived from this
* IV.
*/
public byte[] getEncryptedKeyIv() {
return encryptedKeyIv;
}
public KeyVersion getEncryptedKey() {
return encryptedKey;
/**
* @return The encrypted encryption key version.
*/
public KeyVersion getEncryptedKeyVersion() {
return encryptedKeyVersion;
}
/**
* Derive the initialization vector (IV) for the encryption key from the IV
* of the encrypted key. This derived IV is used with the encryption key to
* decrypt the encrypted key.
* <p/>
* The alternative to this is using the same IV for both the encryption key
* and the encrypted key. Even a simple symmetric transformation like this
* improves security by avoiding IV re-use. IVs will also be fairly unique
* among different EEKs.
*
* @param encryptedKeyIV of the encrypted key (i.e. {@link
* #getEncryptedKeyIv()})
* @return IV for the encryption key
*/
protected static byte[] deriveIV(byte[] encryptedKeyIV) {
byte[] rIv = new byte[encryptedKeyIV.length];
// Do a simple XOR transformation to flip all the bits
for (int i = 0; i < encryptedKeyIV.length; i++) {
rIv[i] = (byte) (encryptedKeyIV[i] ^ 0xff);
}
return rIv;
}
}
/**
@ -141,53 +198,56 @@ private DefaultCryptoExtension(KeyProvider keyProvider) {
this.keyProvider = keyProvider;
}
// the IV used to encrypt a EK typically will be the same IV used to
// encrypt data with the EK. To avoid any chance of weakening the
// encryption because the same IV is used, we simply XOR the IV thus we
// are not using the same IV for 2 different encryptions (even if they
// are done using different keys)
private byte[] flipIV(byte[] iv) {
byte[] rIv = new byte[iv.length];
for (int i = 0; i < iv.length; i++) {
rIv[i] = (byte) (iv[i] ^ 0xff);
}
return rIv;
}
@Override
public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
throws IOException, GeneralSecurityException {
KeyVersion keyVer = keyProvider.getCurrentKey(encryptionKeyName);
Preconditions.checkNotNull(keyVer, "No KeyVersion exists for key '%s' ",
encryptionKeyName);
byte[] newKey = new byte[keyVer.getMaterial().length];
SecureRandom.getInstance("SHA1PRNG").nextBytes(newKey);
// Fetch the encryption key
KeyVersion encryptionKey = keyProvider.getCurrentKey(encryptionKeyName);
Preconditions.checkNotNull(encryptionKey,
"No KeyVersion exists for key '%s' ", encryptionKeyName);
// Generate random bytes for new key and IV
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
byte[] iv = SecureRandom.getSeed(cipher.getBlockSize());
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyVer.getMaterial(),
"AES"), new IvParameterSpec(flipIV(iv)));
byte[] ek = cipher.doFinal(newKey);
SecureRandom random = SecureRandom.getInstance("SHA1PRNG");
final byte[] newKey = new byte[encryptionKey.getMaterial().length];
random.nextBytes(newKey);
final byte[] iv = random.generateSeed(cipher.getBlockSize());
// Encryption key IV is derived from new key's IV
final byte[] encryptionIV = EncryptedKeyVersion.deriveIV(iv);
// Encrypt the new key
cipher.init(Cipher.ENCRYPT_MODE,
new SecretKeySpec(encryptionKey.getMaterial(), "AES"),
new IvParameterSpec(encryptionIV));
final byte[] encryptedKey = cipher.doFinal(newKey);
return new EncryptedKeyVersion(encryptionKeyName,
keyVer.getVersionName(), iv,
new KeyVersion(keyVer.getName(), EEK, ek));
encryptionKey.getVersionName(), iv,
new KeyVersion(encryptionKey.getName(), EEK, encryptedKey));
}
@Override
public KeyVersion decryptEncryptedKey(
EncryptedKeyVersion encryptedKeyVersion) throws IOException,
GeneralSecurityException {
KeyVersion keyVer =
keyProvider.getKeyVersion(encryptedKeyVersion.getKeyVersionName());
Preconditions.checkNotNull(keyVer, "KeyVersion name '%s' does not exist",
encryptedKeyVersion.getKeyVersionName());
KeyVersion keyVersion = encryptedKeyVersion.getEncryptedKey();
// Fetch the encryption key material
final String encryptionKeyVersionName =
encryptedKeyVersion.getEncryptionKeyVersionName();
final KeyVersion encryptionKey =
keyProvider.getKeyVersion(encryptionKeyVersionName);
Preconditions.checkNotNull(encryptionKey,
"KeyVersion name '%s' does not exist", encryptionKeyVersionName);
final byte[] encryptionKeyMaterial = encryptionKey.getMaterial();
// Encryption key IV is determined from encrypted key's IV
final byte[] encryptionIV =
EncryptedKeyVersion.deriveIV(encryptedKeyVersion.getEncryptedKeyIv());
// Init the cipher with encryption key parameters
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
cipher.init(Cipher.DECRYPT_MODE,
new SecretKeySpec(keyVersion.getMaterial(), "AES"),
new IvParameterSpec(flipIV(encryptedKeyVersion.getIv())));
byte[] ek =
cipher.doFinal(encryptedKeyVersion.getEncryptedKey().getMaterial());
return new KeyVersion(keyVer.getName(), EK, ek);
new SecretKeySpec(encryptionKeyMaterial, "AES"),
new IvParameterSpec(encryptionIV));
// Decrypt the encrypted key
final KeyVersion encryptedKV =
encryptedKeyVersion.getEncryptedKeyVersion();
final byte[] decryptedKey = cipher.doFinal(encryptedKV.getMaterial());
return new KeyVersion(encryptionKey.getName(), EK, decryptedKey);
}
@Override

View File

@ -22,6 +22,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
@ -47,6 +48,15 @@ public abstract KeyProvider createProvider(URI providerName,
private static final ServiceLoader<KeyProviderFactory> serviceLoader =
ServiceLoader.load(KeyProviderFactory.class);
// Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases.
static {
Iterator<KeyProviderFactory> iterServices = serviceLoader.iterator();
while (iterServices.hasNext()) {
iterServices.next();
}
}
public static List<KeyProvider> getProviders(Configuration conf
) throws IOException {
List<KeyProvider> result = new ArrayList<KeyProvider>();

View File

@ -646,25 +646,28 @@ public EncryptedKeyVersion generateEncryptedKey(
public KeyVersion decryptEncryptedKey(
EncryptedKeyVersion encryptedKeyVersion) throws IOException,
GeneralSecurityException {
checkNotNull(encryptedKeyVersion.getKeyVersionName(), "versionName");
checkNotNull(encryptedKeyVersion.getIv(), "iv");
Preconditions.checkArgument(encryptedKeyVersion.getEncryptedKey()
.getVersionName().equals(KeyProviderCryptoExtension.EEK),
checkNotNull(encryptedKeyVersion.getEncryptionKeyVersionName(),
"versionName");
checkNotNull(encryptedKeyVersion.getEncryptedKeyIv(), "iv");
Preconditions.checkArgument(
encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
.equals(KeyProviderCryptoExtension.EEK),
"encryptedKey version name must be '%s', is '%s'",
KeyProviderCryptoExtension.EK, encryptedKeyVersion.getEncryptedKey()
.getVersionName());
checkNotNull(encryptedKeyVersion.getEncryptedKey(), "encryptedKey");
KeyProviderCryptoExtension.EK,
encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
);
checkNotNull(encryptedKeyVersion.getEncryptedKeyVersion(), "encryptedKey");
Map<String, String> params = new HashMap<String, String>();
params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT);
Map<String, Object> jsonPayload = new HashMap<String, Object>();
jsonPayload.put(KMSRESTConstants.NAME_FIELD,
encryptedKeyVersion.getKeyName());
encryptedKeyVersion.getEncryptionKeyName());
jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String(
encryptedKeyVersion.getIv()));
encryptedKeyVersion.getEncryptedKeyIv()));
jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String(
encryptedKeyVersion.getEncryptedKey().getMaterial()));
encryptedKeyVersion.getEncryptedKeyVersion().getMaterial()));
URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE,
encryptedKeyVersion.getKeyVersionName(),
encryptedKeyVersion.getEncryptionKeyVersionName(),
KMSRESTConstants.EEK_SUB_RESOURCE, params);
HttpURLConnection conn = createConnection(url, HTTP_POST);
conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME);

View File

@ -288,9 +288,7 @@ public String toString() {
@Override
public void readFields(DataInput in) throws IOException {
int newLength = WritableUtils.readVInt(in);
setCapacity(newLength, false);
in.readFully(bytes, 0, newLength);
length = newLength;
readWithKnownLength(in, newLength);
}
public void readFields(DataInput in, int maxLength) throws IOException {
@ -302,9 +300,7 @@ public void readFields(DataInput in, int maxLength) throws IOException {
throw new IOException("tried to deserialize " + newLength +
" bytes of data, but maxLength = " + maxLength);
}
setCapacity(newLength, false);
in.readFully(bytes, 0, newLength);
length = newLength;
readWithKnownLength(in, newLength);
}
/** Skips over one Text in the input. */
@ -313,6 +309,17 @@ public static void skip(DataInput in) throws IOException {
WritableUtils.skipFully(in, length);
}
/**
* Read a Text object whose length is already known.
* This allows creating Text from a stream which uses a different serialization
* format.
*/
public void readWithKnownLength(DataInput in, int len) throws IOException {
setCapacity(len, false);
in.readFully(bytes, 0, len);
length = len;
}
/** serialize
* write this object to out
* length uses zero-compressed encoding

View File

@ -883,8 +883,8 @@ protected int getWeight(Node reader, Node node) {
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
*/
public void sortByDistance(Node reader, Node[] nodes,
int activeLen, long seed) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
for (int i=0; i<activeLen; i++) {
@ -906,8 +906,11 @@ public void sortByDistance(Node reader, Node[] nodes,
// Seed is normally the block id
// This means we use the same pseudo-random order for each block, for
// potentially better page cache usage.
// Seed is not used if we want to randomize block location for every block
Random rand = getRandom();
if (!randomizeBlockLocationsPerBlock) {
rand.setSeed(seed);
}
int idx = 0;
for (List<Node> list: tree.values()) {
if (list != null) {

View File

@ -279,8 +279,8 @@ protected int getWeight(Node reader, Node node) {
* set of nodes at each network distance.
*/
@Override
public void sortByDistance( Node reader, Node[] nodes,
int activeLen, long seed) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
// If reader is not a datanode (not in NetworkTopology tree), we need to
// replace this reader with a sibling leaf node in tree.
if (reader != null && !this.contains(reader)) {
@ -293,7 +293,8 @@ public void sortByDistance( Node reader, Node[] nodes,
return;
}
}
super.sortByDistance(reader, nodes, nodes.length, seed);
super.sortByDistance(reader, nodes, nodes.length, seed,
randomizeBlockLocationsPerBlock);
}
/** InnerNodeWithNodeGroup represents a switch/router of a data center, rack

View File

@ -27,7 +27,7 @@
*
* <p><code>Tool</code>, is the standard for any Map-Reduce tool/application.
* The tool/application should delegate the handling of
* <a href="{@docRoot}/org/apache/hadoop/util/GenericOptionsParser.html#GenericOptions">
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/CommandsManual.html#Generic_Options">
* standard command-line options</a> to {@link ToolRunner#run(Tool, String[])}
* and only handle its custom arguments.</p>
*

View File

@ -17,51 +17,112 @@
*/
package org.apache.hadoop.crypto.key;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class TestKeyProviderCryptoExtension {
private static final String CIPHER = "AES";
private static final String ENCRYPTION_KEY_NAME = "fooKey";
private static Configuration conf;
private static KeyProvider kp;
private static KeyProviderCryptoExtension kpExt;
private static KeyProvider.Options options;
private static KeyVersion encryptionKey;
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
kp = new UserProvider.Factory().createProvider(new URI("user:///"), conf);
kpExt = KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
options = new KeyProvider.Options(conf);
options.setCipher(CIPHER);
options.setBitLength(128);
encryptionKey =
kp.createKey(ENCRYPTION_KEY_NAME, SecureRandom.getSeed(16), options);
}
@Test
public void testGenerateEncryptedKey() throws Exception {
Configuration conf = new Configuration();
KeyProvider kp =
new UserProvider.Factory().createProvider(new URI("user:///"), conf);
KeyProvider.Options options = new KeyProvider.Options(conf);
options.setCipher(CIPHER);
options.setBitLength(128);
KeyProvider.KeyVersion kv = kp.createKey("foo", SecureRandom.getSeed(16),
options);
KeyProviderCryptoExtension kpExt =
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
// Generate a new EEK and check it
KeyProviderCryptoExtension.EncryptedKeyVersion ek1 =
kpExt.generateEncryptedKey(kv.getName());
Assert.assertEquals(KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKey().getVersionName());
Assert.assertEquals("foo", ek1.getKeyName());
Assert.assertNotNull(ek1.getEncryptedKey().getMaterial());
Assert.assertEquals(kv.getMaterial().length,
ek1.getEncryptedKey().getMaterial().length);
KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
Assert.assertArrayEquals(k1.getMaterial(), k1a.getMaterial());
Assert.assertEquals(kv.getMaterial().length, k1.getMaterial().length);
kpExt.generateEncryptedKey(encryptionKey.getName());
assertEquals("Version name of EEK should be EEK",
KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKeyVersion().getVersionName());
assertEquals("Name of EEK should be encryption key name",
ENCRYPTION_KEY_NAME, ek1.getEncryptionKeyName());
assertNotNull("Expected encrypted key material",
ek1.getEncryptedKeyVersion().getMaterial());
assertEquals("Length of encryption key material and EEK material should "
+ "be the same", encryptionKey.getMaterial().length,
ek1.getEncryptedKeyVersion().getMaterial().length
);
KeyProviderCryptoExtension.EncryptedKeyVersion ek2 =
kpExt.generateEncryptedKey(kv.getName());
KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
boolean eq = true;
for (int i = 0; eq && i < ek2.getEncryptedKey().getMaterial().length; i++) {
eq = k2.getMaterial()[i] == k1.getMaterial()[i];
// Decrypt EEK into an EK and check it
KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
assertEquals(encryptionKey.getMaterial().length, k1.getMaterial().length);
if (Arrays.equals(k1.getMaterial(), encryptionKey.getMaterial())) {
fail("Encrypted key material should not equal encryption key material");
}
Assert.assertFalse(eq);
if (Arrays.equals(ek1.getEncryptedKeyVersion().getMaterial(),
encryptionKey.getMaterial())) {
fail("Encrypted key material should not equal decrypted key material");
}
// Decrypt it again and it should be the same
KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
assertArrayEquals(k1.getMaterial(), k1a.getMaterial());
// Generate another EEK and make sure it's different from the first
KeyProviderCryptoExtension.EncryptedKeyVersion ek2 =
kpExt.generateEncryptedKey(encryptionKey.getName());
KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
if (Arrays.equals(k1.getMaterial(), k2.getMaterial())) {
fail("Generated EEKs should have different material!");
}
if (Arrays.equals(ek1.getEncryptedKeyIv(), ek2.getEncryptedKeyIv())) {
fail("Generated EEKs should have different IVs!");
}
}
@Test
public void testEncryptDecrypt() throws Exception {
// Get an EEK
KeyProviderCryptoExtension.EncryptedKeyVersion eek =
kpExt.generateEncryptedKey(encryptionKey.getName());
final byte[] encryptedKeyIv = eek.getEncryptedKeyIv();
final byte[] encryptedKeyMaterial = eek.getEncryptedKeyVersion()
.getMaterial();
// Decrypt it manually
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
cipher.init(Cipher.DECRYPT_MODE,
new SecretKeySpec(encryptionKey.getMaterial(), "AES"),
new IvParameterSpec(KeyProviderCryptoExtension.EncryptedKeyVersion
.deriveIV(encryptedKeyIv)));
final byte[] manualMaterial = cipher.doFinal(encryptedKeyMaterial);
// Decrypt it with the API
KeyVersion decryptedKey = kpExt.decryptEncryptedKey(eek);
final byte[] apiMaterial = decryptedKey.getMaterial();
assertArrayEquals("Wrong key material from decryptEncryptedKey",
manualMaterial, apiMaterial);
}
}

View File

@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Random;
import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
/** Unit tests for LargeUTF8. */
@ -364,6 +365,27 @@ public void testReadWriteOperations() {
}
}
public void testReadWithKnownLength() throws IOException {
String line = "hello world";
byte[] inputBytes = line.getBytes(Charsets.UTF_8);
DataInputBuffer in = new DataInputBuffer();
Text text = new Text();
in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 5);
assertEquals("hello", text.toString());
// Read longer length, make sure it lengthens
in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 7);
assertEquals("hello w", text.toString());
// Read shorter length, make sure it shortens
in.reset(inputBytes, inputBytes.length);
text.readWithKnownLength(in, 2);
assertEquals("he", text.toString());
}
/**
* test {@code Text.bytesToCodePoint(bytes) }
* with {@code BufferUnderflowException}

View File

@ -105,7 +105,7 @@ public void testSortByDistance() throws Exception {
testNodes[2] = dataNodes[3];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
@ -117,7 +117,7 @@ public void testSortByDistance() throws Exception {
testNodes[2] = dataNodes[1];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
@ -127,7 +127,7 @@ public void testSortByDistance() throws Exception {
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
@ -137,7 +137,7 @@ public void testSortByDistance() throws Exception {
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(computeNode, testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
}

View File

@ -64,12 +64,12 @@ public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) {
Map json = new LinkedHashMap();
if (encryptedKeyVersion != null) {
json.put(KMSRESTConstants.VERSION_NAME_FIELD,
encryptedKeyVersion.getKeyVersionName());
encryptedKeyVersion.getEncryptionKeyVersionName());
json.put(KMSRESTConstants.IV_FIELD,
Base64.encodeBase64URLSafeString(
encryptedKeyVersion.getIv()));
encryptedKeyVersion.getEncryptedKeyIv()));
json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD,
toJSON(encryptedKeyVersion.getEncryptedKey()));
toJSON(encryptedKeyVersion.getEncryptedKeyVersion()));
}
return json;
}

View File

@ -485,10 +485,10 @@ public Void call() throws Exception {
EncryptedKeyVersion ek1 = kpExt.generateEncryptedKey(kv.getName());
Assert.assertEquals(KeyProviderCryptoExtension.EEK,
ek1.getEncryptedKey().getVersionName());
Assert.assertNotNull(ek1.getEncryptedKey().getMaterial());
ek1.getEncryptedKeyVersion().getVersionName());
Assert.assertNotNull(ek1.getEncryptedKeyVersion().getMaterial());
Assert.assertEquals(kv.getMaterial().length,
ek1.getEncryptedKey().getMaterial().length);
ek1.getEncryptedKeyVersion().getMaterial().length);
KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1);
Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName());
KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1);
@ -498,8 +498,8 @@ public Void call() throws Exception {
EncryptedKeyVersion ek2 = kpExt.generateEncryptedKey(kv.getName());
KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2);
boolean isEq = true;
for (int i = 0; isEq && i < ek2.getEncryptedKey().getMaterial().length;
i++) {
for (int i = 0; isEq && i < ek2.getEncryptedKeyVersion()
.getMaterial().length; i++) {
isEq = k2.getMaterial()[i] == k1.getMaterial()[i];
}
Assert.assertFalse(isEq);

View File

@ -1051,8 +1051,12 @@ public READDIR3Response mknod(XDR xdr, RpcInfo info) {
@Override
public REMOVE3Response remove(XDR xdr, RpcInfo info) {
return remove(xdr, getSecurityHandler(info), info.remoteAddress());
}
@VisibleForTesting
REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) {
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
SecurityHandler securityHandler = getSecurityHandler(info);
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
if (dfsClient == null) {
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@ -1083,17 +1087,19 @@ public REMOVE3Response remove(XDR xdr, RpcInfo info) {
return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE);
}
WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, errWcc);
}
String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
if (fstat == null) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc);
return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
}
if (fstat.isDir()) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr);
return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc);
return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, errWcc);
}
boolean result = dfsClient.delete(fileIdPath, false);

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.response.REMOVE3Response;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
public class TestClientAccessPrivilege {
static MiniDFSCluster cluster = null;
static NfsConfiguration config = new NfsConfiguration();
static DistributedFileSystem hdfs;
static NameNode nn;
static String testdir = "/tmp";
static SecurityHandler securityHandler;
@BeforeClass
public static void setup() throws Exception {
String currentUser = System.getProperty("user.name");
config.set(DefaultImpersonationProvider.getTestProvider()
.getProxySuperuserGroupConfKey(currentUser), "*");
config.set(DefaultImpersonationProvider.getTestProvider()
.getProxySuperuserIpConfKey(currentUser), "*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
nn = cluster.getNameNode();
// Use ephemeral port in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);
securityHandler = Mockito.mock(SecurityHandler.class);
Mockito.when(securityHandler.getUser()).thenReturn(
System.getProperty("user.name"));
}
@AfterClass
public static void shutdown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void createFiles() throws IllegalArgumentException, IOException {
hdfs.delete(new Path(testdir), true);
hdfs.mkdirs(new Path(testdir));
DFSTestUtil.createFile(hdfs, new Path(testdir + "/f1"), 0, (short) 1, 0);
}
@Test(timeout = 60000)
public void testClientAccessPrivilegeForRemove() throws Exception {
// Configure ro access for nfs1 service
config.set("dfs.nfs.exports.allowed.hosts", "* ro");
// Start nfs
Nfs3 nfs = new Nfs3(config);
nfs.startServiceInternal(false);
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
// Create a remove request
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
long dirId = status.getFileId();
XDR xdr_req = new XDR();
FileHandle handle = new FileHandle(dirId);
handle.serialize(xdr_req);
xdr_req.writeString("f1");
// Remove operation
REMOVE3Response response = nfsd.remove(xdr_req.asReadOnlyWrap(),
securityHandler, new InetSocketAddress("localhost", 1234));
// Assert on return code
assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES,
response.getStatus());
}
}

View File

@ -331,6 +331,9 @@ Release 2.6.0 - UNRELEASED
datanodes and change datanode to write block replicas using the specified
storage type. (szetszwo)
HDFS-6701. Make seed optional in NetworkTopology#sortByDistance.
(Ashwin Shankar via wang)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -368,6 +371,12 @@ Release 2.6.0 - UNRELEASED
HDFS-6667. In HDFS HA mode, Distcp/SLive with webhdfs on secure cluster fails
with Client cannot authenticate via:[TOKEN, KERBEROS] error. (jing9)
HDFS-6704. Fix the command to launch JournalNode in HDFS-HA document.
(Akira AJISAKA via jing9)
HDFS-6731. Run "hdfs zkfc-formatZK" on a server in a non-namenode will cause
a null pointer exception. (Masatake Iwasaki via brandonli)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -616,6 +625,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes
correctly. (szetszwo)
HDFS-6712. Document HDFS Multihoming Settings. (Arpit Agarwal)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
@ -898,6 +909,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6378. NFS registration should timeout instead of hanging when
portmap/rpcbind is not available (Abhiraj Butala via brandonli)
HDFS-6703. NFS: Files can be deleted from a read-only mount
(Srikanth Upputuri via brandonli)
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

View File

@ -214,6 +214,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block";
public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false;
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;

View File

@ -345,7 +345,8 @@ private boolean isInactive(DatanodeInfo datanode) {
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks) {
final List<LocatedBlock> locatedblocks,
boolean randomizeBlockLocationsPerBlock) {
//sort the blocks
// As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
@ -372,8 +373,8 @@ public void sortLocatedBlocks(final String targethost,
--lastActiveIndex;
}
int activeLen = lastActiveIndex + 1;
networktopology.sortByDistance(client, b.getLocations(), activeLen,
b.getBlock().getBlockId());
networktopology.sortByDistance(client, b.getLocations(), activeLen, b
.getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
}
}

View File

@ -83,6 +83,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
@ -527,6 +530,8 @@ private void logAuditEvent(boolean succeeded,
private final FSImage fsImage;
private boolean randomizeBlockLocationsPerBlock;
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
@ -837,6 +842,10 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
this.randomizeBlockLocationsPerBlock = conf.getBoolean(
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(this, conf);
this.snapshotManager = new SnapshotManager(dir);
@ -1699,8 +1708,8 @@ LocatedBlocks getBlockLocations(String clientMachine, String src,
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
true);
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
@ -1708,8 +1717,8 @@ LocatedBlocks getBlockLocations(String clientMachine, String src,
ArrayList<LocatedBlock> lastBlockList =
Lists.newArrayListWithCapacity(1);
lastBlockList.add(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
lastBlockList, randomizeBlockLocationsPerBlock);
}
}
return blocks;

View File

@ -122,6 +122,11 @@ public static DFSZKFailoverController create(Configuration conf) {
"HA is not enabled for this namenode.");
}
String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
if (nnId == null) {
String msg = "Could not get the namenode ID of this node. " +
"You may run zkfc on the node other than namenode.";
throw new HadoopIllegalArgumentException(msg);
}
NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);

View File

@ -2040,4 +2040,17 @@
</description>
</property>
<property>
<name>dfs.namenode.randomize-block-locations-per-block</name>
<value>false</value>
<description>When fetching replica locations of a block, the replicas
are sorted based on network distance. This configuration parameter
determines whether the replicas at the same network distance are randomly
shuffled. By default, this is false, such that repeated requests for a block's
replicas always result in the same order. This potentially improves page cache
behavior. However, for some network topologies, it is desirable to shuffle this
order for better load balancing.
</description>
</property>
</configuration>

View File

@ -416,8 +416,8 @@ HDFS High Availability Using the Quorum Journal Manager
After all of the necessary configuration options have been set, you must
start the JournalNode daemons on the set of machines where they will run. This
can be done by running the command "<hdfs-daemon.sh journalnode>" and waiting
for the daemon to start on each of the relevant machines.
can be done by running the command "<hadoop-daemon.sh start journalnode>" and
waiting for the daemon to start on each of the relevant machines.
Once the JournalNodes have been started, one must initially synchronize the
two HA NameNodes' on-disk metadata.

View File

@ -0,0 +1,145 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
Hadoop Distributed File System-${project.version} - Support for Multi-Homed Networks
---
---
${maven.build.timestamp}
HDFS Support for Multihomed Networks
This document is targetted to cluster administrators deploying <<<HDFS>>> in
multihomed networks. Similar support for <<<YARN>>>/<<<MapReduce>>> is
work in progress and will be documented when available.
%{toc|section=1|fromDepth=0}
* Multihoming Background
In multihomed networks the cluster nodes are connected to more than one
network interface. There could be multiple reasons for doing so.
[[1]] <<Security>>: Security requirements may dictate that intra-cluster
traffic be confined to a different network than the network used to
transfer data in and out of the cluster.
[[2]] <<Performance>>: Intra-cluster traffic may use one or more high bandwidth
interconnects like Fiber Channel, Infiniband or 10GbE.
[[3]] <<Failover/Redundancy>>: The nodes may have multiple network adapters
connected to a single network to handle network adapter failure.
Note that NIC Bonding (also known as NIC Teaming or Link
Aggregation) is a related but separate topic. The following settings
are usually not applicable to a NIC bonding configuration which handles
multiplexing and failover transparently while presenting a single 'logical
network' to applications.
* Fixing Hadoop Issues In Multihomed Environments
** Ensuring HDFS Daemons Bind All Interfaces
By default <<<HDFS>>> endpoints are specified as either hostnames or IP addresses.
In either case <<<HDFS>>> daemons will bind to a single IP address making
the daemons unreachable from other networks.
The solution is to have separate setting for server endpoints to force binding
the wildcard IP address <<<INADDR_ANY>>> i.e. <<<0.0.0.0>>>. Do NOT supply a port
number with any of these settings.
----
<property>
<name>dfs.namenode.rpc-bind-host</name>
<value>0.0.0.0</value>
<description>
The actual address the RPC server will bind to. If this optional address is
set, it overrides only the hostname portion of dfs.namenode.rpc-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node listen on all interfaces by
setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.servicerpc-bind-host</name>
<value>0.0.0.0</value>
<description>
The actual address the service RPC server will bind to. If this optional address is
set, it overrides only the hostname portion of dfs.namenode.servicerpc-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node listen on all interfaces by
setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.http-bind-host</name>
<value>0.0.0.0</value>
<description>
The actual adress the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.http-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTP server listen on all
interfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.namenode.https-bind-host</name>
<value>0.0.0.0</value>
<description>
The actual adress the HTTPS server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.https-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTPS server listen on all
interfaces by setting it to 0.0.0.0.
</description>
</property>
----
** Clients use Hostnames when connecting to DataNodes
By default <<<HDFS>>> clients connect to DataNodes using the IP address
provided by the NameNode. Depending on the network configuration this
IP address may be unreachable by the clients. The fix is letting clients perform
their own DNS resolution of the DataNode hostname. The following setting
enables this behavior.
----
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>Whether clients should use datanode hostnames when
connecting to datanodes.
</description>
</property>
----
** DataNodes use HostNames when connecting to other DataNodes
Rarely, the NameNode-resolved IP address for a DataNode may be unreachable
from other DataNodes. The fix is to force DataNodes to perform their own
DNS resolution for inter-DataNode connections. The following setting enables
this behavior.
----
<property>
<name>dfs.datanode.use.datanode.hostname</name>
<value>true</value>
<description>Whether datanodes should use datanode hostnames when
connecting to other datanodes for data transfer.
</description>
</property>
----

View File

@ -60,7 +60,14 @@ public void setupDatanodes() {
DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/d3/r1"),
DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/d3/r1"),
DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/d3/r2"),
DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2")
DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2"),
DFSTestUtil.getDatanodeDescriptor("14.14.14.14", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("15.15.15.15", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("16.16.16.16", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("17.17.17.17", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("18.18.18.18", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("19.19.19.19", "/d4/r1"),
DFSTestUtil.getDatanodeDescriptor("20.20.20.20", "/d4/r1"),
};
for (int i = 0; i < dataNodes.length; i++) {
cluster.add(dataNodes[i]);
@ -107,7 +114,7 @@ public void testCreateInvalidTopology() throws Exception {
@Test
public void testRacks() throws Exception {
assertEquals(cluster.getNumOfRacks(), 5);
assertEquals(cluster.getNumOfRacks(), 6);
assertTrue(cluster.isOnSameRack(dataNodes[0], dataNodes[1]));
assertFalse(cluster.isOnSameRack(dataNodes[1], dataNodes[2]));
assertTrue(cluster.isOnSameRack(dataNodes[2], dataNodes[3]));
@ -133,7 +140,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[2];
testNodes[2] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
@ -146,7 +153,7 @@ public void testSortByDistance() throws Exception {
dtestNodes[3] = dataNodes[9];
dtestNodes[4] = dataNodes[10];
cluster.sortByDistance(dataNodes[8], dtestNodes,
dtestNodes.length - 2, 0xDEADBEEF);
dtestNodes.length - 2, 0xDEADBEEF, false);
assertTrue(dtestNodes[0] == dataNodes[8]);
assertTrue(dtestNodes[1] == dataNodes[11]);
assertTrue(dtestNodes[2] == dataNodes[12]);
@ -158,7 +165,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[3]);
@ -168,7 +175,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[1];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[1]);
assertTrue(testNodes[1] == dataNodes[3]);
assertTrue(testNodes[2] == dataNodes[5]);
@ -178,7 +185,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[5];
testNodes[2] = dataNodes[3];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF);
testNodes.length, 0xDEADBEEF, false);
assertTrue(testNodes[0] == dataNodes[1]);
assertTrue(testNodes[1] == dataNodes[3]);
assertTrue(testNodes[2] == dataNodes[5]);
@ -188,7 +195,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[5];
testNodes[2] = dataNodes[3];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEAD);
testNodes.length, 0xDEAD, false);
// sortByDistance does not take the "data center" layer into consideration
// and it doesn't sort by getDistance, so 1, 5, 3 is also valid here
assertTrue(testNodes[0] == dataNodes[1]);
@ -204,7 +211,27 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[6];
testNodes[2] = dataNodes[7];
cluster.sortByDistance(dataNodes[i], testNodes,
testNodes.length, 0xBEADED+i);
testNodes.length, 0xBEADED+i, false);
if (first == null) {
first = testNodes[0];
} else {
if (first != testNodes[0]) {
foundRandom = true;
break;
}
}
}
assertTrue("Expected to find a different first location", foundRandom);
// Array of rack local nodes with randomizeBlockLocationsPerBlock set to
// true
// Expect random order of block locations for same block
first = null;
for (int i = 1; i <= 4; i++) {
testNodes[0] = dataNodes[13];
testNodes[1] = dataNodes[14];
testNodes[2] = dataNodes[15];
cluster.sortByDistance(dataNodes[15 + i], testNodes, testNodes.length,
0xBEADED, true);
if (first == null) {
first = testNodes[0];
} else {

View File

@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5971. Move the default options for distcp -p to
DistCpOptionSwitch. (clamb via wang)
MAPREDUCE-5963. ShuffleHandler DB schema should be versioned with
compatible/incompatible changes (Junping Du via jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -82,10 +82,13 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@ -125,6 +128,7 @@
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@ -146,8 +150,9 @@ public class ShuffleHandler extends AuxiliaryService {
Pattern.CASE_INSENSITIVE);
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
private static final String STATE_DB_SCHEMA_VERSION = "1.0";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
NMDBSchemaVersion.newInstance(1, 0);
private int port;
private ChannelFactory selector;
@ -466,18 +471,15 @@ private void startStore(Path recoveryRoot) throws IOException {
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
byte[] schemaVersionData;
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
storeVersion();
} catch (DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
@ -485,15 +487,69 @@ private void startStore(Path recoveryRoot) throws IOException {
throw e;
}
}
if (schemaVersionData != null) {
String schemaVersion = asString(schemaVersionData);
// only support exact schema matches for now
if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
throw new IOException("Incompatible state database schema, found "
+ schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
checkVersion();
}
@VisibleForTesting
NMDBSchemaVersion loadVersion() throws IOException {
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return NMDBSchemaVersion.newInstance(1, 0);
}
NMDBSchemaVersion version =
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
return version;
}
private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
String key = STATE_DB_SCHEMA_VERSION_KEY;
byte[] data =
((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
try {
stateDb.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
}
}
private void storeVersion() throws IOException {
storeSchemaVersion(CURRENT_VERSION_INFO);
}
// Only used for test
@VisibleForTesting
void storeVersion(NMDBSchemaVersion version) throws IOException {
storeSchemaVersion(version);
}
protected NMDBSchemaVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of DB schema is a major upgrade, and any
* compatible change of DB schema is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade shuffle info or remove incompatible old state.
*/
private void checkVersion() throws IOException {
NMDBSchemaVersion loadedVersion = loadVersion();
LOG.info("Loaded state DB schema version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing state DB schedma version info " + getCurrentVersion());
storeVersion();
} else {
throw new IOException("State database schema version not found");
throw new IOException(
"Incompatible version for state DB schema: expecting DB schema version "
+ getCurrentVersion() + ", but loading version " + loadedVersion);
}
}

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -74,6 +75,7 @@
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -719,6 +721,94 @@ public void testRecovery() throws IOException {
}
}
@Test
public void testRecoveryFromOtherVersions() throws IOException {
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(12345, 1);
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
// emulate aux services startup with recovery enabled
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
tmpDir.mkdirs();
try {
shuffle.init(conf);
shuffle.start();
// setup a shuffle token for an application
DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset();
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text(user),
new Text("shuffleService"));
jt.write(outputBuffer);
shuffle.initializeApplication(new ApplicationInitializationContext(user,
appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength())));
// verify we are authorized to shuffle
int rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
Assert.assertEquals(version, shuffle.getCurrentVersion());
// emulate shuffle handler restart with compatible version
NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
// update version info before close shuffle
shuffle.storeVersion(version11);
Assert.assertEquals(version11, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// shuffle version will be override by CURRENT_VERSION_INFO after restart
// successfully.
Assert.assertEquals(version, shuffle.loadVersion());
// verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart with incompatible version
NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
shuffle.storeVersion(version21);
Assert.assertEquals(version21, shuffle.loadVersion());
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
try {
shuffle.start();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for state DB schema:"));
}
} finally {
if (shuffle != null) {
shuffle.close();
}
FileUtil.fullyDelete(tmpDir);
}
}
private static int getShuffleResponseCode(ShuffleHandler shuffle,
Token<JobTokenIdentifier> jt) throws IOException {
URL url = new URL("http://127.0.0.1:"

View File

@ -89,6 +89,7 @@
<item name="HDFS NFS Gateway" href="hadoop-project-dist/hadoop-hdfs/HdfsNfsGateway.html"/>
<item name="HDFS Rolling Upgrade" href="hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html"/>
<item name="Extended Attributes" href="hadoop-project-dist/hadoop-hdfs/ExtendedAttributes.html"/>
<item name="HDFS Support for Multihoming" href="hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html"/>
</menu>
<menu name="MapReduce" inherit="top">

View File

@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED
YARN-2013. The diagnostics is always the ExitCodeException stack when the container
crashes. (Tsuyoshi OZAWA via junping_du)
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
(Li Lu via jianhe)
OPTIMIZATIONS
BUG FIXES
@ -85,6 +88,12 @@ Release 2.6.0 - UNRELEASED
YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement
(Leitao Guo via jlowe)
YARN-2273. NPE in ContinuousScheduling thread when we lose a node.
(Wei Yan via kasha)
YARN-2313. Livelock can occur in FairScheduler when there are lots of
running apps (Tsuyoshi Ozawa via Sandy Ryza)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -415,6 +424,9 @@ Release 2.5.0 - UNRELEASED
YARN-2270. Made TestFSDownload#testDownloadPublicWithStatCache be skipped
when theres no ancestor permissions. (Akira Ajisaka via zjshen)
YARN-2319. Made the MiniKdc instance start/close before/after the class of
TestRMWebServicesDelegationTokens. (Wenwu Peng via zjshen)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -194,6 +194,12 @@
<Field name="scheduleAsynchronously" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Inconsistent sync warning - updateInterval is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler" />
<Field name="updateInterval" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -94,7 +95,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
@ -522,6 +522,8 @@ public void run() throws YarnException, IOException {
+ appAttemptID.toString(), e);
}
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
@ -900,11 +902,6 @@ public LaunchContainerRunnable(
public void run() {
LOG.info("Setting up container launch container for containerid="
+ container.getId());
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
// Set the environment
ctx.setEnvironment(shellEnv);
// Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@ -935,16 +932,13 @@ public void run() {
return;
}
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
URL yarnUrl = null;
try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
renamedScriptPath.toString())));
yarnUrl = ConverterUtils.getYarnUrlFromURI(
new URI(renamedScriptPath.toString()));
} catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + renamedScriptPath, e);
// A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container
// so we should release it.
@ -953,13 +947,13 @@ public void run() {
numFailedContainers.incrementAndGet();
return;
}
shellRsrc.setTimestamp(shellScriptPathTimestamp);
shellRsrc.setSize(shellScriptPathLen);
LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
shellScriptPathLen, shellScriptPathTimestamp);
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
ExecShellStringPath, shellRsrc);
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
}
ctx.setLocalResources(localResources);
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@ -986,16 +980,18 @@ public void run() {
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
ctx.setCommands(commands);
// Set up tokens for the container too. Today, for normal shell commands,
// the container in distribute-shell doesn't need any tokens. We are
// populating them mainly for NodeManagers to be able to download any
// files in the distributed file-system. The tokens are otherwise also
// useful in cases, for e.g., when one is running a "hadoop dfs" command
// inside the distributed shell.
ctx.setTokens(allTokens.duplicate());
// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.
// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
}
@ -1024,15 +1020,13 @@ private ContainerRequest setupContainerAskForRM() {
// setup requirements for hosts
// using * as any host will do for the distributed shell app
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
pri.setPriority(requestPriority);
Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
// For now, memory and CPU are supported so we set memory and cpu requirements
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(containerMemory);
capability.setVirtualCores(containerVirtualCores);
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null,
pri);

View File

@ -456,9 +456,6 @@ public boolean run() throws IOException, YarnException {
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
@ -508,8 +505,6 @@ public boolean run() throws IOException, YarnException {
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
localResources, StringUtils.join(shellArgs, " "));
}
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
// Set the necessary security tokens as needed
//amContainer.setContainerTokens(containerToken);
@ -550,8 +545,6 @@ public boolean run() throws IOException, YarnException {
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@ -587,14 +580,15 @@ public boolean run() throws IOException, YarnException {
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
amContainer.setCommands(commands);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
capability.setVirtualCores(amVCores);
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application
@ -603,6 +597,7 @@ public boolean run() throws IOException, YarnException {
// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
@ -627,9 +622,8 @@ public boolean run() throws IOException, YarnException {
appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
pri.setPriority(amPriority);
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM

View File

@ -1294,12 +1294,20 @@ private void setAMContainerCrashedDiagnosticsAndExitStatus(
private String getAMContainerCrashedDiagnostics(
RMAppAttemptContainerFinishedEvent finishEvent) {
ContainerStatus status = finishEvent.getContainerStatus();
String diagnostics =
"AM Container for " + finishEvent.getApplicationAttemptId()
+ " exited with " + " exitCode: " + status.getExitStatus() + ". "
+ "Check application tracking page: " + this.getTrackingUrl()
+ " . Then, click on links to logs of each attempt for detailed output. ";
return diagnostics;
StringBuilder diagnosticsBuilder = new StringBuilder();
diagnosticsBuilder.append("AM Container for ").append(
finishEvent.getApplicationAttemptId()).append(
" exited with ").append(" exitCode: ").append(status.getExitStatus()).
append("\n");
if (this.getTrackingUrl() != null) {
diagnosticsBuilder.append("For more detailed output,").append(
" check application tracking page:").append(
this.getTrackingUrl()).append(
"Then, click on links to logs of each attempt.\n");
}
diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
.append("Failing this attempt");
return diagnosticsBuilder.toString();
}
private static class FinalTransition extends BaseFinalTransition {

View File

@ -135,7 +135,7 @@ public class FairScheduler extends
public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
protected long updateInterval;
private final int UPDATE_DEBUG_FREQUENCY = 5;
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
@ -244,13 +244,13 @@ public QueueManager getQueueManager() {
/**
* A runnable which calls {@link FairScheduler#update()} every
* <code>UPDATE_INTERVAL</code> milliseconds.
* <code>updateInterval</code> milliseconds.
*/
private class UpdateThread implements Runnable {
public void run() {
while (true) {
try {
Thread.sleep(UPDATE_INTERVAL);
Thread.sleep(updateInterval);
update();
preemptTasksIfNecessary();
} catch (Exception e) {
@ -970,8 +970,7 @@ private synchronized void nodeUpdate(RMNode nm) {
}
}
private void continuousScheduling() {
while (true) {
void continuousSchedulingAttempt() {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
@ -983,33 +982,30 @@ private void continuousScheduling() {
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
if (nodes.containsKey(nodeId)) {
FSSchedulerNode node = getFSSchedulerNode(nodeId);
try {
if (Resources.fitsIn(minimumAllocation,
if (node != null && Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
attemptScheduling(node);
}
} catch (Throwable ex) {
LOG.warn("Error while attempting scheduling for node " + node +
LOG.error("Error while attempting scheduling for node " + node +
": " + ex.toString(), ex);
}
}
}
try {
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.warn("Error while doing sleep in continuous scheduling: " +
e.toString(), e);
}
}
}
/** Sort nodes by available resource */
private class NodeAvailableResourceComparator implements Comparator<NodeId> {
@Override
public int compare(NodeId n1, NodeId n2) {
if (!nodes.containsKey(n1)) {
return 1;
}
if (!nodes.containsKey(n2)) {
return -1;
}
return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getAvailableResource(),
nodes.get(n1).getAvailableResource());
@ -1210,6 +1206,15 @@ private synchronized void initScheduler(Configuration conf)
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
updateInterval = this.conf.getUpdateInterval();
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+ " is invalid, so using default value " +
+ FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " ms instead");
}
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
// This stores per-application scheduling information
this.applications =
@ -1234,7 +1239,16 @@ private synchronized void initScheduler(Configuration conf)
new Runnable() {
@Override
public void run() {
continuousScheduling();
while (!Thread.currentThread().isInterrupted()) {
try {
continuousSchedulingAttempt();
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.error("Continuous scheduling thread interrupted. Exiting. ",
e);
return;
}
}
}
}
);

View File

@ -123,6 +123,11 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
protected static final int DEFAULT_MAX_ASSIGN = -1;
/** The update interval for calculating resources in FairScheduler .*/
public static final String UPDATE_INTERVAL_MS =
CONF_PREFIX + "update-interval-ms";
public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
public FairSchedulerConfiguration() {
super();
}
@ -247,6 +252,10 @@ public static Resource parseResourceConfigValue(String val)
}
}
public long getUpdateInterval() {
return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
}
private static int findResource(String val, String units)
throws AllocationConfigurationException {
Pattern pattern = Pattern.compile("(\\d+) ?" + units);

View File

@ -823,7 +823,9 @@ public void testAMCrashAtAllocated() {
applicationAttempt.getAppAttemptState());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics());
boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null);
verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(),
exitCode, shouldCheckURL);
}
@Test
@ -1241,11 +1243,18 @@ scheduler, masterService, submissionContext, new Configuration(),
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) {
assertTrue("Diagnostic information does not contain application proxy URL",
diagnostics.contains(applicationAttempt.getWebProxyBase()));
private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics,
int exitCode, boolean shouldCheckURL) {
assertTrue("Diagnostic information does not point the logs to the users",
diagnostics.contains("logs"));
assertTrue("Diagnostic information does not contain application attempt id",
diagnostics.contains(applicationAttempt.getAppAttemptId().toString()));
assertTrue("Diagnostic information does not contain application exit code",
diagnostics.contains("exitCode: " + exitCode));
if (shouldCheckURL) {
assertTrue("Diagnostic information does not contain application proxy URL",
diagnostics.contains(applicationAttempt.getWebProxyBase()));
}
}
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {

View File

@ -2763,6 +2763,42 @@ public void testContinuousScheduling() throws Exception {
Assert.assertEquals(2, nodes.size());
}
@Test
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
// Disable continuous scheduling, will invoke continuous scheduling once manually
scheduler.init(conf);
scheduler.start();
Assert.assertTrue("Continuous scheduling should be disabled.",
!scheduler.isContinuousSchedulingEnabled());
// Add two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
Assert.assertEquals("We should have two alive nodes.",
2, scheduler.getNumClusterNodes());
// Remove one node
NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(removeNode1);
Assert.assertEquals("We should only have one alive node.",
1, scheduler.getNumClusterNodes());
// Invoke the continuous scheduling once
try {
scheduler.continuousSchedulingAttempt();
} catch (Exception e) {
fail("Exception happened when doing continuous scheduling. " +
e.toString());
}
}
@Test
public void testDontAllowUndeclaredPools() throws Exception{

View File

@ -94,7 +94,7 @@ private void startResourceManager(float utilizationThreshold) {
scheduler = (FairScheduler)resourceManager.getResourceScheduler();
scheduler.setClock(clock);
scheduler.UPDATE_INTERVAL = 60 * 1000;
scheduler.updateInterval = 60 * 1000;
}
private void registerNodeAndSubmitApp(

View File

@ -60,7 +60,9 @@
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -90,28 +92,14 @@
@RunWith(Parameterized.class)
public class TestRMWebServicesDelegationTokens extends JerseyTest {
private static final File testRootDir = new File("target",
TestRMWebServicesDelegationTokens.class.getName() + "-root");
private static File testRootDir;
private static File httpSpnegoKeytabFile = new File(
KerberosTestUtils.getKeytabFile());
private static String httpSpnegoPrincipal = KerberosTestUtils
.getServerPrincipal();
private static boolean miniKDCStarted = false;
private static MiniKdc testMiniKDC;
static {
try {
testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
} catch (Exception e) {
assertTrue("Couldn't create MiniKDC", false);
}
}
private static MockRM rm;
private Injector injector;
private boolean isKerberosAuth = false;
// Make sure the test uses the published header string
@ -237,7 +225,6 @@ public TestRMWebServicesDelegationTokens(int run) throws Exception {
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
setupKDC();
switch (run) {
case 0:
default:
@ -249,17 +236,14 @@ public TestRMWebServicesDelegationTokens(int run) throws Exception {
}
}
private void setupKDC() throws Exception {
if (miniKDCStarted == false) {
@BeforeClass
public static void setupKDC() throws Exception {
testRootDir = new File("target",
TestRMWebServicesDelegationTokens.class.getName() + "-root");
testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir);
testMiniKDC.start();
getKdc().createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost",
testMiniKDC.createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost",
"client", "client2", "client3");
miniKDCStarted = true;
}
}
private MiniKdc getKdc() {
return testMiniKDC;
}
@Before
@ -270,6 +254,13 @@ public void setUp() throws Exception {
testRootDir.deleteOnExit();
}
@AfterClass
public static void shutdownKdc() {
if (testMiniKDC != null) {
testMiniKDC.stop();
}
}
@After
@Override
public void tearDown() throws Exception {

View File

@ -205,6 +205,12 @@ Properties that can be placed in yarn-site.xml
instead. Defaults to true. If a queue placement policy is given in the
allocations file, this property is ignored.
* <<<yarn.scheduler.fair.update-interval-ms>>>
* The interval at which to lock the scheduler and recalculate fair shares,
recalculate demand, and check whether anything is due for preemption.
Defaults to 500 ms.
Allocation file format
The allocation file must be in XML format. The format contains five types of

View File

@ -157,15 +157,16 @@ Usage: yarn [--config confdir] COMMAND
Start the ResourceManager
-------
Usage: yarn resourcemanager [-format]
Usage: yarn resourcemanager [-format-state-store]
-------
*---------------+--------------+
|| COMMAND_OPTIONS || Description |
*---------------+--------------+
| -format | Formats the RMStateStore. This will clear the RMStateStore and is
| | useful if past applications are no longer needed. This should be run
| | only when the ResourceManager is not running.
| -format-state-store | Formats the RMStateStore. This will clear the
| | RMStateStore and is useful if past applications are no
| | longer needed. This should be run only when the
| | ResourceManager is not running.
*---------------+--------------+
** nodemanager