YARN-4497. RM might fail to restart when recovering apps whose attempts are missing. (Jun Gong via rohithsharmaks)
(cherry picked from commit d6258b33a7
)
This commit is contained in:
parent
6368d102b9
commit
493275b27b
|
@ -85,6 +85,9 @@ Release 2.9.0 - UNRELEASED
|
|||
YARN-4578. Directories that are mounted in docker containers need to be more
|
||||
restrictive/container-specific. (Sidharta Seethana via vvasudev)
|
||||
|
||||
YARN-4497. RM might fail to restart when recovering apps whose attempts are missing.
|
||||
(Jun Gong via rohithsharmaks)
|
||||
|
||||
Release 2.8.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
|
@ -849,17 +850,32 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// send the ATS create Event
|
||||
sendATSCreateEvent(this, this.startTime);
|
||||
|
||||
for(int i=0; i<appState.getAttemptCount(); ++i) {
|
||||
RMAppAttemptImpl preAttempt = null;
|
||||
for (ApplicationAttemptId attemptId :
|
||||
new TreeSet<>(appState.attempts.keySet())) {
|
||||
// create attempt
|
||||
createNewAttempt();
|
||||
createNewAttempt(attemptId);
|
||||
((RMAppAttemptImpl)this.currentAttempt).recover(state);
|
||||
// If previous attempt is not in final state, it means we failed to store
|
||||
// its final state. We set it to FAILED now because we could not make sure
|
||||
// about its final state.
|
||||
if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
|
||||
preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
|
||||
}
|
||||
preAttempt = (RMAppAttemptImpl)currentAttempt;
|
||||
}
|
||||
if (currentAttempt != null) {
|
||||
nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewAttempt() {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
|
||||
createNewAttempt(appAttemptId);
|
||||
}
|
||||
|
||||
private void createNewAttempt(ApplicationAttemptId appAttemptId) {
|
||||
BlacklistManager currentAMBlacklist;
|
||||
if (currentAttempt != null) {
|
||||
currentAMBlacklist = currentAttempt.getAMBlacklist();
|
||||
|
@ -1803,4 +1819,10 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
public float getAmBlacklistingDisableThreshold() {
|
||||
return blacklistDisableThreshold;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public int getNextAttemptId() {
|
||||
return nextAttemptId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2113,4 +2113,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
|
||||
this.amLaunchDiagnostics = amLaunchDiagnostics;
|
||||
}
|
||||
|
||||
public RMAppAttemptState getRecoveredFinalState() {
|
||||
return recoveredFinalState;
|
||||
}
|
||||
|
||||
public void setRecoveredFinalState(RMAppAttemptState finalState) {
|
||||
this.recoveredFinalState = finalState;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreRMDTMa
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
@ -2379,4 +2380,64 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRMRestartOnMissingAttempts() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = createMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// create an app and finish the app.
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
ApplicationStateData app0State = memStore.getState().getApplicationState()
|
||||
.get(app0.getApplicationId());
|
||||
|
||||
MockAM am0 = launchAndFailAM(app0, rm1, nm1);
|
||||
MockAM am1 = launchAndFailAM(app0, rm1, nm1);
|
||||
MockAM am2 = launchAndFailAM(app0, rm1, nm1);
|
||||
MockAM am3 = launchAM(app0, rm1, nm1);
|
||||
|
||||
// am1 is missed from MemoryRMStateStore
|
||||
memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
|
||||
ApplicationAttemptStateData am2State = app0State.getAttempt(
|
||||
am2.getApplicationAttemptId());
|
||||
// am2's state is not consistent: MemoryRMStateStore just saved its initial
|
||||
// state and failed to store its final state
|
||||
am2State.setState(null);
|
||||
|
||||
// restart rm
|
||||
MockRM rm2 = createMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
|
||||
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||
RMApp recoveredApp0 = rm2.getRMContext().getRMApps().values()
|
||||
.iterator().next();
|
||||
Map<ApplicationAttemptId, RMAppAttempt> recoveredAppAttempts
|
||||
= recoveredApp0.getAppAttempts();
|
||||
Assert.assertEquals(3, recoveredAppAttempts.size());
|
||||
Assert.assertEquals(RMAppAttemptState.FAILED,
|
||||
recoveredAppAttempts.get(
|
||||
am0.getApplicationAttemptId()).getAppAttemptState());
|
||||
Assert.assertEquals(RMAppAttemptState.FAILED,
|
||||
recoveredAppAttempts.get(
|
||||
am2.getApplicationAttemptId()).getAppAttemptState());
|
||||
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
|
||||
recoveredAppAttempts.get(
|
||||
am3.getApplicationAttemptId()).getAppAttemptState());
|
||||
Assert.assertEquals(5, ((RMAppImpl)app0).getNextAttemptId());
|
||||
}
|
||||
|
||||
private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
|
||||
throws Exception {
|
||||
MockAM am = launchAM(app, rm, nm);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FAILED);
|
||||
return am;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue