YARN-4497. RM might fail to restart when recovering apps whose attempts are missing. (Jun Gong via rohithsharmaks)

This commit is contained in:
rohithsharmaks 2016-01-22 20:27:38 +05:30
parent 0bae506c22
commit d6258b33a7
4 changed files with 96 additions and 2 deletions

View File

@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
YARN-4578. Directories that are mounted in docker containers need to be more YARN-4578. Directories that are mounted in docker containers need to be more
restrictive/container-specific. (Sidharta Seethana via vvasudev) restrictive/container-specific. (Sidharta Seethana via vvasudev)
YARN-4497. RM might fail to restart when recovering apps whose attempts are missing.
(Jun Gong via rohithsharmaks)
Release 2.8.0 - UNRELEASED Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -34,6 +34,7 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -849,17 +850,32 @@ public void recover(RMState state) {
// send the ATS create Event // send the ATS create Event
sendATSCreateEvent(this, this.startTime); sendATSCreateEvent(this, this.startTime);
for(int i=0; i<appState.getAttemptCount(); ++i) { RMAppAttemptImpl preAttempt = null;
for (ApplicationAttemptId attemptId :
new TreeSet<>(appState.attempts.keySet())) {
// create attempt // create attempt
createNewAttempt(); createNewAttempt(attemptId);
((RMAppAttemptImpl)this.currentAttempt).recover(state); ((RMAppAttemptImpl)this.currentAttempt).recover(state);
// If previous attempt is not in final state, it means we failed to store
// its final state. We set it to FAILED now because we could not make sure
// about its final state.
if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
}
preAttempt = (RMAppAttemptImpl)currentAttempt;
}
if (currentAttempt != null) {
nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
} }
} }
private void createNewAttempt() { private void createNewAttempt() {
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, nextAttemptId++); ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
createNewAttempt(appAttemptId);
}
private void createNewAttempt(ApplicationAttemptId appAttemptId) {
BlacklistManager currentAMBlacklist; BlacklistManager currentAMBlacklist;
if (currentAttempt != null) { if (currentAttempt != null) {
currentAMBlacklist = currentAttempt.getAMBlacklist(); currentAMBlacklist = currentAttempt.getAMBlacklist();
@ -1803,4 +1819,10 @@ public boolean isAmBlacklistingEnabled() {
public float getAmBlacklistingDisableThreshold() { public float getAmBlacklistingDisableThreshold() {
return blacklistDisableThreshold; return blacklistDisableThreshold;
} }
@Private
@VisibleForTesting
public int getNextAttemptId() {
return nextAttemptId;
}
} }

View File

@ -2113,4 +2113,12 @@ private void setFinishTime(long finishTime) {
public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) { public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
this.amLaunchDiagnostics = amLaunchDiagnostics; this.amLaunchDiagnostics = amLaunchDiagnostics;
} }
public RMAppAttemptState getRecoveredFinalState() {
return recoveredFinalState;
}
public void setRecoveredFinalState(RMAppAttemptState finalState) {
this.recoveredFinalState = finalState;
}
} }

View File

@ -101,6 +101,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -2379,4 +2380,64 @@ public void testRMRestartAfterPreemption() throws Exception {
} }
} }
@Test(timeout = 60000)
public void testRMRestartOnMissingAttempts() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create an app and finish the app.
RMApp app0 = rm1.submitApp(200);
ApplicationStateData app0State = memStore.getState().getApplicationState()
.get(app0.getApplicationId());
MockAM am0 = launchAndFailAM(app0, rm1, nm1);
MockAM am1 = launchAndFailAM(app0, rm1, nm1);
MockAM am2 = launchAndFailAM(app0, rm1, nm1);
MockAM am3 = launchAM(app0, rm1, nm1);
// am1 is missed from MemoryRMStateStore
memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
ApplicationAttemptStateData am2State = app0State.getAttempt(
am2.getApplicationAttemptId());
// am2's state is not consistent: MemoryRMStateStore just saved its initial
// state and failed to store its final state
am2State.setState(null);
// restart rm
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
RMApp recoveredApp0 = rm2.getRMContext().getRMApps().values()
.iterator().next();
Map<ApplicationAttemptId, RMAppAttempt> recoveredAppAttempts
= recoveredApp0.getAppAttempts();
Assert.assertEquals(3, recoveredAppAttempts.size());
Assert.assertEquals(RMAppAttemptState.FAILED,
recoveredAppAttempts.get(
am0.getApplicationAttemptId()).getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.FAILED,
recoveredAppAttempts.get(
am2.getApplicationAttemptId()).getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
recoveredAppAttempts.get(
am3.getApplicationAttemptId()).getAppAttemptState());
Assert.assertEquals(5, ((RMAppImpl)app0).getNextAttemptId());
}
private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
MockAM am = launchAM(app, rm, nm);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FAILED);
return am;
}
} }