Merge branch 'trunk' into HDFS-7240

Anu Engineer 2016-04-18 16:58:45 -07:00
commit c884170c93
78 changed files with 1772 additions and 359 deletions

View File

@ -327,7 +327,11 @@ public class KerberosAuthenticator implements Authenticator {
}
});
} catch (PrivilegedActionException ex) {
throw new AuthenticationException(ex.getException());
if (ex.getException() instanceof IOException) {
throw (IOException) ex.getException();
} else {
throw new AuthenticationException(ex.getException());
}
} catch (LoginException ex) {
throw new AuthenticationException(ex);
}

View File

@ -61,6 +61,7 @@ import javax.security.sasl.Sasl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -107,7 +108,7 @@ import com.google.protobuf.CodedOutputStream;
*
* @see Server
*/
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@Public
@InterfaceStability.Evolving
public class Client implements AutoCloseable {

View File

@ -74,6 +74,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
@ -133,7 +134,7 @@ import com.google.protobuf.Message.Builder;
*
* @see Client
*/
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@Public
@InterfaceStability.Evolving
public abstract class Server {
private final boolean authorize;
@ -439,7 +440,7 @@ public abstract class Server {
/**
* Checks if LogSlowRPC is set true.
* @return
@return true if LogSlowRPC is set to true; false otherwise.
*/
protected boolean isLogSlowRPC() {
return logSlowRPC;

View File

@ -765,6 +765,13 @@
</description>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<description>Enable S3 path style access, i.e. disabling the default virtual hosting behaviour.
Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
</description>
</property>
<property>
<name>fs.s3a.proxy.host</name>
<description>Hostname of the (optional) proxy server for S3 connections.</description>
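For readers who want to see how the new fs.s3a.path.style.access switch is consumed, here is a minimal Java sketch (not part of this patch) that sets it on a Configuration before opening an S3A filesystem. The bucket URI is hypothetical, and the hadoop-aws module plus S3 credentials are assumed to be available.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
public class S3aPathStyleAccessSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same key as the new core-default.xml entry above; it defaults to false.
    conf.setBoolean("fs.s3a.path.style.access", true);
    // Hypothetical bucket; requires the hadoop-aws module and credentials to be configured.
    FileSystem s3 = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println("Opened " + s3.getUri() + " with path-style access");
  }
}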

View File

@ -60,14 +60,14 @@ public class TestLoadBalancingKMSClientProvider {
providers[2].getKMSUrl()));
kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1;host2;host3:16000/kms/foo"), conf);
"kms://http@host1;host2;host3:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
providers =
((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(3, providers.length);
assertEquals(Sets.newHashSet("http://host1:16000/kms/foo/v1/",
"http://host2:16000/kms/foo/v1/",
"http://host3:16000/kms/foo/v1/"),
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
"http://host2:9600/kms/foo/v1/",
"http://host3:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl(),
providers[1].getKMSUrl(),
providers[2].getKMSUrl()));

View File

@ -26,12 +26,17 @@ import java.util.Deque;
import java.util.LinkedList;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestAnd {
@Rule
public Timeout globalTimeout = new Timeout(10000);
// test all expressions passing
@Test(timeout = 1000)
@Test
public void testPass() throws IOException {
And and = new And();
@ -56,7 +61,7 @@ public class TestAnd {
}
// test the first expression failing
@Test(timeout = 1000)
@Test
public void testFailFirst() throws IOException {
And and = new And();
@ -80,7 +85,7 @@ public class TestAnd {
}
// test the second expression failing
@Test(timeout = 1000)
@Test
public void testFailSecond() throws IOException {
And and = new And();
@ -105,7 +110,7 @@ public class TestAnd {
}
// test both expressions failing
@Test(timeout = 1000)
@Test
public void testFailBoth() throws IOException {
And and = new And();
@ -129,7 +134,7 @@ public class TestAnd {
}
// test the first expression stopping
@Test(timeout = 1000)
@Test
public void testStopFirst() throws IOException {
And and = new And();
@ -154,7 +159,7 @@ public class TestAnd {
}
// test the second expression stopping
@Test(timeout = 1000)
@Test
public void testStopSecond() throws IOException {
And and = new And();
@ -179,7 +184,7 @@ public class TestAnd {
}
// test first expression stopping and second failing
@Test(timeout = 1000)
@Test
public void testStopFail() throws IOException {
And and = new And();
@ -204,7 +209,7 @@ public class TestAnd {
}
// test setOptions is called on child
@Test(timeout = 1000)
@Test
public void testSetOptions() throws IOException {
And and = new And();
Expression first = mock(Expression.class);
@ -224,7 +229,7 @@ public class TestAnd {
}
// test prepare is called on child
@Test(timeout = 1000)
@Test
public void testPrepare() throws IOException {
And and = new And();
Expression first = mock(Expression.class);
@ -243,7 +248,7 @@ public class TestAnd {
}
// test finish is called on child
@Test(timeout = 1000)
@Test
public void testFinish() throws IOException {
And and = new And();
Expression first = mock(Expression.class);

View File

@ -26,12 +26,17 @@ import java.util.Deque;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestFilterExpression {
private Expression expr;
private FilterExpression test;
@Rule
public Timeout globalTimeout = new Timeout(10000);
@Before
public void setup() {
expr = mock(Expression.class);
@ -40,13 +45,13 @@ public class TestFilterExpression {
}
// test that the child expression is correctly set
@Test(timeout = 1000)
@Test
public void expression() throws IOException {
assertEquals(expr, test.expression);
}
// test that setOptions method is called
@Test(timeout = 1000)
@Test
public void setOptions() throws IOException {
FindOptions options = mock(FindOptions.class);
test.setOptions(options);
@ -55,7 +60,7 @@ public class TestFilterExpression {
}
// test the apply method is called and the result returned
@Test(timeout = 1000)
@Test
public void apply() throws IOException {
PathData item = mock(PathData.class);
when(expr.apply(item, -1)).thenReturn(Result.PASS).thenReturn(Result.FAIL);
@ -66,7 +71,7 @@ public class TestFilterExpression {
}
// test that the finish method is called
@Test(timeout = 1000)
@Test
public void finish() throws IOException {
test.finish();
verify(expr).finish();
@ -74,7 +79,7 @@ public class TestFilterExpression {
}
// test that the getUsage method is called
@Test(timeout = 1000)
@Test
public void getUsage() {
String[] usage = new String[] { "Usage 1", "Usage 2", "Usage 3" };
when(expr.getUsage()).thenReturn(usage);
@ -84,7 +89,7 @@ public class TestFilterExpression {
}
// test that the getHelp method is called
@Test(timeout = 1000)
@Test
public void getHelp() {
String[] help = new String[] { "Help 1", "Help 2", "Help 3" };
when(expr.getHelp()).thenReturn(help);
@ -94,7 +99,7 @@ public class TestFilterExpression {
}
// test that the isAction method is called
@Test(timeout = 1000)
@Test
public void isAction() {
when(expr.isAction()).thenReturn(true).thenReturn(false);
assertTrue(test.isAction());
@ -104,7 +109,7 @@ public class TestFilterExpression {
}
// test that the isOperator method is called
@Test(timeout = 1000)
@Test
public void isOperator() {
when(expr.isAction()).thenReturn(true).thenReturn(false);
assertTrue(test.isAction());
@ -114,7 +119,7 @@ public class TestFilterExpression {
}
// test that the getPrecedence method is called
@Test(timeout = 1000)
@Test
public void getPrecedence() {
int precedence = 12345;
when(expr.getPrecedence()).thenReturn(precedence);
@ -124,7 +129,7 @@ public class TestFilterExpression {
}
// test that the addChildren method is called
@Test(timeout = 1000)
@Test
public void addChildren() {
@SuppressWarnings("unchecked")
Deque<Expression> expressions = mock(Deque.class);
@ -134,7 +139,7 @@ public class TestFilterExpression {
}
// test that the addArguments method is called
@Test(timeout = 1000)
@Test
public void addArguments() {
@SuppressWarnings("unchecked")
Deque<String> args = mock(Deque.class);

View File

@ -39,11 +39,12 @@ import org.apache.hadoop.fs.shell.find.FindOptions;
import org.apache.hadoop.fs.shell.find.Result;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.Test;
import org.mockito.InOrder;
public class TestFind {
@Rule
public Timeout timeout = new Timeout(10000);

View File

@ -25,12 +25,17 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestIname {
private FileSystem mockFs;
private Name.Iname name;
@Rule
public Timeout globalTimeout = new Timeout(10000);
@Before
public void resetMock() throws IOException {
mockFs = MockFileSystem.setup();
@ -44,7 +49,7 @@ public class TestIname {
}
// test a matching name (same case)
@Test(timeout = 1000)
@Test
public void applyMatch() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/name", mockFs.getConf());
@ -52,7 +57,7 @@ public class TestIname {
}
// test a non-matching name
@Test(timeout = 1000)
@Test
public void applyNotMatch() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/notname", mockFs.getConf());
@ -60,7 +65,7 @@ public class TestIname {
}
// test a matching name (different case)
@Test(timeout = 1000)
@Test
public void applyMixedCase() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/NaMe", mockFs.getConf());
@ -68,7 +73,7 @@ public class TestIname {
}
// test a matching glob pattern (same case)
@Test(timeout = 1000)
@Test
public void applyGlob() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/name", mockFs.getConf());
@ -76,7 +81,7 @@ public class TestIname {
}
// test a matching glob pattern (different case)
@Test(timeout = 1000)
@Test
public void applyGlobMixedCase() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/NaMe", mockFs.getConf());
@ -84,7 +89,7 @@ public class TestIname {
}
// test a non-matching glob pattern
@Test(timeout = 1000)
@Test
public void applyGlobNotMatch() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/notmatch", mockFs.getConf());

View File

@ -25,12 +25,17 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestName {
private FileSystem mockFs;
private Name name;
@Rule
public Timeout globalTimeout = new Timeout(10000);
@Before
public void resetMock() throws IOException {
mockFs = MockFileSystem.setup();
@ -44,7 +49,7 @@ public class TestName {
}
// test a matching name
@Test(timeout = 1000)
@Test
public void applyMatch() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/name", mockFs.getConf());
@ -52,7 +57,7 @@ public class TestName {
}
// test a non-matching name
@Test(timeout = 1000)
@Test
public void applyNotMatch() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/notname", mockFs.getConf());
@ -60,7 +65,7 @@ public class TestName {
}
// test a different case name
@Test(timeout = 1000)
@Test
public void applyMixedCase() throws IOException {
setup("name");
PathData item = new PathData("/directory/path/NaMe", mockFs.getConf());
@ -68,7 +73,7 @@ public class TestName {
}
// test a matching glob pattern
@Test(timeout = 1000)
@Test
public void applyGlob() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/name", mockFs.getConf());
@ -76,7 +81,7 @@ public class TestName {
}
// test a glob pattern with different case
@Test(timeout = 1000)
@Test
public void applyGlobMixedCase() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/NaMe", mockFs.getConf());
@ -84,7 +89,7 @@ public class TestName {
}
// test a non-matching glob pattern
@Test(timeout = 1000)
@Test
public void applyGlobNotMatch() throws IOException {
setup("n*e");
PathData item = new PathData("/directory/path/notmatch", mockFs.getConf());

View File

@ -23,23 +23,28 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Test;
import java.io.PrintStream;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestPrint {
private FileSystem mockFs;
@Rule
public Timeout globalTimeout = new Timeout(10000);
@Before
public void resetMock() throws IOException {
mockFs = MockFileSystem.setup();
}
// test the full path is printed to stdout
@Test(timeout = 1000)
@Test
public void testPrint() throws IOException {
Print print = new Print();
PrintStream out = mock(PrintStream.class);

View File

@ -23,23 +23,28 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.fs.shell.PathData;
import org.junit.Test;
import java.io.PrintStream;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestPrint0 {
private FileSystem mockFs;
@Rule
public Timeout globalTimeout = new Timeout(10000);
@Before
public void resetMock() throws IOException {
mockFs = MockFileSystem.setup();
}
// test the full path is printed to stdout with a '\0'
@Test(timeout = 1000)
@Test
public void testPrint() throws IOException {
Print.Print0 print = new Print.Print0();
PrintStream out = mock(PrintStream.class);

View File

@ -19,12 +19,17 @@ package org.apache.hadoop.fs.shell.find;
import static org.junit.Assert.*;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
public class TestResult {
@Rule
public Timeout globalTimeout = new Timeout(10000);
// test the PASS value
@Test(timeout = 1000)
@Test
public void testPass() {
Result result = Result.PASS;
assertTrue(result.isPass());
@ -32,7 +37,7 @@ public class TestResult {
}
// test the FAIL value
@Test(timeout = 1000)
@Test
public void testFail() {
Result result = Result.FAIL;
assertFalse(result.isPass());
@ -40,7 +45,7 @@ public class TestResult {
}
// test the STOP value
@Test(timeout = 1000)
@Test
public void testStop() {
Result result = Result.STOP;
assertTrue(result.isPass());
@ -48,7 +53,7 @@ public class TestResult {
}
// test combine method with two PASSes
@Test(timeout = 1000)
@Test
public void combinePassPass() {
Result result = Result.PASS.combine(Result.PASS);
assertTrue(result.isPass());
@ -56,7 +61,7 @@ public class TestResult {
}
// test the combine method with a PASS and a FAIL
@Test(timeout = 1000)
@Test
public void combinePassFail() {
Result result = Result.PASS.combine(Result.FAIL);
assertFalse(result.isPass());
@ -64,7 +69,7 @@ public class TestResult {
}
// test the combine method with a FAIL and a PASS
@Test(timeout = 1000)
@Test
public void combineFailPass() {
Result result = Result.FAIL.combine(Result.PASS);
assertFalse(result.isPass());
@ -72,7 +77,7 @@ public class TestResult {
}
// test the combine method with two FAILs
@Test(timeout = 1000)
@Test
public void combineFailFail() {
Result result = Result.FAIL.combine(Result.FAIL);
assertFalse(result.isPass());
@ -80,7 +85,7 @@ public class TestResult {
}
// test the combine method with a PASS and STOP
@Test(timeout = 1000)
@Test
public void combinePassStop() {
Result result = Result.PASS.combine(Result.STOP);
assertTrue(result.isPass());
@ -88,7 +93,7 @@ public class TestResult {
}
// test the combine method with a STOP and FAIL
@Test(timeout = 1000)
@Test
public void combineStopFail() {
Result result = Result.STOP.combine(Result.FAIL);
assertFalse(result.isPass());
@ -96,7 +101,7 @@ public class TestResult {
}
// test the combine method with a STOP and a PASS
@Test(timeout = 1000)
@Test
public void combineStopPass() {
Result result = Result.STOP.combine(Result.PASS);
assertTrue(result.isPass());
@ -104,7 +109,7 @@ public class TestResult {
}
// test the combine method with a FAIL and a STOP
@Test(timeout = 1000)
@Test
public void combineFailStop() {
Result result = Result.FAIL.combine(Result.STOP);
assertFalse(result.isPass());
@ -112,7 +117,7 @@ public class TestResult {
}
// test the negation of PASS
@Test(timeout = 1000)
@Test
public void negatePass() {
Result result = Result.PASS.negate();
assertFalse(result.isPass());
@ -120,7 +125,7 @@ public class TestResult {
}
// test the negation of FAIL
@Test(timeout = 1000)
@Test
public void negateFail() {
Result result = Result.FAIL.negate();
assertTrue(result.isPass());
@ -128,7 +133,7 @@ public class TestResult {
}
// test the negation of STOP
@Test(timeout = 1000)
@Test
public void negateStop() {
Result result = Result.STOP.negate();
assertFalse(result.isPass());
@ -136,7 +141,7 @@ public class TestResult {
}
// test equals with two PASSes
@Test(timeout = 1000)
@Test
public void equalsPass() {
Result one = Result.PASS;
Result two = Result.PASS.combine(Result.PASS);
@ -144,7 +149,7 @@ public class TestResult {
}
// test equals with two FAILs
@Test(timeout = 1000)
@Test
public void equalsFail() {
Result one = Result.FAIL;
Result two = Result.FAIL.combine(Result.FAIL);
@ -152,7 +157,7 @@ public class TestResult {
}
// test equals with two STOPS
@Test(timeout = 1000)
@Test
public void equalsStop() {
Result one = Result.STOP;
Result two = Result.STOP.combine(Result.STOP);
@ -160,7 +165,7 @@ public class TestResult {
}
// test all combinations of not equals
@Test(timeout = 1000)
@Test
public void notEquals() {
assertFalse(Result.PASS.equals(Result.FAIL));
assertFalse(Result.PASS.equals(Result.STOP));

View File

@ -24,7 +24,7 @@
# The HTTP port used by KMS
#
# export KMS_HTTP_PORT=16000
# export KMS_HTTP_PORT=9600
# The Admin port used by KMS
#

View File

@ -37,7 +37,7 @@ function hadoop_subproject_init
export HADOOP_CATALINA_CONFIG="${HADOOP_CONF_DIR}"
export HADOOP_CATALINA_LOG="${HADOOP_LOG_DIR}"
export HADOOP_CATALINA_HTTP_PORT="${KMS_HTTP_PORT:-16000}"
export HADOOP_CATALINA_HTTP_PORT="${KMS_HTTP_PORT:-9600}"
export HADOOP_CATALINA_ADMIN_PORT="${KMS_ADMIN_PORT:-$((HADOOP_CATALINA_HTTP_PORT+1))}"
export HADOOP_CATALINA_MAX_THREADS="${KMS_MAX_THREADS:-1000}"
export HADOOP_CATALINA_MAX_HTTP_HEADER_SIZE="${KMS_MAX_HTTP_HEADER_SIZE:-65536}"

View File

@ -32,7 +32,7 @@ KMS is a Java web-application and it runs using a pre-configured Tomcat bundled
KMS Client Configuration
------------------------
The KMS client `KeyProvider` uses the **kms** scheme, and the embedded URL must be the URL of the KMS. For example, for a KMS running on `http://localhost:16000/kms`, the KeyProvider URI is `kms://http@localhost:16000/kms`. And, for a KMS running on `https://localhost:16000/kms`, the KeyProvider URI is `kms://https@localhost:16000/kms`
The KMS client `KeyProvider` uses the **kms** scheme, and the embedded URL must be the URL of the KMS. For example, for a KMS running on `http://localhost:9600/kms`, the KeyProvider URI is `kms://http@localhost:9600/kms`. And, for a KMS running on `https://localhost:9600/kms`, the KeyProvider URI is `kms://https@localhost:9600/kms`
KMS
---
@ -178,7 +178,7 @@ $H3 Embedded Tomcat Configuration
To configure the embedded Tomcat go to the `share/hadoop/kms/tomcat/conf`.
KMS pre-configures the HTTP and Admin ports in Tomcat's `server.xml` to 16000 and 16001.
KMS pre-configures the HTTP and Admin ports in Tomcat's `server.xml` to 9600 and 9601.
Tomcat logs are also preconfigured to go to Hadoop's `logs/` directory.
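As an illustration only (not part of this patch), the sketch below resolves a client-side KeyProvider against the new default port by setting hadoop.security.key.provider.path. The hostname is hypothetical, and the KMS client classes are assumed to be on the classpath.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
public class KmsKeyProviderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical KMS host, using the new default HTTP port 9600.
    conf.set("hadoop.security.key.provider.path",
        "kms://http@kms.example.com:9600/kms");
    List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
    for (KeyProvider provider : providers) {
      System.out.println("Resolved KeyProvider: " + provider);
    }
  }
}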

View File

@ -811,7 +811,7 @@ public class DFSOutputStream extends FSOutputSummer
try {
if (retries == 0) {
throw new IOException("Unable to close file because the last block"
+ " does not have enough number of replicas.");
+ last + " does not have enough number of replicas.");
}
retries--;
Thread.sleep(sleeptime);

View File

@ -305,6 +305,14 @@ public class BlockManager implements BlockStatsMXBean {
* processed again after acquiring the lock again.
*/
private int numBlocksPerIteration;
/**
* Minimum size of a block for it to be sent to the Balancer through getBlocks.
* After HDFS-8824, small blocks are unused anyway, so there is no point in
* sending them to the Balancer.
*/
private long getBlocksMinBlockSize = -1;
/**
* Progress of the Reconstruction queues initialisation.
*/
@ -414,6 +422,9 @@ public class BlockManager implements BlockStatsMXBean {
this.numBlocksPerIteration = conf.getInt(
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
this.getBlocksMinBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@ -1179,6 +1190,9 @@ public class BlockManager implements BlockStatsMXBean {
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < getBlocksMinBlockSize) {
continue;
}
totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
@ -1186,6 +1200,9 @@ public class BlockManager implements BlockStatsMXBean {
for(int i=0; i<startBlock&&totalSize<size; i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < getBlocksMinBlockSize) {
continue;
}
totalSize += addBlock(curBlock, results);
}
}
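A hedged sketch, not part of this patch, of how the threshold read above can be supplied: the key behind DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY is read with getLongBytes, so it accepts the usual Hadoop size suffixes; the "4m" value below is only an example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
public class BalancerGetBlocksMinSizeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Blocks smaller than this are skipped by getBlocks(); "4m" is an example value.
    conf.set(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, "4m");
    long minSize = conf.getLongBytes(
        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
    System.out.println("Balancer getBlocks min block size in bytes: " + minSize);
  }
}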

View File

@ -79,7 +79,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap) {
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_DEFAULT);
this.considerLoadFactor = conf.getDouble(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR_DEFAULT);

View File

@ -914,13 +914,13 @@ public class DirectoryScanner implements Runnable {
*/
private void verifyFileLocation(File actualBlockFile,
File bpFinalizedDir, long blockId) {
File blockDir = DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
if (actualBlockFile.getParentFile().compareTo(blockDir) != 0) {
File expBlockFile = new File(blockDir, actualBlockFile.getName());
LOG.warn("Block: " + blockId
+ " has to be upgraded to block ID-based layout. "
+ "Actual block file path: " + actualBlockFile
+ ", expected block file path: " + expBlockFile);
File expectedBlockDir =
DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
File actualBlockDir = actualBlockFile.getParentFile();
if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
LOG.warn("Block: " + blockId +
" found in invalid directory. Expected directory: " +
expectedBlockDir + ". Actual directory: " + actualBlockDir);
}
}

View File

@ -414,7 +414,7 @@ public class VolumeScanner extends Thread {
Block b = volume.getDataset().getStoredBlock(
cblock.getBlockPoolId(), cblock.getBlockId());
if (b == null) {
LOG.info("FileNotFound while finding block {} on volume {}",
LOG.info("Replica {} was not found in the VolumeMap for volume {}",
cblock, volume.getBasePath());
} else {
block = new ExtendedBlock(cblock.getBlockPoolId(), b);

View File

@ -697,6 +697,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
} else {
ExtendedBlock block =
new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
File expectedBlockDir = DatanodeUtil.idToBlockDir(
new File("."), block.getBlockId());
File actualBlockDir = Paths.get(".",
state.curFinalizedDir, state.curFinalizedSubDir).toFile();
if (!expectedBlockDir.equals(actualBlockDir)) {
LOG.error("nextBlock({}, {}): block id {} found in invalid " +
"directory. Expected directory: {}. " +
"Actual directory: {}", storageID, bpid,
block.getBlockId(), expectedBlockDir.getPath(),
actualBlockDir.getPath());
continue;
}
LOG.trace("nextBlock({}, {}): advancing to {}",
storageID, bpid, block);
return block;

View File

@ -1096,7 +1096,8 @@ public abstract class FSEditLogOp {
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH");
List<Stanza> blocks = st.getChildren("BLOCK");
List<Stanza> blocks = st.hasChildren("BLOCK") ?
st.getChildren("BLOCK") : new ArrayList<Stanza>();
this.blocks = new Block[blocks.size()];
for (int i = 0; i < blocks.size(); i++) {
this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));

View File

@ -427,6 +427,7 @@ public class DFSAdmin extends FsShell {
"\t[-allowSnapshot <snapshotDir>]\n" +
"\t[-disallowSnapshot <snapshotDir>]\n" +
"\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
"\t[-evictWriters <datanode_host:ipc_port>]\n" +
"\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
"\t[-metasave filename]\n" +
"\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
@ -1829,6 +1830,9 @@ public class DFSAdmin extends FsShell {
} else if ("-shutdownDatanode".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-shutdownDatanode <datanode_host:ipc_port> [upgrade]]");
} else if ("-evictWriters".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-evictWriters <datanode_host:ipc_port>]");
} else if ("-getDatanodeInfo".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-getDatanodeInfo <datanode_host:ipc_port>]");

View File

@ -385,6 +385,7 @@ Usage:
hdfs dfsadmin [-allowSnapshot <snapshotDir>]
hdfs dfsadmin [-disallowSnapshot <snapshotDir>]
hdfs dfsadmin [-shutdownDatanode <datanode_host:ipc_port> [upgrade]]
hdfs dfsadmin [-evictWriters <datanode_host:ipc_port>]
hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>]
hdfs dfsadmin [-metasave filename]
hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
@ -419,6 +420,7 @@ Usage:
| `-allowSnapshot` \<snapshotDir\> | Allowing snapshots of a directory to be created. If the operation completes successfully, the directory becomes snapshottable. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. |
| `-disallowSnapshot` \<snapshotDir\> | Disallowing snapshots of a directory to be created. All snapshots of the directory must be deleted before disallowing snapshots. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. |
| `-shutdownDatanode` \<datanode\_host:ipc\_port\> [upgrade] | Submit a shutdown request for the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-shutdownDatanode) for the detail. |
| `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. |
| `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
| `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be an incremental block report; otherwise, it will be a full block report. |
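As a hedged usage example of the new command (hostname and port are hypothetical; 50020 is the usual default datanode IPC port in this release line): `hdfs dfsadmin -evictWriters dn1.example.com:50020`.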

View File

@ -1272,6 +1272,18 @@ public class DFSTestUtil {
// OP_APPEND 47
FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
s2.close();
// OP_UPDATE_BLOCKS 25
final String updateBlockFile = "/update_blocks";
FSDataOutputStream fout = filesystem.create(new Path(updateBlockFile), true, 4096, (short)1, 4096L);
fout.write(1);
fout.hflush();
long fileId = ((DFSOutputStream)fout.getWrappedStream()).getFileId();
DFSClient dfsclient = DFSClientAdapter.getDFSClient(filesystem);
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(updateBlockFile, 0, Integer.MAX_VALUE);
dfsclient.getNamenode().abandonBlock(blocks.get(0).getBlock(), fileId, updateBlockFile, dfsclient.clientName);
fout.close();
// OP_SET_STORAGE_POLICY 45
filesystem.setStoragePolicy(pathFileCreate,
HdfsConstants.HOT_STORAGE_POLICY_NAME);

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
@ -85,16 +86,6 @@ public class StripedFileTestUtil {
return (byte) (pos % mod + 1);
}
static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
int readLen = 0;
int ret;
while ((ret = in.read(buf, readLen, buf.length - readLen)) >= 0 &&
readLen <= buf.length) {
readLen += ret;
}
return readLen;
}
static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FileStatus status = fs.getFileStatus(srcPath);
@ -214,11 +205,11 @@ public class StripedFileTestUtil {
static void assertSeekAndRead(FSDataInputStream fsdis, int pos,
int writeBytes) throws IOException {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = StripedFileTestUtil.readAll(fsdis, buf);
assertEquals(readLen, writeBytes - pos);
for (int i = 0; i < readLen; i++) {
assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos + i), buf[i]);
byte[] buf = new byte[writeBytes - pos];
IOUtils.readFully(fsdis, buf, 0, buf.length);
for (int i = 0; i < buf.length; i++) {
assertEquals("Byte at " + i + " should be the same",
StripedFileTestUtil.getByte(pos + i), buf[i]);
}
}

View File

@ -179,11 +179,15 @@ public class TestGetBlocks {
final int DEFAULT_BLOCK_SIZE = 1024;
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
CONF.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
REPLICATION_FACTOR).build();
try {
cluster.waitActive();
long fileLen = 2 * DEFAULT_BLOCK_SIZE;
// the third block will not be visible to getBlocks
long fileLen = 2 * DEFAULT_BLOCK_SIZE + 1;
DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/tmp.txt"),
fileLen, REPLICATION_FACTOR, 0L);
@ -196,7 +200,7 @@ public class TestGetBlocks {
DFSUtilClient.getNNAddress(CONF), CONF);
locatedBlocks = dfsclient.getNamenode()
.getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
assertEquals(2, locatedBlocks.size());
assertEquals(3, locatedBlocks.size());
notWritten = false;
for (int i = 0; i < 2; i++) {
dataNodes = locatedBlocks.get(i).getLocations();

View File

@ -135,6 +135,13 @@ public interface FsDatasetTestUtils {
* @throws IOException I/O error.
*/
void truncateMeta(long newSize) throws IOException;
/**
* Make the replica unreachable, perhaps by renaming it to an
* invalid file name.
* @throws IOException On I/O error.
*/
void makeUnreachable() throws IOException;
}
/**

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.Closeable;
import java.io.File;
@ -38,6 +39,7 @@ import java.util.concurrent.Semaphore;
import com.google.common.base.Supplier;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import org.apache.hadoop.conf.Configuration;
@ -139,6 +141,11 @@ public class TestBlockScanner {
throws Exception {
return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
}
public MaterializedReplica getMaterializedReplica(int nsIdx, int fileIdx)
throws Exception {
return cluster.getMaterializedReplica(0, getFileBlock(nsIdx, fileIdx));
}
}
/**
@ -806,4 +813,60 @@ public class TestBlockScanner {
info.blocksScanned = 0;
}
}
/**
* Test that blocks which are in the wrong location are ignored.
*/
@Test(timeout=120000)
public void testIgnoreMisplacedBlock() throws Exception {
Configuration conf = new Configuration();
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final TestContext ctx = new TestContext(conf, 1);
final int NUM_FILES = 4;
ctx.createFiles(0, NUM_FILES, 5);
MaterializedReplica unreachableReplica = ctx.getMaterializedReplica(0, 1);
ExtendedBlock unreachableBlock = ctx.getFileBlock(0, 1);
unreachableReplica.makeUnreachable();
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(NUM_FILES);
info.shouldRun = true;
info.notify();
}
// Scan the first 4 blocks
LOG.info("Waiting for the blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= NUM_FILES - 1) {
LOG.info("info = {}. blockScanned has now reached " +
info.blocksScanned, info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach " +
(NUM_FILES - 1), info);
return false;
}
}
}
}, 50, 30000);
// We should have scanned 4 blocks
synchronized (info) {
assertFalse(info.goodBlocks.contains(unreachableBlock));
assertFalse(info.badBlocks.contains(unreachableBlock));
assertEquals("Expected 3 good blocks.", 3, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 3 blocksScanned", 3, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
info.sem.release(1);
}
}

View File

@ -258,10 +258,9 @@ public class TestDataNodeMetrics {
* and reading causes totalReadTime to move.
* @throws Exception
*/
@Test(timeout=60000)
@Test(timeout=120000)
public void testDataNodeTimeSpend() throws Exception {
Configuration conf = new HdfsConfiguration();
SimulatedFSDataset.setFactory(conf);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
final FileSystem fs = cluster.getFileSystem();
@ -284,6 +283,7 @@ public class TestDataNodeMetrics {
DFSTestUtil.createFile(fs, new Path("/time.txt." + x.get()),
LONG_FILE_LEN, (short) 1, Time.monotonicNow());
DFSTestUtil.readFile(fs, new Path("/time.txt." + x.get()));
fs.delete(new Path("/time.txt." + x.get()), true);
} catch (IOException ioe) {
LOG.error("Caught IOException while ingesting DN metrics", ioe);
return false;
@ -294,7 +294,7 @@ public class TestDataNodeMetrics {
return endWriteValue > startWriteValue
&& endReadValue > startReadValue;
}
}, 30, 30000);
}, 30, 60000);
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -170,6 +170,27 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
truncate(metaFile, newSize);
}
@Override
public void makeUnreachable() throws IOException {
long blockId = Block.getBlockId(blockFile.getAbsolutePath());
File origDir = blockFile.getParentFile();
File root = origDir.getParentFile().getParentFile();
File newDir = null;
// Keep incrementing the block ID until the block and metadata
// files end up in a different directory. Actually, with the
// current replica file placement scheme, this should only ever
// require one increment, but this is a bit of defensive coding.
do {
blockId++;
newDir = DatanodeUtil.idToBlockDir(root, blockId);
} while (origDir.equals(newDir));
Files.createDirectories(newDir.toPath());
Files.move(blockFile.toPath(),
new File(newDir, blockFile.getName()).toPath());
Files.move(metaFile.toPath(),
new File(newDir, metaFile.getName()).toPath());
}
@Override
public String toString() {
return String.format("MaterializedReplica: file=%s", blockFile);

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -62,7 +63,7 @@ public class TestFSImageWithSnapshot {
}
static final long seed = 0;
static final short REPLICATION = 3;
static final short NUM_DATANODES = 3;
static final int BLOCKSIZE = 1024;
static final long txid = 1;
@ -78,7 +79,7 @@ public class TestFSImageWithSnapshot {
@Before
public void setUp() throws Exception {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
fsn = cluster.getNamesystem();
@ -177,7 +178,7 @@ public class TestFSImageWithSnapshot {
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -188,7 +189,7 @@ public class TestFSImageWithSnapshot {
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -215,7 +216,7 @@ public class TestFSImageWithSnapshot {
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -248,20 +249,20 @@ public class TestFSImageWithSnapshot {
hdfs.createSnapshot(dir, "s" + ++s);
Path sub1file1 = new Path(sub1, "sub1file1");
Path sub1file2 = new Path(sub1, "sub1file2");
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);
checkImage(s);
hdfs.createSnapshot(dir, "s" + ++s);
Path sub2 = new Path(dir, "sub2");
Path sub2file1 = new Path(sub2, "sub2file1");
Path sub2file2 = new Path(sub2, "sub2file2");
DFSTestUtil.createFile(hdfs, sub2file1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub2file2, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub2file1, BLOCKSIZE, (short) 1, seed);
DFSTestUtil.createFile(hdfs, sub2file2, BLOCKSIZE, (short) 1, seed);
checkImage(s);
hdfs.createSnapshot(dir, "s" + ++s);
hdfs.setReplication(sub1file1, (short) (REPLICATION - 1));
hdfs.setReplication(sub1file1, (short) 1);
hdfs.delete(sub1file2, true);
hdfs.setOwner(sub2, "dr.who", "unknown");
hdfs.delete(sub2file1, true);
@ -300,7 +301,7 @@ public class TestFSImageWithSnapshot {
// restart the cluster, and format the cluster
cluster = new MiniDFSCluster.Builder(conf).format(true)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -338,8 +339,8 @@ public class TestFSImageWithSnapshot {
Path sub1 = new Path(dir, "sub1");
Path sub1file1 = new Path(sub1, "sub1file1");
Path sub1file2 = new Path(sub1, "sub1file2");
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);
// 1. create snapshot s0
hdfs.allowSnapshot(dir);
@ -372,7 +373,7 @@ public class TestFSImageWithSnapshot {
out.close();
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(true)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -394,8 +395,8 @@ public class TestFSImageWithSnapshot {
Path sub1 = new Path(dir, "sub1");
Path sub1file1 = new Path(sub1, "sub1file1");
Path sub1file2 = new Path(sub1, "sub1file2");
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, (short) 1, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, (short) 1, seed);
hdfs.allowSnapshot(dir);
hdfs.createSnapshot(dir, "s0");
@ -410,7 +411,7 @@ public class TestFSImageWithSnapshot {
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
@ -440,7 +441,7 @@ public class TestFSImageWithSnapshot {
// restart cluster
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
@ -478,7 +479,7 @@ public class TestFSImageWithSnapshot {
Path newDir = new Path(subsubDir, "newdir");
Path newFile = new Path(newDir, "newfile");
hdfs.mkdirs(newDir);
DFSTestUtil.createFile(hdfs, newFile, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, newFile, BLOCKSIZE, (short) 1, seed);
// create another snapshot
SnapshotTestHelper.createSnapshot(hdfs, dir, "s2");
@ -491,7 +492,7 @@ public class TestFSImageWithSnapshot {
// restart cluster
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.format(false).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
@ -504,7 +505,7 @@ public class TestFSImageWithSnapshot {
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -37,19 +39,22 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;
import org.junit.Test;
public class TestPendingCorruptDnMessages {
private static final Path filePath = new Path("/foo.txt");
@Test
@Test (timeout = 60000)
public void testChangedStorageId() throws IOException, URISyntaxException,
InterruptedException {
InterruptedException, TimeoutException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.build();
@ -83,27 +88,27 @@ public class TestPendingCorruptDnMessages {
// Wait until the standby NN queues up the corrupt block in the pending DN
// message queue.
while (cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount() < 1) {
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount() == 1;
}
}, 1000, 30000);
assertEquals(1, cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount());
String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
final String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
assertNotNull(oldStorageId);
// Reformat/restart the DN.
assertTrue(wipeAndRestartDn(cluster, 0));
// Give the DN time to start up and register, which will cause the
// DatanodeManager to dissociate the old storage ID from the DN xfer addr.
String newStorageId = "";
do {
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
newStorageId = getRegisteredDatanodeUid(cluster, 1);
System.out.println("====> oldStorageId: " + oldStorageId +
" newStorageId: " + newStorageId);
} while (newStorageId.equals(oldStorageId));
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final String newStorageId = getRegisteredDatanodeUid(cluster, 1);
return newStorageId != null && !newStorageId.equals(oldStorageId);
}
}, 1000, 30000);
assertEquals(0, cluster.getNamesystem(1).getBlockManager()
.getPendingDataNodeMessageCount());
@ -121,8 +126,8 @@ public class TestPendingCorruptDnMessages {
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
.getBlockManager().getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.ALL);
assertEquals(1, registeredDatanodes.size());
return registeredDatanodes.get(0).getDatanodeUuid();
return registeredDatanodes.isEmpty() ? null :
registeredDatanodes.get(0).getDatanodeUuid();
}
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)

View File

@ -234,7 +234,7 @@ public class TestDFSAdmin {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(4, outs.size());
assertEquals(5, outs.size());
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
assertEquals(errs.size(), 0);

View File

@ -77,6 +77,8 @@ import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@ -1032,12 +1034,9 @@ public class JobHistoryEventHandler extends AbstractService
+ error.getErrorCode());
}
}
} catch (IOException ex) {
} catch (YarnException | IOException | ClientHandlerException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
} catch (YarnException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
+ "Server", ex);
}
}

View File

@ -24,14 +24,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class TaskAttemptKillEvent extends TaskAttemptEvent {
private final String message;
// Next map attempt will be rescheduled (i.e. updated in ask with higher
// priority equivalent to that of a fast fail map)
private final boolean rescheduleAttempt;
public TaskAttemptKillEvent(TaskAttemptId attemptID,
String message, boolean rescheduleAttempt) {
super(attemptID, TaskAttemptEventType.TA_KILL);
this.message = message;
this.rescheduleAttempt = rescheduleAttempt;
}
public TaskAttemptKillEvent(TaskAttemptId attemptID,
String message) {
super(attemptID, TaskAttemptEventType.TA_KILL);
this.message = message;
this(attemptID, message, false);
}
public String getMessage() {
return message;
}
public boolean getRescheduleAttempt() {
return rescheduleAttempt;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
/**
* Task Attempt killed event.
*/
public class TaskTAttemptKilledEvent extends TaskTAttemptEvent {
// Next map attempt will be rescheduled (i.e. updated in ask with
// higher priority equivalent to that of a fast fail map)
private final boolean rescheduleAttempt;
public TaskTAttemptKilledEvent(TaskAttemptId id, boolean rescheduleAttempt) {
super(id, TaskEventType.T_ATTEMPT_KILLED);
this.rescheduleAttempt = rescheduleAttempt;
}
public boolean getRescheduleAttempt() {
return rescheduleAttempt;
}
}

View File

@ -1349,7 +1349,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (TaskType.MAP == id.getTaskId().getTaskType()) {
// reschedule only map tasks because their outputs may be unusable
LOG.info(mesg + ". AttemptId:" + id);
eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
// Kill the attempt and indicate that next map attempt should be
// rescheduled (i.e. considered as a fast fail map).
eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true));
}
}
}

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
@ -184,6 +185,7 @@ public abstract class TaskAttemptImpl implements
private int httpPort;
private Locality locality;
private Avataar avataar;
private boolean rescheduleNextAttempt = false;
private static final CleanupContainerTransition
CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
@ -1377,6 +1379,16 @@ public abstract class TaskAttemptImpl implements
return container != null;
}
//always called in write lock
private boolean getRescheduleNextAttempt() {
return rescheduleNextAttempt;
}
//always called in write lock
private void setRescheduleNextAttempt(boolean reschedule) {
rescheduleNextAttempt = reschedule;
}
//always called in write lock
private void setFinishTime() {
//set the finish time only if launch time is set
@ -1745,9 +1757,8 @@ public abstract class TaskAttemptImpl implements
TaskEventType.T_ATTEMPT_FAILED));
break;
case KILLED:
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, false));
break;
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
@ -2014,8 +2025,13 @@ public abstract class TaskAttemptImpl implements
taskAttempt, TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
.getTaskId().getJobId(), tauce));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
boolean rescheduleNextTaskAttempt = false;
if (event instanceof TaskAttemptKillEvent) {
rescheduleNextTaskAttempt =
((TaskAttemptKillEvent)event).getRescheduleAttempt();
}
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, rescheduleNextTaskAttempt));
return TaskAttemptStateInternal.KILLED;
}
}
@ -2044,6 +2060,12 @@ public abstract class TaskAttemptImpl implements
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
} else {
// Store reschedule flag so that after clean up is completed, new
// attempt is scheduled/rescheduled based on it.
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
}
return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
}
}
@ -2075,9 +2097,8 @@ public abstract class TaskAttemptImpl implements
((TaskAttemptKillEvent) event).getMessage());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt()));
}
}
@ -2095,9 +2116,8 @@ public abstract class TaskAttemptImpl implements
taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, false));
}
}
@ -2137,6 +2157,12 @@ public abstract class TaskAttemptImpl implements
// for it.
finalizeProgress(taskAttempt);
sendContainerCleanup(taskAttempt, event);
// Store reschedule flag so that after clean up is completed, new
// attempt is scheduled/rescheduled based on it.
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
}
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -594,10 +595,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
addAndScheduleAttempt(avataar, false);
}
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
TaskAttempt attempt = addAttempt(avataar);
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
if (failedAttempts.size() > 0) {
if (failedAttempts.size() > 0 || reschedule) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
@ -968,7 +974,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
if (task.successfulAttempt == null) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
boolean rescheduleNewAttempt = false;
if (event instanceof TaskTAttemptKilledEvent) {
rescheduleNewAttempt =
((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
}
task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt);
}
if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) {
task.commitAttempt = null;
@ -1187,7 +1198,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// from the map splitInfo. So the bad node might be sent as a location
// to the RM. But the RM would ignore that just like it would ignore
// currently pending container requests affinitized to bad nodes.
task.addAndScheduleAttempt(Avataar.VIRGIN);
boolean rescheduleNextTaskAttempt = false;
if (event instanceof TaskTAttemptKilledEvent) {
// Decide whether to reschedule the next task attempt. If true, this
// typically indicates that a successful map attempt was killed because
// the node it ran on was reported unusable.
rescheduleNextTaskAttempt =
((TaskTAttemptKilledEvent)event).getRescheduleAttempt();
}
task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNextTaskAttempt);
return TaskStateInternal.SCHEDULED;
}
}
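
A plain-Java model of the decision introduced above may help: a replacement attempt is rescheduled (TA_RESCHEDULE) when the task already has failed attempts, or when the killed attempt explicitly asked for it, e.g. a successful map killed because its node became unusable. This is an illustrative sketch only, using no Hadoop types; the class, enum, and method names simply mirror the event names in the hunks and are not part of the patch.

// Illustrative model, not part of the patch.
final class RescheduleDecision {
  enum NextAttemptEvent { TA_SCHEDULE, TA_RESCHEDULE }

  // Mirrors the condition in addAndScheduleAttempt(Avataar, boolean).
  static NextAttemptEvent forNewAttempt(int failedAttempts,
      boolean rescheduleRequested) {
    return (failedAttempts > 0 || rescheduleRequested)
        ? NextAttemptEvent.TA_RESCHEDULE
        : NextAttemptEvent.TA_SCHEDULE;
  }

  public static void main(String[] args) {
    // A healthy map attempt killed on an unusable node asks for a reschedule.
    System.out.println(forNewAttempt(0, true));   // TA_RESCHEDULE
    // A kill without the hint keeps the normal scheduling path.
    System.out.println(forNewAttempt(0, false));  // TA_SCHEDULE
  }
}

The practical effect, per the allocator comment later in this diff, is that such replacement maps are requested from the RM at the higher fail-fast-map priority.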

View File

@ -925,9 +925,11 @@ public class RMContainerAllocator extends RMContainerRequestor
LOG.info("Killing taskAttempt:" + tid
+ " because it is running on unusable node:"
+ taskAttemptNodeId);
// If map, reschedule next task attempt.
boolean rescheduleNextAttempt = (i == 0);
eventHandler.handle(new TaskAttemptKillEvent(tid,
"TaskAttempt killed because it ran on unusable node"
+ taskAttemptNodeId));
+ taskAttemptNodeId, rescheduleNextAttempt));
}
}
}

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;
@ -56,13 +57,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests the state machine of MR App.
@ -201,13 +208,18 @@ public class TestMRApp {
@Test
public void testUpdatedNodes() throws Exception {
int runCount = 0;
Dispatcher disp = Mockito.spy(new AsyncDispatcher());
MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
true, ++runCount);
true, ++runCount, disp);
Configuration conf = new Configuration();
// after half of the map completion, reduce will start
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
// uberization forces full slowstart (1.0), so disable that
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
ContainerAllocEventHandler handler = new ContainerAllocEventHandler();
disp.register(ContainerAllocator.EventType.class, handler);
final Job job1 = app.submit(conf);
app.waitForState(job1, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size());
@ -285,6 +297,12 @@ public class TestMRApp {
events = job1.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Expecting 2 more completion events for killed", 4,
events.length);
// The 2 map task attempts which were killed above should be requested from
// the container allocator with the previous map task marked as failed. If
// this happens, the allocator will request the container for this mapper
// from the RM at a higher priority of 5 (i.e. with a priority equivalent
// to that of a fail-fast map).
handler.waitForFailedMapContainerReqEvents(2);
// all maps must be back to running
app.waitForState(mapTask1, TaskState.RUNNING);
@ -324,7 +342,7 @@ public class TestMRApp {
// rerun
// in rerun the 1st map will be recovered from previous run
app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false,
++runCount);
++runCount, (Dispatcher)new AsyncDispatcher());
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@ -420,6 +438,25 @@ public class TestMRApp {
app.waitForState(job2, JobState.SUCCEEDED);
}
private final class ContainerAllocEventHandler
implements EventHandler<ContainerAllocatorEvent> {
private AtomicInteger failedMapContainerReqEventCnt = new AtomicInteger(0);
@Override
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ &&
((ContainerRequestEvent)event).getEarlierAttemptFailed()) {
failedMapContainerReqEventCnt.incrementAndGet();
}
}
public void waitForFailedMapContainerReqEvents(int count)
throws InterruptedException {
while(failedMapContainerReqEventCnt.get() != count) {
Thread.sleep(50);
}
failedMapContainerReqEventCnt.set(0);
}
}
private static void waitFor(Supplier<Boolean> predicate, int
checkIntervalMillis, int checkTotalMillis) throws InterruptedException {
try {
@ -590,9 +627,17 @@ public class TestMRApp {
}
private final class MRAppWithHistory extends MRApp {
private Dispatcher dispatcher;
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
String testName, boolean cleanOnStart, int startCount,
Dispatcher disp) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
this.dispatcher = disp;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
@Override

View File

@ -78,9 +78,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
@ -983,6 +987,45 @@ public class TestTaskAttempt{
finishTime, Long.valueOf(taImpl.getFinishTime()));
}
private void containerKillBeforeAssignment(boolean scheduleAttempt)
throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
ApplicationId appId = ApplicationId.newInstance(1, 2);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1,
mock(TaskSplitMetaInfo.class), new JobConf(),
mock(TaskAttemptListener.class), mock(Token.class),
new Credentials(), SystemClock.getInstance(),
mock(AppContext.class));
if (scheduleAttempt) {
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_SCHEDULE));
}
taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.KILLED);
assertEquals("Task attempt's internal state is not KILLED",
taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
assertFalse("InternalError occurred", eventHandler.internalError);
TaskEvent event = eventHandler.lastTaskEvent;
assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
// In NEW state, new map attempt should not be rescheduled.
assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
}
@Test
public void testContainerKillOnNew() throws Exception {
containerKillBeforeAssignment(false);
}
@Test
public void testContainerKillOnUnassigned() throws Exception {
containerKillBeforeAssignment(true);
}
@Test
public void testContainerKillAfterAssigned() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
@ -1032,7 +1075,7 @@ public class TestTaskAttempt{
taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_KILL));
assertEquals("Task should be in KILLED state",
assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
taImpl.getInternalState());
}
@ -1089,7 +1132,7 @@ public class TestTaskAttempt{
TaskAttemptEventType.TA_KILL));
assertFalse("InternalError occurred trying to handle TA_KILL",
eventHandler.internalError);
assertEquals("Task should be in KILLED state",
assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
taImpl.getInternalState());
}
@ -1150,12 +1193,11 @@ public class TestTaskAttempt{
TaskAttemptEventType.TA_KILL));
assertFalse("InternalError occurred trying to handle TA_KILL",
eventHandler.internalError);
assertEquals("Task should be in KILLED state",
assertEquals("Task should be in KILL_CONTAINER_CLEANUP state",
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
taImpl.getInternalState());
}
@Test
public void testKillMapTaskWhileSuccessFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
@ -1195,6 +1237,37 @@ public class TestTaskAttempt{
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testKillMapTaskAfterSuccess() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
// Send a map task attempt kill event indicating that the next map attempt
// has to be rescheduled
taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.KILLED);
assertEquals("Task attempt's internal state is not KILLED",
taImpl.getInternalState(), TaskAttemptStateInternal.KILLED);
assertFalse("InternalError occurred", eventHandler.internalError);
TaskEvent event = eventHandler.lastTaskEvent;
assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType());
// An attempt killed event is sent to TaskImpl, forwarding the same
// reschedule flag received in the task attempt kill event.
assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt());
}
@Test
public void testKillMapTaskWhileFailFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
@ -1406,9 +1479,13 @@ public class TestTaskAttempt{
public static class MockEventHandler implements EventHandler {
public boolean internalError;
public TaskEvent lastTaskEvent;
@Override
public void handle(Event event) {
if (event instanceof TaskEvent) {
lastTaskEvent = (TaskEvent)event;
}
if (event instanceof JobEvent) {
JobEvent je = ((JobEvent) event);
if (JobEventType.INTERNAL_ERROR == je.getType()) {

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@ -93,6 +94,7 @@ public class TestTaskImpl {
private final int partition = 1;
private InlineDispatcher dispatcher;
private MockTaskAttemptEventHandler taskAttemptEventHandler;
private List<MockTaskAttemptImpl> taskAttempts;
private class MockTaskImpl extends TaskImpl {
@ -258,6 +260,9 @@ public class TestTaskImpl {
when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);
taskAttempts = new ArrayList<MockTaskAttemptImpl>();
taskAttemptEventHandler = new MockTaskAttemptEventHandler();
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
}
private MockTaskImpl createMockTask(TaskType taskType) {
@ -294,8 +299,12 @@ public class TestTaskImpl {
}
private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
mockTask.handle(new TaskTAttemptEvent(attemptId,
TaskEventType.T_ATTEMPT_KILLED));
killScheduledTaskAttempt(attemptId, false);
}
private void killScheduledTaskAttempt(TaskAttemptId attemptId,
boolean reschedule) {
mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
assertTaskScheduledState();
}
@ -326,8 +335,12 @@ public class TestTaskImpl {
}
private void killRunningTaskAttempt(TaskAttemptId attemptId) {
mockTask.handle(new TaskTAttemptEvent(attemptId,
TaskEventType.T_ATTEMPT_KILLED));
killRunningTaskAttempt(attemptId, false);
}
private void killRunningTaskAttempt(TaskAttemptId attemptId,
boolean reschedule) {
mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule));
assertTaskRunningState();
}
@ -426,7 +439,9 @@ public class TestTaskImpl {
mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
killScheduledTaskAttempt(getLastAttempt().getAttemptId());
killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true);
assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
taskAttemptEventHandler.lastTaskAttemptEvent.getType());
}
@Test
@ -453,7 +468,9 @@ public class TestTaskImpl {
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
killRunningTaskAttempt(getLastAttempt().getAttemptId());
killRunningTaskAttempt(getLastAttempt().getAttemptId(), true);
assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
taskAttemptEventHandler.lastTaskAttemptEvent.getType());
}
@Test
@ -471,6 +488,28 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
/**
* Kill map attempt for succeeded map task
* {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED}
*/
@Test
public void testKillAttemptForSuccessfulTask() {
LOG.info("--- START: testKillAttemptForSuccessfulTask ---");
mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
commitTaskAttempt(getLastAttempt().getAttemptId());
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
assertTaskSucceededState();
mockTask.handle(
new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true));
assertEquals(TaskAttemptEventType.TA_RESCHEDULE,
taskAttemptEventHandler.lastTaskAttemptEvent.getType());
assertTaskScheduledState();
}
@Test
public void testTaskProgress() {
LOG.info("--- START: testTaskProgress ---");
@ -728,8 +767,8 @@ public class TestTaskImpl {
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt = taskAttempts.get(3);
taskAttempt.setState(TaskAttemptState.KILLED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_KILLED));
mockTask.handle(new TaskTAttemptKilledEvent(taskAttempt.getAttemptId(),
false));
assertEquals(TaskState.FAILED, mockTask.getState());
}
@ -840,4 +879,14 @@ public class TestTaskImpl {
Counters taskCounters = mockTask.getCounters();
assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
}
public static class MockTaskAttemptEventHandler implements EventHandler {
public TaskAttemptEvent lastTaskAttemptEvent;
@Override
public void handle(Event event) {
if (event instanceof TaskAttemptEvent) {
lastTaskAttemptEvent = (TaskAttemptEvent)event;
}
}
};
}

View File

@ -142,6 +142,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setFinishTime(jobInfo.getFinishTime());
report.setJobName(jobInfo.getJobname());
report.setUser(jobInfo.getUsername());
report.setDiagnostics(jobInfo.getErrorInfo());
if ( getTotalMaps() == 0 ) {
report.setMapProgress(1.0f);
@ -335,6 +336,12 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
}
}
protected JobHistoryParser createJobHistoryParser(Path historyFileAbsolute)
throws IOException {
return new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
}
// History data is lazily loaded when task-level data is requested
protected synchronized void loadFullHistoryData(boolean loadTasks,
Path historyFileAbsolute) throws IOException {
@ -347,7 +354,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
JobHistoryParser parser = null;
try {
final FileSystem fs = historyFileAbsolute.getFileSystem(conf);
parser = new JobHistoryParser(fs, historyFileAbsolute);
parser = createJobHistoryParser(historyFileAbsolute);
final Path jobConfPath = new Path(historyFileAbsolute.getParent(),
JobHistoryUtils.getIntermediateConfFileName(jobId));
final Configuration conf = new Configuration();

View File

@ -19,14 +19,18 @@ package org.apache.hadoop.mapreduce.v2.hs;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -236,4 +240,27 @@ public class TestJobHistoryEntities {
}
@Test (timeout=30000)
public void testCompletedJobWithDiagnostics() throws Exception {
final String jobError = "Job Diagnostics";
JobInfo jobInfo = spy(new JobInfo());
when(jobInfo.getErrorInfo()).thenReturn(jobError);
when(jobInfo.getJobStatus()).thenReturn(JobState.FAILED.toString());
when(jobInfo.getAMInfos()).thenReturn(Collections.<JobHistoryParser.AMInfo>emptyList());
final JobHistoryParser mockParser = mock(JobHistoryParser.class);
when(mockParser.parse()).thenReturn(jobInfo);
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
when(info.getHistoryFile()).thenReturn(fullHistoryPath);
CompletedJob job =
new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
info, jobAclsManager) {
@Override
protected JobHistoryParser createJobHistoryParser(
Path historyFileAbsolute) throws IOException {
return mockParser;
}
};
assertEquals(jobError, job.getReport().getDiagnostics());
}
}
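
The CompletedJob change above introduces a protected createJobHistoryParser factory that the test overrides to return a mocked parser. A minimal, generic sketch of that seam follows; every name in it is hypothetical and only illustrates the pattern, it is not the Hadoop API.

// Hypothetical illustration of the "protected factory method" test seam.
import java.util.function.Supplier;

class ReportLoader {
  // The seam: production code goes through this overridable factory.
  protected Supplier<String> createParser() {
    return () -> "parsed-from-real-file";
  }

  String load() {
    return createParser().get();
  }
}

class ReportLoaderWithStub extends ReportLoader {
  @Override
  protected Supplier<String> createParser() {
    return () -> "canned-test-data";   // injected without touching real files
  }

  public static void main(String[] args) {
    System.out.println(new ReportLoaderWithStub().load());  // canned-test-data
  }
}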

View File

@ -133,6 +133,7 @@
<item name="Using CGroups" href="hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html"/>
<item name="Secure Containers" href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
<item name="Registry" href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>
<item name="Reservation System" href="hadoop-yarn/hadoop-yarn-site/ReservationSystem.html"/>
</menu>
<menu name="YARN REST APIs" inherit="top">

View File

@ -35,6 +35,10 @@ public class Constants {
//use a custom endpoint?
public static final String ENDPOINT = "fs.s3a.endpoint";
//Enable path style access? Overrides default virtual hosting
public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
//connect to s3 through a proxy server?
public static final String PROXY_HOST = "fs.s3a.proxy.host";
public static final String PROXY_PORT = "fs.s3a.proxy.port";

View File

@ -38,6 +38,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@ -244,6 +245,15 @@ public class S3AFileSystem extends FileSystem {
throw new IllegalArgumentException(msg, e);
}
}
enablePathStyleAccessIfRequired(conf);
}
private void enablePathStyleAccessIfRequired(Configuration conf) {
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
if (pathStyleAccess) {
LOG.debug("Enabling path style access!");
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
}
}
private void initTransferManager() {

View File

@ -224,6 +224,13 @@ this capability.
</description>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
</description>
</property>
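
For reference, a minimal client-side sketch of turning the new switch on is shown below. The property name comes from this patch; the bucket URI is a placeholder, and running it assumes hadoop-aws on the classpath plus valid S3 credentials.

// Hypothetical usage sketch, not part of the patch.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathStyleAccessExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Request path-style URLs (endpoint/bucket/key) instead of the default
    // virtual-host style (bucket.endpoint/key).
    conf.setBoolean("fs.s3a.path.style.access", true);
    // "s3a://example-bucket/" is a placeholder bucket.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println(fs.exists(new Path("/")));
  }
}
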
<property>
<name>fs.s3a.proxy.host</name>
<description>Hostname of the (optional) proxy server for S3 connections.</description>

View File

@ -19,10 +19,14 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.commons.lang.StringUtils;
import com.amazonaws.AmazonClientException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -30,17 +34,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.http.HttpStatus;
import org.junit.rules.TemporaryFolder;
public class TestS3AConfiguration {
@ -354,4 +360,39 @@ public class TestS3AConfiguration {
assertEquals("SecretKey incorrect.", "456", creds.getAccessSecret());
}
@Test
public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() throws Exception {
conf = new Configuration();
conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
try {
fs = S3ATestUtils.createTestFileSystem(conf);
final Object object = getClientOptionsField(fs.getAmazonS3Client(), "clientOptions");
assertNotNull(object);
assertTrue("Unexpected type found for clientOptions!", object instanceof S3ClientOptions);
assertTrue("Expected to find path style access to be switched on!", ((S3ClientOptions) object).isPathStyleAccess());
byte[] file = ContractTestUtils.toAsciiByteArray("test file");
ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
} catch (final AmazonS3Exception e) {
LOG.error("Caught exception: ", e);
// Catch/pass standard path style access behaviour when live bucket
// isn't in the same region as the s3 client default. See
// http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
assertEquals(e.getStatusCode(), HttpStatus.SC_MOVED_PERMANENTLY);
}
}
private Object getClientOptionsField(AmazonS3Client s3client, String field)
throws NoSuchFieldException, IllegalAccessException {
final Field clientOptionsProps = s3client.getClass().getDeclaredField(field);
assertNotNull(clientOptionsProps);
if (!clientOptionsProps.isAccessible()) {
clientOptionsProps.setAccessible(true);
}
final Object object = clientOptionsProps.get(s3client);
return object;
}
}

View File

@ -191,7 +191,7 @@ public class SimpleCopyListing extends CopyListing {
authority = fs.getUri().getAuthority();
}
return new Path(scheme, authority, path.toUri().getPath());
return new Path(scheme, authority, makeQualified(path).toUri().getPath());
}
/**

View File

@ -674,4 +674,42 @@ public class TestDistCpSync {
testAndVerify(numCreatedModified);
}
private void initData9(Path dir) throws Exception {
final Path foo = new Path(dir, "foo");
final Path foo_f1 = new Path(foo, "f1");
DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
}
private void changeData9(Path dir) throws Exception {
final Path foo = new Path(dir, "foo");
final Path foo_f2 = new Path(foo, "f2");
DFSTestUtil.createFile(dfs, foo_f2, BLOCK_SIZE, DATA_NUM, 0L);
}
/**
* Test a case where the source path is relative.
*/
@Test
public void testSync9() throws Exception {
// use /user/$USER/source for source directory
Path sourcePath = new Path(dfs.getWorkingDirectory(), "source");
initData9(sourcePath);
initData9(target);
dfs.allowSnapshot(sourcePath);
dfs.allowSnapshot(target);
dfs.createSnapshot(sourcePath, "s1");
dfs.createSnapshot(target, "s1");
changeData9(sourcePath);
dfs.createSnapshot(sourcePath, "s2");
String[] args = new String[]{"-update","-diff", "s1", "s2",
"source", target.toString()};
new DistCp(conf, OptionsParser.parse(args)).execute();
verifyCopy(dfs.getFileStatus(sourcePath),
dfs.getFileStatus(target), false);
}
}

View File

@ -131,6 +131,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
/**
* An ApplicationMaster for executing shell commands on a set of launched
@ -1149,13 +1150,14 @@ public class ApplicationMaster {
putContainerEntity(timelineClient,
container.getId().getApplicationAttemptId(),
entity));
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(), e);
}
}
private void publishContainerEndEvent(
@VisibleForTesting
void publishContainerEndEvent(
final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity();
@ -1177,7 +1179,7 @@ public class ApplicationMaster {
putContainerEntity(timelineClient,
container.getContainerId().getApplicationAttemptId(),
entity));
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(), e);
}
@ -1212,7 +1214,7 @@ public class ApplicationMaster {
try {
TimelinePutResponse response = timelineClient.putEntities(entity);
processTimelineResponseErrors(response);
} catch (YarnException | IOException e) {
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -27,6 +31,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
@ -46,14 +51,24 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -61,6 +76,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
@ -69,6 +85,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import com.sun.jersey.api.client.ClientHandlerException;
public class TestDistributedShell {
private static final Log LOG =
@ -77,6 +95,7 @@ public class TestDistributedShell {
protected MiniYARNCluster yarnCluster = null;
protected MiniDFSCluster hdfsCluster = null;
private FileSystem fs = null;
private TimelineWriter spyTimelineWriter;
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
@ -865,6 +884,37 @@ public class TestDistributedShell {
}
}
@Test
public void testDSTimelineClientWithConnectionRefuse() throws Exception {
ApplicationMaster am = new ApplicationMaster();
TimelineClientImpl client = new TimelineClientImpl() {
@Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
URI resURI) throws IOException {
TimelineWriter timelineWriter =
new DirectTimelineWriter(authUgi, client, resURI);
spyTimelineWriter = spy(timelineWriter);
return spyTimelineWriter;
}
};
client.init(conf);
client.start();
TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null,
false, true);
try {
UserGroupInformation ugi = mock(UserGroupInformation.class);
when(ugi.getShortUserName()).thenReturn("user1");
// verify no ClientHandlerException gets thrown out.
am.publishContainerEndEvent(client, ContainerStatus.newInstance(
BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
1), "domainId", ugi);
} finally {
client.stop();
}
}
protected void waitForNMsToRegister() throws Exception {
int sec = 60;
while (sec >= 0) {

View File

@ -24,6 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
@ -116,7 +117,9 @@ public class TimelineClientImpl extends TimelineClient {
TimelineClientConnectionRetry connectionRetry;
// Abstract class for an operation that should be retried by timeline client
private static abstract class TimelineClientRetryOp {
@Private
@VisibleForTesting
public static abstract class TimelineClientRetryOp {
// The operation that should be retried
public abstract Object run() throws IOException;
// The method to indicate if we should retry given the incoming exception
@ -449,27 +452,8 @@ public class TimelineClientImpl extends TimelineClient {
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {
// Set up the retry operation
TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
@Override
public Object run() throws IOException {
// Try pass the request, if fail, keep retrying
authUgi.checkTGTAndReloginFromKeytab();
try {
return authUgi.doAs(action);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean shouldRetryOn(Exception e) {
// Only retry on connection exceptions
return (e instanceof ConnectException);
}
};
TimelineClientRetryOp tokenRetryOp =
createTimelineClientRetryOpForOperateDelegationToken(action);
return connectionRetry.retryOn(tokenRetryOp);
}
@ -680,4 +664,50 @@ public class TimelineClientImpl extends TimelineClient {
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
@Private
@VisibleForTesting
public TimelineClientRetryOp
createTimelineClientRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action) throws IOException {
return new TimelineClientRetryOpForOperateDelegationToken(
this.authUgi, action);
}
@Private
@VisibleForTesting
public class TimelineClientRetryOpForOperateDelegationToken
extends TimelineClientRetryOp {
private final UserGroupInformation authUgi;
private final PrivilegedExceptionAction<?> action;
public TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
this.authUgi = authUgi;
this.action = action;
}
@Override
public Object run() throws IOException {
// Try to pass the request; if it fails, keep retrying
authUgi.checkTGTAndReloginFromKeytab();
try {
return authUgi.doAs(action);
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public boolean shouldRetryOn(Exception e) {
// retry on connection exceptions
// and SocketTimeoutException
return (e instanceof ConnectException
|| e instanceof SocketTimeoutException);
}
}
}
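
The retry condition above now also covers SocketTimeoutException. A generic, dependency-free sketch of the same retry contract is given below; it is illustrative only and does not use the Hadoop classes from this patch.

// Generic sketch: retry only while the failure is a transient network error.
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

final class RetryOnTransientNetworkErrors {
  static <T> T call(Callable<T> op, int maxRetries, long sleepMillis)
      throws Exception {
    for (int attempt = 0; ; attempt++) {
      try {
        return op.call();
      } catch (ConnectException | SocketTimeoutException e) {
        if (attempt >= maxRetries) {
          throw e;   // retries exhausted, surface the last failure
        }
        Thread.sleep(sleepMillis);   // back off before the next try
      }
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(call(() -> "ok", 3, 100L));
  }
}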

View File

@ -27,7 +27,9 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -234,6 +236,8 @@ public class TestTimelineClient {
UserGroupInformation.setConfiguration(conf);
TimelineClientImpl client = createTimelineClient(conf);
TimelineClientImpl clientFake =
createTimelineClientFakeTimelineClientRetryOp(conf);
TestTimlineDelegationTokenSecretManager dtManager =
new TestTimlineDelegationTokenSecretManager();
try {
@ -278,8 +282,24 @@ public class TestTimelineClient {
} catch (RuntimeException ce) {
assertException(client, ce);
}
// Test DelegationTokenOperationsRetry on SocketTimeoutException
try {
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier(
new Text("tester"), new Text("tester"), new Text("tester"));
clientFake.cancelDelegationToken(
new Token<TimelineDelegationTokenIdentifier>(timelineDT.getBytes(),
dtManager.createPassword(timelineDT),
timelineDT.getKind(),
new Text("0.0.0.0:8188")));
assertFail();
} catch (RuntimeException ce) {
assertException(clientFake, ce);
}
} finally {
client.stop();
clientFake.stop();
dtManager.stopThreads();
}
}
@ -298,7 +318,7 @@ public class TestTimelineClient {
client.connectionRetry.getRetired());
}
private static ClientResponse mockEntityClientResponse(
public static ClientResponse mockEntityClientResponse(
TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);
@ -393,6 +413,27 @@ public class TestTimelineClient {
return client;
}
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
YarnConfiguration conf) {
TimelineClientImpl client = new TimelineClientImpl() {
@Override
public TimelineClientRetryOp
createTimelineClientRetryOpForOperateDelegationToken(
final PrivilegedExceptionAction<?> action) throws IOException {
TimelineClientRetryOpForOperateDelegationToken op =
spy(new TimelineClientRetryOpForOperateDelegationToken(
UserGroupInformation.getCurrentUser(), action));
doThrow(new SocketTimeoutException("Test socketTimeoutException"))
.when(op).run();
return op;
}
};
client.init(conf);
client.start();
return client;
}
private static class TestTimlineDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {

View File

@ -19,9 +19,10 @@
package org.apache.hadoop.yarn.webapp;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.net.ServerSocketUtil;
import org.junit.Before;
import com.sun.jersey.test.framework.JerseyTest;
import com.sun.jersey.test.framework.WebAppDescriptor;
@ -30,9 +31,16 @@ public abstract class JerseyTestBase extends JerseyTest {
super(appDescriptor);
}
@Before
public void initializeJerseyPort() throws IOException {
int jerseyPort = ServerSocketUtil.getPort(9998, 10);
System.setProperty("jersey.test.port", Integer.toString(jerseyPort));
@Override
protected int getPort(int port) {
Random rand = new Random();
int jerseyPort = port + rand.nextInt(1000);
try {
jerseyPort = ServerSocketUtil.getPort(jerseyPort, 10);
} catch (IOException e) {
// Ignore the exception; even after 10 attempts a free port
// may not have been found.
}
return super.getPort(jerseyPort);
}
}
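
The override above asks ServerSocketUtil for a free port near a randomized base and falls back to the randomly chosen candidate if none is found. A plain-JDK sketch of the same probing idea, without the Hadoop helper, is shown below for illustration; the class and method names are hypothetical.

// Illustrative only: probe for a free port near a preferred one.
import java.io.IOException;
import java.net.ServerSocket;

final class FreePortProbe {
  static int findFreePort(int preferred, int attempts) {
    for (int port = preferred; port < preferred + attempts; port++) {
      try (ServerSocket socket = new ServerSocket(port)) {
        return socket.getLocalPort();   // free; the socket is closed right away
      } catch (IOException inUse) {
        // port taken, try the next one
      }
    }
    throw new IllegalStateException("No free port found near " + preferred);
  }

  public static void main(String[] args) {
    System.out.println(findFreePort(9998, 10));
  }
}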

View File

@ -296,20 +296,8 @@ public class ContainerManagerImpl extends CompositeService implements
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering container with state: " + rcs);
}
recoverContainer(rcs);
}
String diagnostic = "Application marked finished during recovery";
for (ApplicationId appId : appsState.getFinishedApplications()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Application marked finished during recovery: " + appId);
}
dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appId, diagnostic));
}
} else {
LOG.info("Not a recoverable state store. Nothing to recover.");
}
@ -1332,11 +1320,6 @@ public class ContainerManagerImpl extends CompositeService implements
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
try {
this.context.getNMStateStore().storeFinishedApplication(appID);
} catch (IOException e) {
LOG.error("Unable to update application state in store", e);
}
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));

View File

@ -84,6 +84,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String APPLICATIONS_KEY_PREFIX =
"ContainerManager/applications/";
@Deprecated
private static final String FINISHED_APPS_KEY_PREFIX =
"ContainerManager/finishedApps/";
@ -392,20 +393,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
state.applications.add(
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
}
state.finishedApplications = new ArrayList<ApplicationId>();
keyPrefix = FINISHED_APPS_KEY_PREFIX;
iter.seek(bytes(keyPrefix));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
ApplicationId appId =
ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
state.finishedApplications.add(appId);
}
} catch (DBException e) {
throw new IOException(e);
} finally {
@ -414,6 +401,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
cleanupDeprecatedFinishedApps();
return state;
}
@ -433,21 +422,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
@Override
public void storeFinishedApplication(ApplicationId appId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("storeFinishedApplication.appId: " + appId);
}
String key = FINISHED_APPS_KEY_PREFIX + appId;
try {
db.put(bytes(key), new byte[0]);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeApplication(ApplicationId appId)
throws IOException {
@ -460,8 +434,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
String key = APPLICATIONS_KEY_PREFIX + appId;
batch.delete(bytes(key));
key = FINISHED_APPS_KEY_PREFIX + appId;
batch.delete(bytes(key));
db.write(batch);
} finally {
batch.close();
@ -979,6 +951,52 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
@SuppressWarnings("deprecation")
private void cleanupDeprecatedFinishedApps() {
try {
cleanupKeysWithPrefix(FINISHED_APPS_KEY_PREFIX);
} catch (Exception e) {
LOG.warn("cleanup keys with prefix " + FINISHED_APPS_KEY_PREFIX +
" from leveldb failed", e);
}
}
private void cleanupKeysWithPrefix(String prefix) throws IOException {
WriteBatch batch = null;
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
try {
batch = db.createWriteBatch();
iter.seek(bytes(prefix));
while (iter.hasNext()) {
byte[] key = iter.next().getKey();
String keyStr = asString(key);
if (!keyStr.startsWith(prefix)) {
break;
}
batch.delete(key);
if (LOG.isDebugEnabled()) {
LOG.debug("cleanup " + keyStr + " from leveldb");
}
}
db.write(batch);
} catch (DBException e) {
throw new IOException(e);
} finally {
if (batch != null) {
batch.close();
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
}
private String getLogDeleterKey(ApplicationId appId) {
return LOG_DELETER_KEY_PREFIX + appId;
}

View File

@ -58,10 +58,6 @@ public class NMNullStateStoreService extends NMStateStoreService {
ContainerManagerApplicationProto p) throws IOException {
}
@Override
public void storeFinishedApplication(ApplicationId appId) {
}
@Override
public void removeApplication(ApplicationId appId) throws IOException {
}

View File

@ -52,15 +52,11 @@ public abstract class NMStateStoreService extends AbstractService {
public static class RecoveredApplicationsState {
List<ContainerManagerApplicationProto> applications;
List<ApplicationId> finishedApplications;
public List<ContainerManagerApplicationProto> getApplications() {
return applications;
}
public List<ApplicationId> getFinishedApplications() {
return finishedApplications;
}
}
public enum RecoveredContainerStatus {
@ -258,14 +254,6 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void storeApplication(ApplicationId appId,
ContainerManagerApplicationProto p) throws IOException;
/**
* Record that an application has finished
* @param appId the application ID
* @throws IOException
*/
public abstract void storeFinishedApplication(ApplicationId appId)
throws IOException;
/**
* Remove records corresponding to an application
* @param appId the application ID

View File

@ -259,6 +259,10 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
// The FINISH_APP event is no longer saved in the NM state store,
// so simulate recovery by resending the FINISH_APP event.
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
assertTrue(context.getApplicationACLsManager().checkAccess(
UserGroupInformation.createRemoteUser(modUser),

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
private Set<ApplicationId> finishedApps;
private Map<ContainerId, RecoveredContainerState> containerStates;
private Map<TrackerKey, TrackerState> trackerStates;
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
@ -59,7 +58,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
protected void initStorage(Configuration conf) {
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
finishedApps = new HashSet<ApplicationId>();
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
nmTokenState = new RecoveredNMTokensState();
nmTokenState.applicationMasterKeys =
@ -86,7 +84,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
RecoveredApplicationsState state = new RecoveredApplicationsState();
state.applications = new ArrayList<ContainerManagerApplicationProto>(
apps.values());
state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
return state;
}
@ -98,16 +95,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
apps.put(appId, protoCopy);
}
@Override
public synchronized void storeFinishedApplication(ApplicationId appId) {
finishedApps.add(appId);
}
@Override
public synchronized void removeApplication(ApplicationId appId)
throws IOException {
apps.remove(appId);
finishedApps.remove(appId);
}
@Override
@ -393,7 +384,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
logDeleterState.remove(appId);
}
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();

View File

@ -174,7 +174,6 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredApplicationsState state = stateStore.loadApplicationsState();
assertTrue(state.getApplications().isEmpty());
assertTrue(state.getFinishedApplications().isEmpty());
// store an application and verify recovered
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@ -188,10 +187,8 @@ public class TestNMLeveldbStateStoreService {
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
assertTrue(state.getFinishedApplications().isEmpty());
// finish an application and add a new one
stateStore.storeFinishedApplication(appId1);
// add a new app
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
builder = ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId2).getProto());
@ -203,18 +200,13 @@ public class TestNMLeveldbStateStoreService {
assertEquals(2, state.getApplications().size());
assertTrue(state.getApplications().contains(appProto1));
assertTrue(state.getApplications().contains(appProto2));
assertEquals(1, state.getFinishedApplications().size());
assertEquals(appId1, state.getFinishedApplications().get(0));
// test removing an application
stateStore.storeFinishedApplication(appId2);
stateStore.removeApplication(appId2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
assertEquals(1, state.getFinishedApplications().size());
assertEquals(appId1, state.getFinishedApplications().get(0));
}
@Test

View File

@ -163,7 +163,7 @@ public class NodesListManager extends CompositeService implements
private void setDecomissionedNMs() {
Set<String> excludeList = hostsReader.getExcludedHosts();
for (final String host : excludeList) {
UnknownNodeId nodeId = new UnknownNodeId(host);
NodeId nodeId = createUnknownNodeId(host);
RMNodeImpl rmNode = new RMNodeImpl(nodeId,
rmContext, host, -1, -1, new UnknownNode(host), null, null);
rmContext.getInactiveRMNodes().put(nodeId, rmNode);
@ -430,38 +430,8 @@ public class NodesListManager extends CompositeService implements
* A NodeId instance needed upon startup for populating the inactive nodes map.
* It only knows the hostname/IP and marks the port as -1 (invalid).
*/
public static class UnknownNodeId extends NodeId {
private String host;
public UnknownNodeId(String host) {
this.host = host;
}
@Override
public String getHost() {
return this.host;
}
@Override
protected void setHost(String hst) {
}
@Override
public int getPort() {
return -1;
}
@Override
protected void setPort(int port) {
}
@Override
protected void build() {
}
public static NodeId createUnknownNodeId(String host) {
return NodeId.newInstance(host, -1);
}
/**

View File

@ -786,8 +786,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
if (previousRMNode != null) {
rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
} else {
NodesListManager.UnknownNodeId unknownNodeId =
new NodesListManager.UnknownNodeId(nodeId.getHost());
NodeId unknownNodeId =
NodesListManager.createUnknownNodeId(nodeId.getHost());
previousRMNode =
rmNode.context.getInactiveRMNodes().remove(unknownNodeId);
if (previousRMNode != null) {

View File

@ -1348,13 +1348,6 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
if (removed) {
// track reserved resource for metrics, for normal container
// getReservedResource will be null.
Resource reservedRes = rmContainer.getReservedResource();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
decReservedResource(node.getPartition(), reservedRes);
}
// Inform the ordering policy
orderingPolicy.containerReleased(application, rmContainer);

View File

@ -246,6 +246,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update reserved metrics
queue.getMetrics().unreserveResource(getUser(),
rmContainer.getReservedResource());
queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource());
return true;
}
return false;

View File

@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -1566,4 +1568,49 @@ public class TestClientRMService {
Assert.assertEquals("Incorrect priority has been returned", expected,
updateApplicationPriority.getApplicationPriority().getPriority());
}
private void createExcludeFile(String filename) throws IOException {
File file = new File(filename);
if (file.exists()) {
file.delete();
}
FileOutputStream out = new FileOutputStream(file);
out.write("decommisssionedHost".getBytes());
out.close();
}
@Test
public void testRMStartWithDecommissionedNode() throws Exception {
String excludeFile = "excludeFile";
createExcludeFile(excludeFile);
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeFile);
MockRM rm = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
rm.start();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client =
(ApplicationClientProtocol) rpc
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call
GetClusterNodesRequest request =
GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
rm.stop();
rpc.stopProxy(client, conf);
new File(excludeFile).delete();
}
}

View File

@ -690,8 +690,8 @@ public class TestRMNodeTransitions {
@Test
public void testUnknownNodeId() {
NodesListManager.UnknownNodeId nodeId =
new NodesListManager.UnknownNodeId("host1");
NodeId nodeId =
NodesListManager.createUnknownNodeId("host1");
RMNodeImpl node =
new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node);

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -37,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -50,8 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -418,4 +426,182 @@ public class TestContainerAllocation {
rm1.close();
}
@Test(timeout = 60000)
public void testAllocationForReservedContainer() throws Exception {
/**
* Test case: Submit two applications (app1/app2) to a queue. There is one
* node with 8G of resource in the cluster. App1 allocates a 4G container,
* then app2 asks for a 4G container. App2's request will be reserved on the
* node.
*
* Before the next node heartbeat, app1's container is completed/killed, so
* the container which was reserved for app2 will be allocated.
*/
// inject node label manager
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// At this point node1 is fully committed: app1 holds a 4G container on it
// and app2 has a 4G container reserved on it.
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// App1 should now hold its AM plus the new 4G container; app2 only its AM,
// plus a 4G reservation
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
// Mark one app1 container as killed/completed and re-kick RM
for (RMContainer container : schedulerApp1.getLiveContainers()) {
if (container.isAMContainer()) {
continue;
}
cs.markContainerForKillable(container);
}
// Cancel asks of app1 and re-kick RM
am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check 4G container cancelled for app1, and one container allocated for
// app2
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
rm1.close();
}
@Test(timeout = 60000)
public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
/**
 * Test case: Submit two applications (app1/app2) to a queue, and there's one
 * node with 8G resource in the cluster. App1 allocates a 4G container, then
 * app2 asks for a 4G container. App2's request will be reserved on the
 * node.
 *
 * The node is then removed (decommissioned); both the used and reserved
 * resources of the queue should drop back to zero.
 */
// inject node label manager
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// At this point node1 is fully committed: app1 holds a 4G container on it
// and app2 has a 4G container reserved on it.
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// App1 should now hold its AM plus the new 4G container; app2 only its AM,
// plus a 4G reservation
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
// NM1 has available resource = 2G (8G - 2 * 1G - 4G)
Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
.getUnallocatedResource().getMemory());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
// Remove the node
cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
// Check all container cancelled for app1 and app2
Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
// Usage and Reserved capacity of queue is 0
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemory());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemory());
Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemory());
rm1.close();
}
}

View File

@ -0,0 +1,65 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
Reservation System
==================
* [Purpose](#Purpose)
* [Overview](#Overview)
* [Flow of a Reservation](#Flow_of_a_Reservation)
* [Configuring the Reservation System](#Configuring_the_Reservation_System)
Purpose
-------
This document provides a brief overview of the `YARN ReservationSystem`.
Overview
--------
The `ReservationSystem` of YARN provides the user the ability to reserve resources over (and ahead of) time, to ensure that important production jobs will be run very predictably. The ReservationSystem performs careful admission control and provides guarantees over absolute amounts of resources (instead of % of cluster size). Reservations can either be malleable or have gang semantics, and can have time-varying resource requirements. The ReservationSystem is a component of the YARN ResourceManager.
Flow of a Reservation
----------------------
![YARN Reservation System | width=600px](./images/yarn_reservation_system.png)
With reference to the figure above, a typical reservation proceeds as follows:
* **Step 1**  The user (or an automated tool on its behalf) submits a reservation request specified by the Reservation Definition Language (RDL). This describes the user's need for resources over time (e.g., a skyline of resources) and temporal constraints (e.g., deadline). This can be done either programmatically through the usual Client-to-RM protocols or via the REST API of the RM (a programmatic sketch follows this list).
* **Step 2**  The ReservationSystem leverages a ReservationAgent (GREE in the figure) to find a plausible allocation for the reservation in the Plan, a data structure tracking all reservations currently accepted and the available resources in the system.
* **Step 3**  The SharingPolicy provides a way to enforce invariants on the reservation being accepted, potentially rejecting reservations. For example, the CapacityOvertimePolicy allows enforcement of both the instantaneous max-capacity a user can request across all of his/her reservations and a limit on the integral of resources over a period of time, e.g., the user can reserve up to 50% of the cluster capacity instantaneously, but cannot exceed a 10% average over any 24h period.
* **Step 4** Upon a successful validation the ReservationSystem returns to the user a ReservationId (think of it as an airline ticket).
* **Step 5** When the time comes, a new component called the PlanFollower publishes the state of the plan to the scheduler, by dynamically creating/tweaking/destroying queues.
* **Step 6** The user can then submit one (or more) jobs to the reservable queue, by simply including the ReservationId as part of the ApplicationSubmissionContext.
* **Step 7**  The Scheduler will then provide containers from a special queue created to ensure the resource reservation is respected. Within the limits of the reservation, the user has guaranteed access to the resources; beyond that, resource sharing proceeds with standard Capacity/Fairness sharing.
* **Step 8**  The system includes mechanisms to adapt to drops in cluster capacity. This consists of replanning by "moving" the reservation if possible, or rejecting the smallest amount of previously accepted reservations (to ensure that other reservations will receive their full amount).
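The same flow can be driven from code. The sketch below is a minimal illustration only: it assumes the `YarnClient` reservation API as shipped in the 2.6/2.7 line (`submitReservation`, `ReservationSubmissionRequest`, and the related record factories), and the `dedicated` queue name, times, and container sizes are made up; exact signatures differ in later releases.

```java
import java.util.Arrays;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ReservationFlowSketch {
  public static void main(String[] args) throws Exception {
    YarnClient client = YarnClient.createYarnClient();
    client.init(new YarnConfiguration());
    client.start();

    // Step 1: describe the need for resources over time (the RDL).
    long now = System.currentTimeMillis();
    ReservationRequest stage = ReservationRequest.newInstance(
        Resource.newInstance(1024, 1), // size of each container
        220,                           // num-containers
        220,                           // min-concurrency (gang semantics)
        60 * 1000L);                   // duration in milliseconds
    ReservationRequests requests = ReservationRequests.newInstance(
        Arrays.asList(stage), ReservationRequestInterpreter.R_ANY);
    ReservationDefinition definition = ReservationDefinition.newInstance(
        now + 10 * 60 * 1000L,  // arrival: earliest start
        now + 60 * 60 * 1000L,  // deadline: latest completion
        requests, "res_1");

    // Steps 2-4: submit against a reservable queue; in this API version the
    // RM hands back the ReservationId in the response.
    ReservationId reservationId = client.submitReservation(
        ReservationSubmissionRequest.newInstance(definition, "dedicated"))
        .getReservationId();
    System.out.println("Accepted: " + reservationId);

    // Step 6: attaching the id to a job submission, e.g.
    // applicationSubmissionContext.setReservationID(reservationId),
    // makes that job run inside the reserved resources.
    client.stop();
  }
}
```

A job submitted with that `ReservationId` set on its `ApplicationSubmissionContext` (Step 6) then runs inside the reserved resources.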
Configuring the Reservation System
----------------------------------
Configuring the `ReservationSystem` is simple. Currently we have added support for *reservations* in both `CapacityScheduler` and `FairScheduler`. You can mark any **leaf queue** in the **capacity-scheduler.xml** or **fair-scheduler.xml** as available for "reservations" (see [CapacityScheduler](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html#Configuring_ReservationSystem_with_CapacityScheduler) and the [FairScheduler](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html) for details). Then the capacity/fair share within that queue can be used for making reservations. Jobs can still be submitted to the *reservable queue* without a reservation, in which case they will be run in best-effort mode in whatever capacity is left over by the jobs running within active reservations.

View File

@ -34,6 +34,9 @@ ResourceManager REST API's.
* [Cluster Application Priority API](#Cluster_Application_Priority_API)
* [Cluster Delegation Tokens API](#Cluster_Delegation_Tokens_API)
* [Cluster Reservation API List](#Cluster_Reservation_API_List)
* [Cluster Reservation API Submit](#Cluster_Reservation_API_Submit)
* [Cluster Reservation API Update](#Cluster_Reservation_API_Update)
* [Cluster Reservation API Delete](#Cluster_Reservation_API_Delete)
Overview
--------
@ -3223,8 +3226,8 @@ The Cluster Reservation API can be used to list reservations. When listing reser
| Item | Data Type | Description |
|:---- |:---- |:---- |
| arrival | long | The UTC time representation of the earliest time this reservation can be allocated from. |
| deadline | long | The UTC time representation of the latest time within which this reservatino can be allocated. |
| reservation-name | string | A mnemonic name of the reservaiton (not a valid identifier). |
| deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. |
| reservation-name | string | A mnemonic name of the reservation (not a valid identifier). |
| reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration |
### Elements of the *reservation-requests* object
@ -3381,3 +3384,443 @@ Response Body:
</reservations>
</reservationListInfo>
```
Cluster Reservation API Submit
------------------------------
The Cluster Reservation API can be used to submit reservations. When submitting a reservation, the user specifies the required constraints in terms of resources and time; the response returns a reservation-id that the user can use to get access to the resources by specifying it as part of the [Cluster Submit Applications API](#Cluster_Applications_APISubmit_Application).
### URI
* http://<rm http address:port>/ws/v1/cluster/reservation/submit
### HTTP Operations Supported
* POST
### POST Response Examples
POST requests can be used to submit reservations to the ResourceManager. As mentioned above, a reservation-id is returned upon success (in the body of the answer). Successful submissions result in a 200 response. Please note that in order to submit a reservation, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
Please note that this feature is currently in the alpha stage and may change in the future.
#### Elements of the POST request object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| queue | string | The (reservable) queue you are submitting to|
| reservation-definition | object | A set of constraints representing the need for resources over time of a user. |
Elements of the *reservation-definition* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
|arrival | long | The UTC time representation of the earliest time this reservation can be allocated from. |
| deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. |
| reservation-name | string | A mnemonic name of the reservation (not a valid identifier). |
| reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration |
Elements of the *reservation-requests* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| reservation-request-interpreter | int | A numeric choice of how to interpret the set of ReservationRequest: 0 is ANY, 1 is ALL, 2 is ORDER, 3 is ORDER\_NO\_GAP |
| reservation-request | object | The description of the resource and time capabilities for a phase/stage of this reservation |
Elements of the *reservation-request* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| duration | long | The duration of a ReservationRequest in milliseconds (the amount of consecutive milliseconds a satisfiable allocation for this portion of the reservation should exist for). |
| num-containers | int | The number of containers required in this phase of the reservation (captures the maximum parallelism of the job(s) in this phase). |
| min-concurrency | int | The minimum number of containers that must be concurrently allocated to satisfy this allocation (captures min-parallelism, useful to express gang semantics). |
| capability | object | Specifies the size of each container (memory, vCores). |
Elements of the *capability* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| memory | int | the number of MB of memory for this container |
| vCores | int | the number of virtual cores for this container |
**JSON response**
This example contains a reservation composed of two stages (alternatives to each other, as the *reservation-request-interpreter* is set to 0): the first is shorter, "taller", and "gang",
with exactly 220 containers for 60 seconds, while the second alternative is longer, with a 120 second duration, and less tall, with 110 containers (and a min-concurrency of 1 container, thus no gang semantics).
HTTP Request:
```json
POST http://rmdns:8088/ws/v1/cluster/reservation/submit
Content-Type: application/json
{
"queue" : "dedicated",
"reservation-definition" : {
"arrival" : 1765541532000,
"deadline" : 1765542252000,
"reservation-name" : "res_1",
"reservation-requests" : {
"reservation-request-interpreter" : 0,
"reservation-request" : [
{
"duration" : 60000,
"num-containers" : 220,
"min-concurrency" : 220,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
},
{
"duration" : 120000,
"num-containers" : 110,
"min-concurrency" : 1,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
}
]
}
}
}
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Thu, 17 Dec 2015 23:36:34 GMT, Thu, 17 Dec 2015 23:36:34 GMT
Date: Thu, 17 Dec 2015 23:36:34 GMT, Thu, 17 Dec 2015 23:36:34 GMT
Pragma: no-cache, no-cache
Content-Type: application/json
Content-Encoding: gzip
Content-Length: 137
Server: Jetty(6.1.26)
Response Body:
```json
{"reservation-id":"reservation_1448064217915_0009"}
```
**XML response**
HTTP Request:
```xml
POST http://rmdns:8088/ws/v1/cluster/reservation/submit
Accept: application/xml
Content-Type: application/xml
<reservation-submission-context>
<queue>dedicated</queue>
<reservation-definition>
<arrival>1765541532000</arrival>
<deadline>1765542252000</deadline>
<reservation-name>res_1</reservation-name>
<reservation-requests>
<reservation-request-interpreter>0</reservation-request-interpreter>
<reservation-request>
<duration>60000</duration>
<num-containers>220</num-containers>
<min-concurrency>220</min-concurrency>
<capability>
<memory>1024</memory>
<vCores>1</vCores>
</capability>
</reservation-request>
<reservation-request>
<duration>120000</duration>
<num-containers>110</num-containers>
<min-concurrency>1</min-concurrency>
<capability>
<memory>1024</memory>
<vCores>1</vCores>
</capability>
</reservation-request>
</reservation-requests>
</reservation-definition>
</reservation-submission-context>
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Thu, 17 Dec 2015 23:49:21 GMT, Thu, 17 Dec 2015 23:49:21 GMT
Date: Thu, 17 Dec 2015 23:49:21 GMT, Thu, 17 Dec 2015 23:49:21 GMT
Pragma: no-cache, no-cache
Content-Type: application/xml
Content-Encoding: gzip
Content-Length: 137
Server: Jetty(6.1.26)
Response Body:
```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<reservation-submission-response>
<reservation-id>reservation_1448064217915_0010</reservation-id>
</reservation-submission-response>
```
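The same submission can also be driven from code against this endpoint. The following Java sketch is illustrative only: it reuses the `rmdns:8088` address and `dedicated` queue from the examples above, and assumes a pseudo/simple authentication filter that accepts a `user.name` query parameter (the username is hypothetical).

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SubmitReservationRest {
  public static void main(String[] args) throws Exception {
    // Hypothetical RM address; "user.name" assumes the pseudo/simple
    // authentication filter is in place.
    URL url = new URL(
        "http://rmdns:8088/ws/v1/cluster/reservation/submit?user.name=alice");
    String body =
        "{\"queue\":\"dedicated\",\"reservation-definition\":{"
        + "\"arrival\":1765541532000,\"deadline\":1765542252000,"
        + "\"reservation-name\":\"res_1\",\"reservation-requests\":{"
        + "\"reservation-request-interpreter\":0,\"reservation-request\":[{"
        + "\"duration\":60000,\"num-containers\":220,\"min-concurrency\":220,"
        + "\"capability\":{\"memory\":1024,\"vCores\":1}}]}}}";

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setRequestProperty("Accept", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }

    // On success (200) the body is {"reservation-id":"reservation_..."}.
    System.out.println("HTTP " + conn.getResponseCode());
    try (Scanner in = new Scanner(conn.getInputStream(), "UTF-8")) {
      System.out.println(in.useDelimiter("\\A").next());
    }
  }
}
```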
Cluster Reservation API Update
------------------------------
The Cluster Reservation API Update can be used to update existing reservations. Updating a reservation works similarly to the submit described above, but the user submits the reservation-id of an existing reservation to be updated. The semantics are try-and-swap: a successful operation will modify the existing reservation based on the requested update parameters, while a failed execution will leave the existing reservation unchanged.
### URI
* http://<rm http address:port>/ws/v1/cluster/reservation/update
### HTTP Operations Supported
* POST
### POST Response Examples
POST requests can be used to update reservations on the ResourceManager. Successful submissions result in a 200 response, indicating an in-place update of the existing reservation (the id does not change). Please note that in order to update a reservation, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
Please note that this feature is currently in the alpha stage and may change in the future.
#### Elements of the POST request object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| reservation-id | string | The id of the reservation to be updated (the system automatically looks up the right queue from this)|
| reservation-definition | object | A set of constraints representing the need for resources over time of a user. |
Elements of the *reservation-definition* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
|arrival | long | The UTC time representation of the earliest time this reservation can be allocated from. |
| deadline | long | The UTC time representation of the latest time within which this reservation can be allocated. |
| reservation-name | string | A mnemonic name of the reservation (not a valid identifier). |
| reservation-requests | object | A list of "stages" or phases of this reservation, each describing resource requirements and duration |
Elements of the *reservation-requests* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| reservation-request-interpreter | int | A numeric choice of how to interpret the set of ReservationRequest: 0 is ANY, 1 is ALL, 2 is ORDER, 3 is ORDER\_NO\_GAP |
| reservation-request | object | The description of the resource and time capabilities for a phase/stage of this reservation |
Elements of the *reservation-request* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| duration | long | The duration of a ReservationRequest in milliseconds (the amount of consecutive milliseconds a satisfiable allocation for this portion of the reservation should exist for). |
| num-containers | int | The number of containers required in this phase of the reservation (captures the maximum parallelism of the job(s) in this phase). |
| min-concurrency | int | The minimum number of containers that must be concurrently allocated to satisfy this allocation (captures min-parallelism, useful to express gang semantics). |
| capability | object | Specifies the size of each container (memory, vCores). |
Elements of the *capability* object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| memory | int | the number of MB of memory for this container |
| vCores | int | the number of virtual cores for this container |
**JSON response**
This example updates an existing reservation identified by *reservation_1449259268893_0005* with two stages (in order, as the *reservation-request-interpreter* is set to 2): the first stage is a "gang" of 10 containers for 5 minutes (min-concurrency of 10 containers), followed by 50 containers for 1 minute (min-concurrency of 1 container, thus no gang semantics).
HTTP Request:
```json
POST http://rmdns:8088/ws/v1/cluster/reservation/update
Accept: application/json
Content-Type: application/json
{
"reservation-id" : "reservation_1449259268893_0005",
"reservation-definition" : {
"arrival" : 1765541532000,
"deadline" : 1765542252000,
"reservation-name" : "res_1",
"reservation-requests" : {
"reservation-request-interpreter" : 2,
"reservation-request" : [
{
"duration" : 300000,
"num-containers" : 10,
"min-concurrency" : 10,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
},
{
"duration" : 60000,
"num-containers" : 50,
"min-concurrency" : 1,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
}
]
}
}
}
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Thu, 17 Dec 2015 23:36:34 GMT, Thu, 17 Dec 2015 23:36:34 GMT
Date: Thu, 17 Dec 2015 23:36:34 GMT, Thu, 17 Dec 2015 23:36:34 GMT
Pragma: no-cache, no-cache
Content-Type: application/json
Content-Encoding: gzip
Content-Length: 137
Server: Jetty(6.1.26)
Response Body:
No response body
**XML response**
HTTP Request:
```xml
POST http://rmdns:8088/ws/v1/cluster/reservation/update
Accept: application/xml
Content-Type: application/xml
<reservation-update-context>
<reservation-id>reservation_1449259268893_0005</reservation-id>
<reservation-definition>
<arrival>1765541532000</arrival>
<deadline>1765542252000</deadline>
<reservation-name>res_1</reservation-name>
<reservation-requests>
<reservation-request-interpreter>2</reservation-request-interpreter>
<reservation-request>
<duration>300000</duration>
<num-containers>10</num-containers>
<min-concurrency>10</min-concurrency>
<capability>
<memory>1024</memory>
<vCores>1</vCores>
</capability>
</reservation-request>
<reservation-request>
<duration>60000</duration>
<num-containers>50</num-containers>
<min-concurrency>1</min-concurrency>
<capability>
<memory>1024</memory>
<vCores>1</vCores>
</capability>
</reservation-request>
</reservation-requests>
</reservation-definition>
</reservation-update-context>
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Thu, 17 Dec 2015 23:49:21 GMT, Thu, 17 Dec 2015 23:49:21 GMT
Date: Thu, 17 Dec 2015 23:49:21 GMT, Thu, 17 Dec 2015 23:49:21 GMT
Pragma: no-cache, no-cache
Content-Type: application/xml
Content-Encoding: gzip
Content-Length: 137
Server: Jetty(6.1.26)
Response Body:
No response body
Cluster Reservation API Delete
------------------------------
The Cluster Reservation API Delete can be used to delete existing reservations. Delete works similarly to update: the request contains the reservation-id, and if successful the reservation is cancelled; otherwise the reservation remains in the system.
### URI
* http://<rm http address:port>/ws/v1/cluster/reservation/delete
### HTTP Operations Supported
* POST
### POST Response Examples
POST requests can be used to delete reservations from the ResourceManager. Successful submissions result in a 200 response, indicating that the delete succeeded. Please note that in order to delete a reservation, you must have an authentication filter setup for the HTTP interface. The functionality requires that a username is set in the HttpServletRequest. If no filter is setup, the response will be an "UNAUTHORIZED" response.
Please note that this feature is currently in the alpha stage and may change in the future.
#### Elements of the POST request object
| Item | Data Type | Description |
|:---- |:---- |:---- |
| reservation-id | string | The id of the reservation to be deleted (the system automatically looks up the right queue from this)|
**JSON response**
This example deletes an existing reservation identified by *reservation_1449259268893_0006*.
HTTP Request:
```json
POST http://10.200.91.98:8088/ws/v1/cluster/reservation/delete
Accept: application/json
Content-Type: application/json
{
"reservation-id" : "reservation_1449259268893_0006"
}
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Fri, 18 Dec 2015 01:31:05 GMT, Fri, 18 Dec 2015 01:31:05 GMT
Date: Fri, 18 Dec 2015 01:31:05 GMT, Fri, 18 Dec 2015 01:31:05 GMT
Pragma: no-cache, no-cache
Content-Type: application/json
Content-Encoding: gzip
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
Response Body:
No response body
**XML response**
HTTP Request:
```xml
POST http://10.200.91.98:8088/ws/v1/cluster/reservation/delete
Accept: application/xml
Content-Type: application/xml
<reservation-delete-context>
<reservation-id>reservation_1449259268893_0006</reservation-id>
</reservation-delete-context>
```
Response Header:
200 OK
Cache-Control: no-cache
Expires: Fri, 18 Dec 2015 01:33:23 GMT, Fri, 18 Dec 2015 01:33:23 GMT
Date: Fri, 18 Dec 2015 01:33:23 GMT, Fri, 18 Dec 2015 01:33:23 GMT
Pragma: no-cache, no-cache
Content-Type: application/xml
Content-Encoding: gzip
Content-Length: 101
Server: Jetty(6.1.26)
Response Body:
No response body
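The update and delete operations described in the last two sections also have programmatic counterparts on `YarnClient`. The sketch below is illustrative only; it assumes the `updateReservation`/`deleteReservation` client methods and the related record factories (exact signatures may vary by release), and takes a `ReservationId` obtained from an earlier submission.

```java
import java.util.Arrays;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class ReservationUpdateDeleteSketch {

  /** Replace the definition of an existing reservation (try-and-swap). */
  static void update(YarnClient client, ReservationId reservationId,
      long arrival, long deadline) throws Exception {
    // One ordered stage: a gang of 10 x 1024MB/1vCore containers for 5 minutes.
    ReservationRequest stage = ReservationRequest.newInstance(
        Resource.newInstance(1024, 1), 10, 10, 5 * 60 * 1000L);
    ReservationRequests requests = ReservationRequests.newInstance(
        Arrays.asList(stage), ReservationRequestInterpreter.R_ORDER);
    ReservationDefinition definition = ReservationDefinition.newInstance(
        arrival, deadline, requests, "res_1");
    // Fails if the new definition cannot be satisfied; the old one is kept.
    client.updateReservation(
        ReservationUpdateRequest.newInstance(definition, reservationId));
  }

  /** Cancel an existing reservation. */
  static void delete(YarnClient client, ReservationId reservationId)
      throws Exception {
    client.deleteReservation(
        ReservationDeleteRequest.newInstance(reservationId));
  }
}
```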

View File

@ -32,3 +32,5 @@ The Scheduler has a pluggable policy which is responsible for partitioning the c
The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.
MapReduce in hadoop-2.x maintains **API compatibility** with previous stable release (hadoop-1.x). This means that all MapReduce jobs should still run unchanged on top of YARN with just a recompile.
YARN also supports the notion of **resource reservation** via the [ReservationSystem](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ReservationSystem.html), a component that allows users to specify a profile of resources over time and temporal constraints (e.g., deadlines), and reserve resources to ensure the predictable execution of important jobs. The *ReservationSystem* tracks resources over time, performs admission control for reservations, and dynamically instructs the underlying scheduler to ensure that the reservation is fulfilled.

Binary file not shown (new file; 83 KiB)