From d6258b33a7428a0725ead96bc43f4dd444c7c8f1 Mon Sep 17 00:00:00 2001
From: rohithsharmaks
Date: Fri, 22 Jan 2016 20:27:38 +0530
Subject: [PATCH] YARN-4497. RM might fail to restart when recovering apps
 whose attempts are missing. (Jun Gong via rohithsharmaks)

---
 hadoop-yarn-project/CHANGES.txt                |  3 +
 .../resourcemanager/rmapp/RMAppImpl.java       | 26 +++++++-
 .../rmapp/attempt/RMAppAttemptImpl.java        |  8 +++
 .../server/resourcemanager/TestRMRestart.java  | 61 +++++++++++++++++++
 4 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2230b4293ac..b667b5bed8b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4578. Directories that are mounted in docker containers need to be
     more restrictive/container-specific. (Sidharta Seethana via vvasudev)
 
+    YARN-4497. RM might fail to restart when recovering apps whose attempts are missing.
+    (Jun Gong via rohithsharmaks)
+
 Release 2.8.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 1a390df2180..10c9edc2b40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -34,6 +34,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -849,17 +850,32 @@ public void recover(RMState state) {
     // send the ATS create Event
     sendATSCreateEvent(this, this.startTime);
 
-    for(int i=0; i<appState.getAttemptCount(); ++i) {
+    RMAppAttemptImpl preAttempt = null;
+    for (ApplicationAttemptId attemptId :
+        new TreeSet<>(appState.attempts.keySet())) {
       // create attempt
-      createNewAttempt();
+      createNewAttempt(attemptId);
       ((RMAppAttemptImpl)this.currentAttempt).recover(state);
+      // If previous attempt is not in final state, it means we failed to store
+      // its final state. We set it to FAILED now because we could not make sure
+      // about its final state.
+      if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
+        preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
+      }
+      preAttempt = (RMAppAttemptImpl)currentAttempt;
+    }
+    if (currentAttempt != null) {
+      nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
     }
   }
 
   private void createNewAttempt() {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
+    createNewAttempt(appAttemptId);
+  }
+
+  private void createNewAttempt(ApplicationAttemptId appAttemptId) {
     BlacklistManager currentAMBlacklist;
     if (currentAttempt != null) {
       currentAMBlacklist = currentAttempt.getAMBlacklist();
@@ -1803,4 +1819,10 @@ public boolean isAmBlacklistingEnabled() {
   public float getAmBlacklistingDisableThreshold() {
     return blacklistDisableThreshold;
   }
+
+  @Private
+  @VisibleForTesting
+  public int getNextAttemptId() {
+    return nextAttemptId;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8aefe9f908d..3f45cb41978 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -2113,4 +2113,12 @@ private void setFinishTime(long finishTime) {
   public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
     this.amLaunchDiagnostics = amLaunchDiagnostics;
   }
+
+  public RMAppAttemptState getRecoveredFinalState() {
+    return recoveredFinalState;
+  }
+
+  public void setRecoveredFinalState(RMAppAttemptState finalState) {
+    this.recoveredFinalState = finalState;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 625c3b890f2..3bab88a2485 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -101,6 +101,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -2379,4 +2380,64 @@ public void testRMRestartAfterPreemption() throws Exception {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testRMRestartOnMissingAttempts() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = createMockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create an app and finish the app.
+    RMApp app0 = rm1.submitApp(200);
+    ApplicationStateData app0State = memStore.getState().getApplicationState()
+        .get(app0.getApplicationId());
+
+    MockAM am0 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am1 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am2 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am3 = launchAM(app0, rm1, nm1);
+
+    // am1 is missed from MemoryRMStateStore
+    memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
+    ApplicationAttemptStateData am2State = app0State.getAttempt(
+        am2.getApplicationAttemptId());
+    // am2's state is not consistent: MemoryRMStateStore just saved its initial
+    // state and failed to store its final state
+    am2State.setState(null);
+
+    // restart rm
+    MockRM rm2 = createMockRM(conf, memStore);
+    rm2.start();
+
+    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+    RMApp recoveredApp0 = rm2.getRMContext().getRMApps().values()
+        .iterator().next();
+    Map<ApplicationAttemptId, RMAppAttempt> recoveredAppAttempts
+        = recoveredApp0.getAppAttempts();
+    Assert.assertEquals(3, recoveredAppAttempts.size());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        recoveredAppAttempts.get(
+            am0.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        recoveredAppAttempts.get(
+            am2.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+        recoveredAppAttempts.get(
+            am3.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(5, ((RMAppImpl)app0).getNextAttemptId());
+  }
+
+  private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    MockAM am = launchAM(app, rm, nm);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FAILED);
+    return am;
+  }
 }
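
The heart of the patch is the reworked recovery loop in RMAppImpl#recover(): stored attempts are walked in ascending attempt-id order (the role of the new TreeSet over appState.attempts.keySet()), any earlier attempt whose final state was never persisted is conservatively marked FAILED, and nextAttemptId resumes after the highest recovered attempt, so an application with missing or half-stored attempt records no longer breaks RM restart. Below is a minimal, self-contained sketch of that rule, kept outside YARN for illustration only; the names AttemptRecoverySketch and StoredAttempt are hypothetical and do not appear in the patch.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

public class AttemptRecoverySketch {

  // Minimal stand-in for a per-attempt record in the RM state store;
  // finalState == null means the attempt's final state was never stored.
  static class StoredAttempt {
    final int id;
    String finalState;

    StoredAttempt(int id, String finalState) {
      this.id = id;
      this.finalState = finalState;
    }
  }

  public static void main(String[] args) {
    // Attempt 2 is missing from the store entirely; attempt 3 was stored
    // without a final state; attempt 4 is the newest attempt, still running.
    Map<Integer, StoredAttempt> stored = new LinkedHashMap<Integer, StoredAttempt>();
    stored.put(1, new StoredAttempt(1, "FAILED"));
    stored.put(3, new StoredAttempt(3, null));
    stored.put(4, new StoredAttempt(4, null));

    StoredAttempt preAttempt = null;
    int nextAttemptId = 1;

    // Walk attempt ids in ascending order (what the TreeSet provides in the
    // patch); gaps left by missing attempts are simply skipped.
    for (int attemptId : new TreeSet<Integer>(stored.keySet())) {
      StoredAttempt current = stored.get(attemptId);
      // A predecessor without a persisted final state cannot still be the
      // active attempt, because a newer attempt exists: mark it FAILED.
      if (preAttempt != null && preAttempt.finalState == null) {
        preAttempt.finalState = "FAILED";
      }
      preAttempt = current;
    }
    if (preAttempt != null) {
      // Resume attempt numbering after the highest recovered attempt id.
      nextAttemptId = preAttempt.id + 1;
    }

    for (StoredAttempt a : stored.values()) {
      System.out.println("attempt " + a.id + " -> "
          + (a.finalState == null ? "still running" : a.finalState));
    }
    System.out.println("nextAttemptId = " + nextAttemptId);
    // Prints: attempt 1 -> FAILED, attempt 3 -> FAILED (forced during
    // recovery), attempt 4 -> still running, nextAttemptId = 5
  }
}

The added testRMRestartOnMissingAttempts() exercises the same three cases against a real MockRM: an attempt record removed from the store (am1), an attempt stored without a final state (am2, recovered as FAILED), and the latest still-running attempt (am3, recovered as LAUNCHED), with nextAttemptId recovered as 5.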