From ef9f251679d7e87698eecd6a119652900274a172 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Wed, 17 Apr 2013 20:19:43 +0000 Subject: [PATCH] YARN-514.Delayed store operations should not result in RM unavailability for app submission (Zhijie Shen via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1469059 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/records/YarnApplicationState.java | 13 +- .../src/main/proto/yarn_protos.proto | 13 +- .../resourcemanager/ClientRMService.java | 14 --- .../server/resourcemanager/RMAppManager.java | 10 +- .../recovery/MemoryRMStateStore.java | 14 +++ .../recovery/RMStateStore.java | 61 ++++++++-- .../recovery/RMStateStoreAppEvent.java | 35 ++++++ .../recovery/RMStateStoreEventType.java | 1 + .../resourcemanager/rmapp/RMAppEventType.java | 2 + .../resourcemanager/rmapp/RMAppImpl.java | 47 +++++++- .../resourcemanager/rmapp/RMAppState.java | 10 +- .../rmapp/RMAppStoredEvent.java | 36 ++++++ .../resourcemanager/webapp/RmController.java | 1 + .../resourcemanager/webapp/dao/AppInfo.java | 4 +- .../resourcemanager/TestAppManager.java | 3 +- .../rmapp/TestRMAppTransitions.java | 114 ++++++++++++++++-- 17 files changed, 325 insertions(+), 56 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a94992cca72..139550f8849 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -145,6 +145,9 @@ Release 2.0.5-beta - UNRELEASED YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. (Jian He via vinodkv) + YARN-514. Delayed store operations should not result in RM unavailability + for app submission (Zhijie Shen via bikas) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java index 7b809da17ba..edbd34d6454 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java @@ -30,9 +30,15 @@ public enum YarnApplicationState { /** Application which was just created. */ NEW, + /** Application which is being saved. */ + NEW_SAVING, + /** Application which has been submitted. */ SUBMITTED, - + + /** Application has been accepted by the scheduler */ + ACCEPTED, + /** Application which is currently running. */ RUNNING, @@ -43,8 +49,5 @@ public enum YarnApplicationState { FAILED, /** Application which was terminated by a user or admin. */ - KILLED, - - /** Application has been accepted by the scheduler */ - ACCEPTED + KILLED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d7a4d86a99a..c865c5dda74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -72,12 +72,13 @@ message ContainerProto { enum YarnApplicationStateProto { NEW = 1; - SUBMITTED = 2; - RUNNING = 3; - FINISHED = 4; - FAILED = 5; - KILLED = 6; - ACCEPTED = 7; + NEW_SAVING = 2; + SUBMITTED = 3; + ACCEPTED = 4; + RUNNING = 5; + FINISHED = 6; + FAILED = 7; + KILLED = 8; } enum FinalApplicationStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 1c3c55ea730..bcc1f6465fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -297,20 +297,6 @@ public class ClientRMService extends AbstractService implements // So call handle directly and do not send an event. rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System .currentTimeMillis())); - - // If recovery is enabled then store the application information in a - // blocking call so make sure that RM has stored the information needed - // to restart the AM after RM restart without further client communication - RMStateStore stateStore = rmContext.getStateStore(); - LOG.info("Storing Application with id " + applicationId); - try { - stateStore.storeApplication(rmContext.getRMApps().get(applicationId)); - } catch (Exception e) { - // For HA this exception needs to be handled by giving up - // master status if we got fenced - LOG.error("Failed to store application:" + applicationId, e); - ExitUtil.terminate(1, e); - } LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7c4f9d75d5c..8a92ab10238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -232,7 +232,8 @@ public class RMAppManager implements EventHandler, @SuppressWarnings("unchecked") protected void submitApplication( - ApplicationSubmissionContext submissionContext, long submitTime) { + ApplicationSubmissionContext submissionContext, long submitTime, + boolean isRecovered) { ApplicationId applicationId = submissionContext.getApplicationId(); RMApp application = null; try { @@ -278,7 +279,8 @@ public class RMAppManager implements EventHandler, // All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.START)); + new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: + RMAppEventType.START)); } catch (IOException ie) { LOG.info("RMAppManager submit application exception", ie); if (application != null) { @@ -347,7 +349,7 @@ public class RMAppManager implements EventHandler, if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), - appState.getSubmitTime()); + appState.getSubmitTime(), true); // re-populate attempt information in application RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( appState.getAppId()); @@ -378,7 +380,7 @@ public class RMAppManager implements EventHandler, ApplicationSubmissionContext submissionContext = ((RMAppManagerSubmitEvent)event).getSubmissionContext(); long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime(); - submitApplication(submissionContext, submitTime); + submitApplication(submissionContext, submitTime, false); } break; default: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index c5d59378401..4b398d4f025 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -63,6 +65,11 @@ public class MemoryRMStateStore extends RMStateStore { ApplicationState appState = new ApplicationState( appStateData.getSubmitTime(), appStateData.getApplicationSubmissionContext()); + if (state.appState.containsKey(appState.getAppId())) { + Exception e = new IOException("App: " + appId + " is already stored."); + LOG.info("Error storing info for app: " + appId, e); + throw e; + } state.appState.put(appState.getAppId(), appState); } @@ -79,6 +86,13 @@ public class MemoryRMStateStore extends RMStateStore { attemptState.getAttemptId().getApplicationId()); assert appState != null; + if (appState.attempts.containsKey(attemptState.getAttemptId())) { + Exception e = new IOException("Attempt: " + + attemptState.getAttemptId() + " is already stored."); + LOG.info("Error storing info for attempt: " + + attemptState.getAttemptId(), e); + throw e; + } appState.attempts.put(attemptState.getAttemptId(), attemptState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 674a779cc2b..87a2608a38d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @@ -166,21 +167,19 @@ public abstract class RMStateStore { public abstract RMState loadState() throws Exception; /** - * Blocking API + * Non-Blocking API * ResourceManager services use this to store the application's state - * This must not be called on the dispatcher thread + * This does not block the dispatcher threads + * RMAppStoredEvent will be sent on completion to notify the RMApp */ - public synchronized void storeApplication(RMApp app) throws Exception { + @SuppressWarnings("unchecked") + public synchronized void storeApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - - ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); - appStateData.setSubmitTime(app.getSubmitTime()); - appStateData.setApplicationSubmissionContext(context); - - LOG.info("Storing info for app: " + context.getApplicationId()); - storeApplicationState(app.getApplicationId().toString(), appStateData); + ApplicationState appState = new ApplicationState( + app.getSubmitTime(), context); + dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } /** @@ -255,6 +254,30 @@ public abstract class RMStateStore { private synchronized void handleStoreEvent(RMStateStoreEvent event) { switch(event.getType()) { + case STORE_APP: + { + ApplicationState apptState = + ((RMStateStoreAppEvent) event).getAppState(); + Exception storedException = null; + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl(); + appStateData.setSubmitTime(apptState.getSubmitTime()); + appStateData.setApplicationSubmissionContext( + apptState.getApplicationSubmissionContext()); + ApplicationId appId = + apptState.getApplicationSubmissionContext().getApplicationId(); + + LOG.info("Storing info for app: " + appId); + try { + storeApplicationState(appId.toString(), appStateData); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + storedException = e; + } finally { + notifyDoneStoringApplication(appId, storedException); + } + } + break; case STORE_APP_ATTEMPT: { ApplicationAttemptState attemptState = @@ -297,11 +320,25 @@ public abstract class RMStateStore { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } } + + @SuppressWarnings("unchecked") + /** + * In (@link handleStoreEvent}, this method is called to notify the + * application about operation completion + * @param appId id of the application that has been saved + * @param storedException the exception that is thrown when storing the + * application + */ + private void notifyDoneStoringApplication(ApplicationId appId, + Exception storedException) { + rmDispatcher.getEventHandler().handle( + new RMAppStoredEvent(appId, storedException)); + } @SuppressWarnings("unchecked") /** - * In (@link storeApplicationAttempt}, derived class can call this method to - * notify the application attempt about operation completion + * In (@link handleStoreEvent}, this method is called to notify the + * application attempt about operation completion * @param appAttempt attempt that has been saved */ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java new file mode 100644 index 00000000000..99f8e37891a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + +public class RMStateStoreAppEvent extends RMStateStoreEvent { + + private final ApplicationState appState; + + public RMStateStoreAppEvent(ApplicationState appState) { + super(RMStateStoreEventType.STORE_APP); + this.appState = appState; + } + + public ApplicationState getAppState() { + return appState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 22f155cbf26..f5383fa5c05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; public enum RMStateStoreEventType { STORE_APP_ATTEMPT, + STORE_APP, REMOVE_APP } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 20eef1d5c9e..d15e12e0014 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; public enum RMAppEventType { // Source: ClientRMService START, + RECOVER, KILL, // Source: RMAppAttempt APP_REJECTED, APP_ACCEPTED, + APP_SAVED, ATTEMPT_REGISTERED, ATTEMPT_FINISHING, ATTEMPT_FINISHED, // Will send the final state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 62a3ba7218d..7b63cbe8048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -118,13 +119,25 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, + RMAppEventType.START, new RMAppSavingTransition()) .addTransition(RMAppState.NEW, RMAppState.SUBMITTED, - RMAppEventType.START, new StartAppAttemptTransition()) + RMAppEventType.RECOVER, new StartAppAttemptTransition()) .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, new AppKilledTransition()) .addTransition(RMAppState.NEW, RMAppState.FAILED, RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + // Transitions from NEW_SAVING state + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, + RMAppEventType.APP_SAVED, new StartAppAttemptTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED, + RMAppEventType.KILL, new AppKilledTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED, + RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) @@ -182,7 +195,7 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from FAILED state .addTransition(RMAppState.FAILED, RMAppState.FAILED, - RMAppEventType.KILL) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED)) // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, RMAppEventType.NODE_UPDATE) @@ -194,7 +207,7 @@ public class RMAppImpl implements RMApp, Recoverable { EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.ATTEMPT_KILLED)) + RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED)) // ignorable transitions .addTransition(RMAppState.KILLED, RMAppState.KILLED, RMAppEventType.NODE_UPDATE) @@ -358,6 +371,8 @@ public class RMAppImpl implements RMApp, Recoverable { switch(rmAppState) { case NEW: return YarnApplicationState.NEW; + case NEW_SAVING: + return YarnApplicationState.NEW_SAVING; case SUBMITTED: return YarnApplicationState.SUBMITTED; case ACCEPTED: @@ -378,6 +393,7 @@ public class RMAppImpl implements RMApp, Recoverable { private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { switch(state) { case NEW: + case NEW_SAVING: case SUBMITTED: case ACCEPTED: case RUNNING: @@ -591,6 +607,19 @@ public class RMAppImpl implements RMApp, Recoverable { private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { + if (event.getType().equals(RMAppEventType.APP_SAVED)) { + assert app.getState().equals(RMAppState.NEW_SAVING); + RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; + if(storeEvent.getStoredException() != null) { + // For HA this exception needs to be handled by giving up + // master status if we got fenced + LOG.error("Failed to store application: " + + storeEvent.getApplicationId(), + storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + } + app.createNewAttempt(true); }; } @@ -603,6 +632,18 @@ public class RMAppImpl implements RMApp, Recoverable { } } + private static final class RMAppSavingTransition extends RMAppTransition { + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + // If recovery is enabled then store the application information in a + // non-blocking call so make sure that RM has stored the information + // needed to restart the AM after RM restart without further client + // communication + LOG.info("Storing application with id " + app.applicationId); + app.rmContext.getStateStore().storeApplication(app); + } + } + private static class AppFinishedTransition extends FinalTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppFinishedAttemptEvent finishedEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java index a9e3ce1c8f9..b7f9325e2f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java @@ -19,5 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; public enum RMAppState { - NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED + NEW, + NEW_SAVING, + SUBMITTED, + ACCEPTED, + RUNNING, + FINISHING, + FINISHED, + FAILED, + KILLED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java new file mode 100644 index 00000000000..76fb1df0bcf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class RMAppStoredEvent extends RMAppEvent { + + private final Exception storedException; + + public RMAppStoredEvent(ApplicationId appId, Exception storedException) { + super(appId, RMAppEventType.APP_SAVED); + this.storedException = storedException; + } + + public Exception getStoredException() { + return storedException; + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java index a4826e806c9..cec16f3bb52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java @@ -63,6 +63,7 @@ public class RmController extends Controller { // limit applications to those in states relevant to scheduling set(YarnWebParams.APP_STATE, StringHelper.cjoin( RMAppState.NEW.toString(), + RMAppState.NEW_SAVING.toString(), RMAppState.SUBMITTED.toString(), RMAppState.ACCEPTED.toString(), RMAppState.RUNNING.toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 8a38278e56f..0977ec9c7e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -83,7 +83,9 @@ public class AppInfo { String trackingUrl = app.getTrackingUrl(); this.state = app.getState(); this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty() - || RMAppState.NEW == this.state || RMAppState.SUBMITTED == this.state + || RMAppState.NEW == this.state + || RMAppState.NEW_SAVING == this.state + || RMAppState.SUBMITTED == this.state || RMAppState.ACCEPTED == this.state; this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app .getFinishTime() == 0 ? "ApplicationMaster" : "History"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index fb74cb605b2..f5cc7d3858a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -164,7 +164,8 @@ public class TestAppManager{ } public void submitApplication( ApplicationSubmissionContext submissionContext) { - super.submitApplication(submissionContext, System.currentTimeMillis()); + super.submitApplication( + submissionContext, System.currentTimeMillis(), false); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 8e8e4859902..527f1975318 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -138,8 +139,9 @@ public class TestRMAppTransitions { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); + RMStateStore store = mock(RMStateStore.class); this.rmContext = - new RMContextImpl(rmDispatcher, + new RMContextImpl(rmDispatcher, store, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new ApplicationTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), @@ -264,21 +266,45 @@ public class TestRMAppTransitions { diag.toString().matches(regex)); } - protected RMApp testCreateAppSubmitted( + protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); - // NEW => SUBMITTED event RMAppEventType.START + // NEW => NEW_SAVING event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); application.handle(event); assertStartTimeSet(application); + assertAppState(RMAppState.NEW_SAVING, application); + return application; + } + + protected RMApp testCreateAppSubmittedNoRecovery( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppNewSaving(submissionContext); + // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED + RMAppEvent event = + new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + assertStartTimeSet(application); + assertAppState(RMAppState.SUBMITTED, application); + return application; + } + + protected RMApp testCreateAppSubmittedRecovery( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = createNewTestApp(submissionContext); + // NEW => SUBMITTED event RMAppEventType.RECOVER + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER); + application.handle(event); + assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); return application; } protected RMApp testCreateAppAccepted( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppSubmitted(submissionContext); + RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -375,7 +401,13 @@ public class TestRMAppTransitions { application.getDiagnostics().indexOf(diagMsg) != -1); } - @Test + @Test (timeout = 30000) + public void testAppRecoverPath() throws IOException { + LOG.info("--- START: testAppRecoverPath ---"); + testCreateAppSubmittedRecovery(null); + } + + @Test (timeout = 30000) public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); @@ -402,11 +434,38 @@ public class TestRMAppTransitions { assertFailed(application, rejectedText); } - @Test + @Test (timeout = 30000) + public void testAppNewSavingKill() throws IOException { + LOG.info("--- START: testAppNewSavingKill ---"); + + RMApp application = testCreateAppNewSaving(null); + // NEW_SAVING => KILLED event RMAppEventType.KILL + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + application.handle(event); + rmDispatcher.await(); + assertKilled(application); + } + + @Test (timeout = 30000) + public void testAppNewSavingReject() throws IOException { + LOG.info("--- START: testAppNewSavingReject ---"); + + RMApp application = testCreateAppNewSaving(null); + // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED + String rejectedText = "Test Application Rejected"; + RMAppEvent event = + new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + application.handle(event); + rmDispatcher.await(); + assertFailed(application, rejectedText); + } + + @Test (timeout = 30000) public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---"); - RMApp application = testCreateAppSubmitted(null); + RMApp application = testCreateAppSubmittedNoRecovery(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; RMAppEvent event = @@ -419,7 +478,7 @@ public class TestRMAppTransitions { @Test public void testAppSubmittedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppSubmittedKill---"); - RMApp application = testCreateAppSubmitted(null); + RMApp application = testCreateAppSubmittedNoRecovery(null); // SUBMITTED => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -570,7 +629,37 @@ public class TestRMAppTransitions { "", diag.toString()); } - @Test + @Test (timeout = 30000) + public void testAppFailedFailed() throws IOException { + LOG.info("--- START: testAppFailedFailed ---"); + + RMApp application = testCreateAppNewSaving(null); + + // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED + RMAppEvent event = + new RMAppRejectedEvent(application.getApplicationId(), ""); + application.handle(event); + rmDispatcher.await(); + assertTimesAtFinish(application); + assertAppState(RMAppState.FAILED, application); + + // FAILED => FAILED event RMAppEventType.KILL + event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + application.handle(event); + rmDispatcher.await(); + assertTimesAtFinish(application); + assertAppState(RMAppState.FAILED, application); + + // FAILED => FAILED event RMAppEventType.APP_SAVED + event = new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + rmDispatcher.await(); + assertTimesAtFinish(application); + assertAppState(RMAppState.FAILED, application); + } + + @Test (timeout = 30000) public void testAppKilledKilled() throws IOException { LOG.info("--- START: testAppKilledKilled ---"); @@ -616,6 +705,13 @@ public class TestRMAppTransitions { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + + // KILLED => KILLED event RMAppEventType.APP_SAVED + event = new RMAppStoredEvent(application.getApplicationId(), null); + application.handle(event); + rmDispatcher.await(); + assertTimesAtFinish(application); + assertAppState(RMAppState.KILLED, application); } @Test