YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into
DistributedShell. Contributed by Xuan Gong
This commit is contained in:
parent
9f6891d9ef
commit
8e5d6713cf
|
@ -229,6 +229,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2547. Cross Origin Filter throws UnsupportedOperationException upon
|
YARN-2547. Cross Origin Filter throws UnsupportedOperationException upon
|
||||||
destroy (Mit Desai via jeagles)
|
destroy (Mit Desai via jeagles)
|
||||||
|
|
||||||
|
YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into
|
||||||
|
DistributedShell (xgong)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Client for Distributed Shell application submission to YARN.
|
* Client for Distributed Shell application submission to YARN.
|
||||||
|
@ -163,6 +162,8 @@ public class Client {
|
||||||
// flag to indicate whether to keep containers across application attempts.
|
// flag to indicate whether to keep containers across application attempts.
|
||||||
private boolean keepContainers = false;
|
private boolean keepContainers = false;
|
||||||
|
|
||||||
|
private long attemptFailuresValidityInterval = -1;
|
||||||
|
|
||||||
// Debug flag
|
// Debug flag
|
||||||
boolean debugFlag = false;
|
boolean debugFlag = false;
|
||||||
|
|
||||||
|
@ -248,6 +249,12 @@ public class Client {
|
||||||
" If the flag is true, running containers will not be killed when" +
|
" If the flag is true, running containers will not be killed when" +
|
||||||
" application attempt fails and these containers will be retrieved by" +
|
" application attempt fails and these containers will be retrieved by" +
|
||||||
" the new application attempt ");
|
" the new application attempt ");
|
||||||
|
opts.addOption("attempt_failures_validity_interval", true,
|
||||||
|
"when attempt_failures_validity_interval in milliseconds is set to > 0," +
|
||||||
|
"the failure number will not take failures which happen out of " +
|
||||||
|
"the validityInterval into failure count. " +
|
||||||
|
"If failure count reaches to maxAppAttempts, " +
|
||||||
|
"the application will be failed.");
|
||||||
opts.addOption("debug", false, "Dump out debug information");
|
opts.addOption("debug", false, "Dump out debug information");
|
||||||
opts.addOption("help", false, "Print usage");
|
opts.addOption("help", false, "Print usage");
|
||||||
|
|
||||||
|
@ -372,6 +379,10 @@ public class Client {
|
||||||
|
|
||||||
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
|
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
|
||||||
|
|
||||||
|
attemptFailuresValidityInterval =
|
||||||
|
Long.parseLong(cliParser.getOptionValue(
|
||||||
|
"attempt_failures_validity_interval", "-1"));
|
||||||
|
|
||||||
log4jPropFile = cliParser.getOptionValue("log_properties", "");
|
log4jPropFile = cliParser.getOptionValue("log_properties", "");
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -456,6 +467,11 @@ public class Client {
|
||||||
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
|
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
|
||||||
appContext.setApplicationName(appName);
|
appContext.setApplicationName(appName);
|
||||||
|
|
||||||
|
if (attemptFailuresValidityInterval >= 0) {
|
||||||
|
appContext
|
||||||
|
.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
||||||
|
}
|
||||||
|
|
||||||
// set local resources for the application master
|
// set local resources for the application master
|
||||||
// local files or archives as needed
|
// local files or archives as needed
|
||||||
// In this scenario, the jar file for the application master is part of the local resources
|
// In this scenario, the jar file for the application master is part of the local resources
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
public class TestDSSleepingAppMaster extends ApplicationMaster{
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestDSSleepingAppMaster.class);
|
||||||
|
private static final long SLEEP_TIME = 5000;
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
boolean result = false;
|
||||||
|
try {
|
||||||
|
TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
|
||||||
|
boolean doRun = appMaster.init(args);
|
||||||
|
if (!doRun) {
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
appMaster.run();
|
||||||
|
if (appMaster.appAttemptID.getAttemptId() <= 2) {
|
||||||
|
try {
|
||||||
|
// sleep some time
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
} catch (InterruptedException e) {}
|
||||||
|
// fail every AM attempt whose attempt id is <= 2.
|
||||||
|
System.exit(100);
|
||||||
|
}
|
||||||
|
result = appMaster.finish();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
if (result) {
|
||||||
|
LOG.info("Application Master completed successfully. exiting");
|
||||||
|
System.exit(0);
|
||||||
|
} else {
|
||||||
|
LOG.info("Application Master failed. exiting");
|
||||||
|
System.exit(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -308,6 +308,82 @@ public class TestDistributedShell {
|
||||||
Assert.assertTrue(result);
|
Assert.assertTrue(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
|
||||||
|
* Set attempt_failures_validity_interval as 2.5 seconds. It will check
|
||||||
|
* how many attempts have failed within the previous 2.5 seconds.
|
||||||
|
* The application is expected to be successful.
|
||||||
|
*/
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"1",
|
||||||
|
"--shell_command",
|
||||||
|
"sleep 8",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--attempt_failures_validity_interval",
|
||||||
|
"2500"
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client");
|
||||||
|
Configuration conf = yarnCluster.getConfig();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
Client client = new Client(TestDSSleepingAppMaster.class.getName(),
|
||||||
|
new Configuration(conf));
|
||||||
|
|
||||||
|
client.init(args);
|
||||||
|
LOG.info("Running DS Client");
|
||||||
|
boolean result = client.run();
|
||||||
|
|
||||||
|
LOG.info("Client run completed. Result=" + result);
|
||||||
|
// application should succeed
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
|
||||||
|
* Set attempt_failures_validity_interval as 15 seconds. It will check
|
||||||
|
* how many attempts have failed within the previous 15 seconds.
|
||||||
|
* The application is expected to fail.
|
||||||
|
*/
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"1",
|
||||||
|
"--shell_command",
|
||||||
|
"sleep 8",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--attempt_failures_validity_interval",
|
||||||
|
"15000"
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client");
|
||||||
|
Configuration conf = yarnCluster.getConfig();
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
Client client = new Client(TestDSSleepingAppMaster.class.getName(),
|
||||||
|
new Configuration(conf));
|
||||||
|
|
||||||
|
client.init(args);
|
||||||
|
LOG.info("Running DS Client");
|
||||||
|
boolean result = client.run();
|
||||||
|
|
||||||
|
LOG.info("Client run completed. Result=" + result);
|
||||||
|
// application should fail
|
||||||
|
Assert.assertFalse(result);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testDSShellWithCustomLogPropertyFile() throws Exception {
|
public void testDSShellWithCustomLogPropertyFile() throws Exception {
|
||||||
final File basedir =
|
final File basedir =
|
||||||
|
|
Loading…
Reference in New Issue