YARN-4584. RM startup failure when AM attempts greater than max-attempts. (Bibin A Chundatt via rohithsharmaks)

(cherry picked from commit e30668106d)
This commit is contained in:
Rohith Sharma K S 2016-01-22 10:14:46 +05:30
parent 245c3728ef
commit 844a8e3771
3 changed files with 77 additions and 7 deletions

View File

@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED
YARN-4611. Fix scheduler load simulator to support multi-layer network YARN-4611. Fix scheduler load simulator to support multi-layer network
location. (Ming Ma via xgong) location. (Ming Ma via xgong)
YARN-4584. RM startup failure when AM attempts greater than max-attempts.
(Bibin A Chundatt via rohithsharmaks)
Release 2.8.0 - UNRELEASED Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -841,7 +841,7 @@ public class RMAppImpl implements RMApp, Recoverable {
this.startTime = appState.getStartTime(); this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext(); this.callerContext = appState.getCallerContext();
// If interval > 0, some attempts might have been deleted. // If interval > 0, some attempts might have been deleted.
if (submissionContext.getAttemptFailuresValidityInterval() > 0) { if (this.attemptFailuresValidityInterval > 0) {
this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
this.nextAttemptId = firstAttemptIdInStateStore; this.nextAttemptId = firstAttemptIdInStateStore;
} }
@ -1341,7 +1341,9 @@ public class RMAppImpl implements RMApp, Recoverable {
+ "is " + numberOfFailure + ". The max attempts is " + "is " + numberOfFailure + ". The max attempts is "
+ app.maxAppAttempts); + app.maxAppAttempts);
if (app.attemptFailuresValidityInterval > 0) {
removeExcessAttempts(app); removeExcessAttempts(app);
}
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) { && numberOfFailure < app.maxAppAttempts) {
@ -1381,15 +1383,22 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
private void removeExcessAttempts(RMAppImpl app) { private void removeExcessAttempts(RMAppImpl app) {
while (app.nextAttemptId - app.firstAttemptIdInStateStore while (app.nextAttemptId
> app.maxAppAttempts) { - app.firstAttemptIdInStateStore > app.maxAppAttempts) {
// attempts' first element is oldest attempt because it is a // attempts' first element is oldest attempt because it is a
// LinkedHashMap // LinkedHashMap
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
app.getApplicationId(), app.firstAttemptIdInStateStore); app.getApplicationId(), app.firstAttemptIdInStateStore);
RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId);
long endTime = app.systemClock.getTime();
if (rmAppAttempt.getFinishTime() < (endTime
- app.attemptFailuresValidityInterval)) {
app.firstAttemptIdInStateStore++; app.firstAttemptIdInStateStore++;
LOG.info("Remove attempt from state store : " + attemptId); LOG.info("Remove attempt from state store : " + attemptId);
app.rmContext.getStateStore().removeApplicationAttempt(attemptId); app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
} else {
break;
}
} }
} }
} }

View File

@ -42,6 +42,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -104,6 +106,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -121,6 +125,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public class TestRMRestart extends ParameterizedSchedulerTestBase { public class TestRMRestart extends ParameterizedSchedulerTestBase {
private static final Log LOG = LogFactory.getLog(TestRMRestart.class);
private final static File TEMP_DIR = new File(System.getProperty( private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision"); "test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@ -2321,4 +2326,57 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm2.stop(); rm2.stop();
} }
@Test(timeout = 120000)
public void testRMRestartAfterPreemption() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
return;
}
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
int CONTAINER_MEMORY = 1024;
// create app and launch the AM
RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FAILED);
for (int i = 0; i < 4; i++) {
am0 = MockRM.launchAM(app0, rm1, nm1);
am0.registerAppAttempt();
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app0.getApplicationId()).getCurrentAppAttempt();
// kill app0-attempt
cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(
app0.getCurrentAppAttempt().getMasterContainer().getId()));
}
am0 = MockRM.launchAM(app0, rm1, nm1);
am0.registerAppAttempt();
rm1.killApp(app0.getApplicationId());
rm1.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.KILLED);
MockRM rm2 = null;
// start RM2
try {
rm2 = new MockRM(conf, memStore);
rm2.start();
Assert.assertTrue("RM start successfully", true);
} catch (Exception e) {
LOG.debug("Exception on start", e);
Assert.fail("RM should start with out any issue");
} finally {
rm1.stop();
}
}
} }