YARN-514.Delayed store operations should not result in RM unavailability for app submission (Zhijie Shen via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1469059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-04-17 20:19:43 +00:00
parent d090a3b600
commit ef9f251679
17 changed files with 325 additions and 56 deletions

View File

@ -145,6 +145,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-495. Changed NM reboot behaviour to be a simple resync - kill all YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
containers and re-register with RM. (Jian He via vinodkv) containers and re-register with RM. (Jian He via vinodkv)
YARN-514. Delayed store operations should not result in RM unavailability
for app submission (Zhijie Shen via bikas)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -30,9 +30,15 @@ public enum YarnApplicationState {
/** Application which was just created. */ /** Application which was just created. */
NEW, NEW,
/** Application which is being saved. */
NEW_SAVING,
/** Application which has been submitted. */ /** Application which has been submitted. */
SUBMITTED, SUBMITTED,
/** Application has been accepted by the scheduler */
ACCEPTED,
/** Application which is currently running. */ /** Application which is currently running. */
RUNNING, RUNNING,
@ -43,8 +49,5 @@ public enum YarnApplicationState {
FAILED, FAILED,
/** Application which was terminated by a user or admin. */ /** Application which was terminated by a user or admin. */
KILLED, KILLED
/** Application has been accepted by the scheduler */
ACCEPTED
} }

View File

@ -72,12 +72,13 @@ message ContainerProto {
enum YarnApplicationStateProto { enum YarnApplicationStateProto {
NEW = 1; NEW = 1;
SUBMITTED = 2; NEW_SAVING = 2;
RUNNING = 3; SUBMITTED = 3;
FINISHED = 4; ACCEPTED = 4;
FAILED = 5; RUNNING = 5;
KILLED = 6; FINISHED = 6;
ACCEPTED = 7; FAILED = 7;
KILLED = 8;
} }
enum FinalApplicationStatusProto { enum FinalApplicationStatusProto {

View File

@ -297,20 +297,6 @@ public class ClientRMService extends AbstractService implements
// So call handle directly and do not send an event. // So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis())); .currentTimeMillis()));
// If recovery is enabled then store the application information in a
// blocking call so make sure that RM has stored the information needed
// to restart the AM after RM restart without further client communication
RMStateStore stateStore = rmContext.getStateStore();
LOG.info("Storing Application with id " + applicationId);
try {
stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
} catch (Exception e) {
// For HA this exception needs to be handled by giving up
// master status if we got fenced
LOG.error("Failed to store application:" + applicationId, e);
ExitUtil.terminate(1, e);
}
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user); " submitted by user " + user);

View File

@ -232,7 +232,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void submitApplication( protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime) { ApplicationSubmissionContext submissionContext, long submitTime,
boolean isRecovered) {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null; RMApp application = null;
try { try {
@ -278,7 +279,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// All done, start the RMApp // All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.START)); new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
RMAppEventType.START));
} catch (IOException ie) { } catch (IOException ie) {
LOG.info("RMAppManager submit application exception", ie); LOG.info("RMAppManager submit application exception", ie);
if (application != null) { if (application != null) {
@ -347,7 +349,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
if(shouldRecover) { if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId()); LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(), submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime()); appState.getSubmitTime(), true);
// re-populate attempt information in application // re-populate attempt information in application
RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
appState.getAppId()); appState.getAppId());
@ -378,7 +380,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ApplicationSubmissionContext submissionContext = ApplicationSubmissionContext submissionContext =
((RMAppManagerSubmitEvent)event).getSubmissionContext(); ((RMAppManagerSubmitEvent)event).getSubmissionContext();
long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime(); long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
submitApplication(submissionContext, submitTime); submitApplication(submissionContext, submitTime, false);
} }
break; break;
default: default:

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -63,6 +65,11 @@ public class MemoryRMStateStore extends RMStateStore {
ApplicationState appState = new ApplicationState( ApplicationState appState = new ApplicationState(
appStateData.getSubmitTime(), appStateData.getSubmitTime(),
appStateData.getApplicationSubmissionContext()); appStateData.getApplicationSubmissionContext());
if (state.appState.containsKey(appState.getAppId())) {
Exception e = new IOException("App: " + appId + " is already stored.");
LOG.info("Error storing info for app: " + appId, e);
throw e;
}
state.appState.put(appState.getAppId(), appState); state.appState.put(appState.getAppId(), appState);
} }
@ -79,6 +86,13 @@ public class MemoryRMStateStore extends RMStateStore {
attemptState.getAttemptId().getApplicationId()); attemptState.getAttemptId().getApplicationId());
assert appState != null; assert appState != null;
if (appState.attempts.containsKey(attemptState.getAttemptId())) {
Exception e = new IOException("Attempt: " +
attemptState.getAttemptId() + " is already stored.");
LOG.info("Error storing info for attempt: " +
attemptState.getAttemptId(), e);
throw e;
}
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
@ -166,21 +167,19 @@ public abstract class RMStateStore {
public abstract RMState loadState() throws Exception; public abstract RMState loadState() throws Exception;
/** /**
* Blocking API * Non-Blocking API
* ResourceManager services use this to store the application's state * ResourceManager services use this to store the application's state
* This must not be called on the dispatcher thread * This does not block the dispatcher threads
* RMAppStoredEvent will be sent on completion to notify the RMApp
*/ */
public synchronized void storeApplication(RMApp app) throws Exception { @SuppressWarnings("unchecked")
public synchronized void storeApplication(RMApp app) {
ApplicationSubmissionContext context = app ApplicationSubmissionContext context = app
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl; assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationState appState = new ApplicationState(
ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); app.getSubmitTime(), context);
appStateData.setSubmitTime(app.getSubmitTime()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
appStateData.setApplicationSubmissionContext(context);
LOG.info("Storing info for app: " + context.getApplicationId());
storeApplicationState(app.getApplicationId().toString(), appStateData);
} }
/** /**
@ -255,6 +254,30 @@ public abstract class RMStateStore {
private synchronized void handleStoreEvent(RMStateStoreEvent event) { private synchronized void handleStoreEvent(RMStateStoreEvent event) {
switch(event.getType()) { switch(event.getType()) {
case STORE_APP:
{
ApplicationState apptState =
((RMStateStoreAppEvent) event).getAppState();
Exception storedException = null;
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl();
appStateData.setSubmitTime(apptState.getSubmitTime());
appStateData.setApplicationSubmissionContext(
apptState.getApplicationSubmissionContext());
ApplicationId appId =
apptState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
storeApplicationState(appId.toString(), appStateData);
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
storedException = e;
} finally {
notifyDoneStoringApplication(appId, storedException);
}
}
break;
case STORE_APP_ATTEMPT: case STORE_APP_ATTEMPT:
{ {
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
@ -297,11 +320,25 @@ public abstract class RMStateStore {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
} }
} }
@SuppressWarnings("unchecked")
/**
* In (@link handleStoreEvent}, this method is called to notify the
* application about operation completion
* @param appId id of the application that has been saved
* @param storedException the exception that is thrown when storing the
* application
*/
private void notifyDoneStoringApplication(ApplicationId appId,
Exception storedException) {
rmDispatcher.getEventHandler().handle(
new RMAppStoredEvent(appId, storedException));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
/** /**
* In (@link storeApplicationAttempt}, derived class can call this method to * In (@link handleStoreEvent}, this method is called to notify the
* notify the application attempt about operation completion * application attempt about operation completion
* @param appAttempt attempt that has been saved * @param appAttempt attempt that has been saved
*/ */
private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
public class RMStateStoreAppEvent extends RMStateStoreEvent {
private final ApplicationState appState;
public RMStateStoreAppEvent(ApplicationState appState) {
super(RMStateStoreEventType.STORE_APP);
this.appState = appState;
}
public ApplicationState getAppState() {
return appState;
}
}

View File

@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public enum RMStateStoreEventType { public enum RMStateStoreEventType {
STORE_APP_ATTEMPT, STORE_APP_ATTEMPT,
STORE_APP,
REMOVE_APP REMOVE_APP
} }

View File

@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
public enum RMAppEventType { public enum RMAppEventType {
// Source: ClientRMService // Source: ClientRMService
START, START,
RECOVER,
KILL, KILL,
// Source: RMAppAttempt // Source: RMAppAttempt
APP_REJECTED, APP_REJECTED,
APP_ACCEPTED, APP_ACCEPTED,
APP_SAVED,
ATTEMPT_REGISTERED, ATTEMPT_REGISTERED,
ATTEMPT_FINISHING, ATTEMPT_FINISHING,
ATTEMPT_FINISHED, // Will send the final state ATTEMPT_FINISHED, // Will send the final state

View File

@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -118,13 +119,25 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW state // Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW, .addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition())
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED, .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
RMAppEventType.START, new StartAppAttemptTransition()) RMAppEventType.RECOVER, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition()) new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FAILED, .addTransition(RMAppState.NEW, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition()) RMAppEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_SAVED, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED,
RMAppEventType.KILL, new AppKilledTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from SUBMITTED state // Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@ -182,7 +195,7 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from FAILED state // Transitions from FAILED state
.addTransition(RMAppState.FAILED, RMAppState.FAILED, .addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.KILL) EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED, .addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.NODE_UPDATE) RMAppEventType.NODE_UPDATE)
@ -194,7 +207,7 @@ public class RMAppImpl implements RMApp, Recoverable {
EnumSet.of(RMAppEventType.APP_ACCEPTED, EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED)) RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.KILLED, RMAppState.KILLED, .addTransition(RMAppState.KILLED, RMAppState.KILLED,
RMAppEventType.NODE_UPDATE) RMAppEventType.NODE_UPDATE)
@ -358,6 +371,8 @@ public class RMAppImpl implements RMApp, Recoverable {
switch(rmAppState) { switch(rmAppState) {
case NEW: case NEW:
return YarnApplicationState.NEW; return YarnApplicationState.NEW;
case NEW_SAVING:
return YarnApplicationState.NEW_SAVING;
case SUBMITTED: case SUBMITTED:
return YarnApplicationState.SUBMITTED; return YarnApplicationState.SUBMITTED;
case ACCEPTED: case ACCEPTED:
@ -378,6 +393,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
switch(state) { switch(state) {
case NEW: case NEW:
case NEW_SAVING:
case SUBMITTED: case SUBMITTED:
case ACCEPTED: case ACCEPTED:
case RUNNING: case RUNNING:
@ -591,6 +607,19 @@ public class RMAppImpl implements RMApp, Recoverable {
private static final class StartAppAttemptTransition extends RMAppTransition { private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
if (event.getType().equals(RMAppEventType.APP_SAVED)) {
assert app.getState().equals(RMAppState.NEW_SAVING);
RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
if(storeEvent.getStoredException() != null) {
// For HA this exception needs to be handled by giving up
// master status if we got fenced
LOG.error("Failed to store application: "
+ storeEvent.getApplicationId(),
storeEvent.getStoredException());
ExitUtil.terminate(1, storeEvent.getStoredException());
}
}
app.createNewAttempt(true); app.createNewAttempt(true);
}; };
} }
@ -603,6 +632,18 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
} }
private static final class RMAppSavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeApplication(app);
}
}
private static class AppFinishedTransition extends FinalTransition { private static class AppFinishedTransition extends FinalTransition {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent = RMAppFinishedAttemptEvent finishedEvent =

View File

@ -19,5 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
public enum RMAppState { public enum RMAppState {
NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED NEW,
NEW_SAVING,
SUBMITTED,
ACCEPTED,
RUNNING,
FINISHING,
FINISHED,
FAILED,
KILLED
} }

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppStoredEvent extends RMAppEvent {
private final Exception storedException;
public RMAppStoredEvent(ApplicationId appId, Exception storedException) {
super(appId, RMAppEventType.APP_SAVED);
this.storedException = storedException;
}
public Exception getStoredException() {
return storedException;
}
}

View File

@ -63,6 +63,7 @@ public class RmController extends Controller {
// limit applications to those in states relevant to scheduling // limit applications to those in states relevant to scheduling
set(YarnWebParams.APP_STATE, StringHelper.cjoin( set(YarnWebParams.APP_STATE, StringHelper.cjoin(
RMAppState.NEW.toString(), RMAppState.NEW.toString(),
RMAppState.NEW_SAVING.toString(),
RMAppState.SUBMITTED.toString(), RMAppState.SUBMITTED.toString(),
RMAppState.ACCEPTED.toString(), RMAppState.ACCEPTED.toString(),
RMAppState.RUNNING.toString(), RMAppState.RUNNING.toString(),

View File

@ -83,7 +83,9 @@ public class AppInfo {
String trackingUrl = app.getTrackingUrl(); String trackingUrl = app.getTrackingUrl();
this.state = app.getState(); this.state = app.getState();
this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty() this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty()
|| RMAppState.NEW == this.state || RMAppState.SUBMITTED == this.state || RMAppState.NEW == this.state
|| RMAppState.NEW_SAVING == this.state
|| RMAppState.SUBMITTED == this.state
|| RMAppState.ACCEPTED == this.state; || RMAppState.ACCEPTED == this.state;
this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
.getFinishTime() == 0 ? "ApplicationMaster" : "History"); .getFinishTime() == 0 ? "ApplicationMaster" : "History");

View File

@ -164,7 +164,8 @@ public class TestAppManager{
} }
public void submitApplication( public void submitApplication(
ApplicationSubmissionContext submissionContext) { ApplicationSubmissionContext submissionContext) {
super.submitApplication(submissionContext, System.currentTimeMillis()); super.submitApplication(
submissionContext, System.currentTimeMillis(), false);
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -138,8 +139,9 @@ public class TestRMAppTransitions {
mock(ContainerAllocationExpirer.class); mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
RMStateStore store = mock(RMStateStore.class);
this.rmContext = this.rmContext =
new RMContextImpl(rmDispatcher, new RMContextImpl(rmDispatcher, store,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new ApplicationTokenSecretManager(conf), null, new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf), new RMContainerTokenSecretManager(conf),
@ -264,21 +266,45 @@ public class TestRMAppTransitions {
diag.toString().matches(regex)); diag.toString().matches(regex));
} }
protected RMApp testCreateAppSubmitted( protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException { ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext); RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.START // NEW => NEW_SAVING event RMAppEventType.START
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START); new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
application.handle(event); application.handle(event);
assertStartTimeSet(application); assertStartTimeSet(application);
assertAppState(RMAppState.NEW_SAVING, application);
return application;
}
protected RMApp testCreateAppSubmittedNoRecovery(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppNewSaving(submissionContext);
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
RMAppEvent event =
new RMAppStoredEvent(application.getApplicationId(), null);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
return application;
}
protected RMApp testCreateAppSubmittedRecovery(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application); assertAppState(RMAppState.SUBMITTED, application);
return application; return application;
} }
protected RMApp testCreateAppAccepted( protected RMApp testCreateAppAccepted(
ApplicationSubmissionContext submissionContext) throws IOException { ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppSubmitted(submissionContext); RMApp application = testCreateAppSubmittedNoRecovery(submissionContext);
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), new RMAppEvent(application.getApplicationId(),
@ -375,7 +401,13 @@ public class TestRMAppTransitions {
application.getDiagnostics().indexOf(diagMsg) != -1); application.getDiagnostics().indexOf(diagMsg) != -1);
} }
@Test @Test (timeout = 30000)
public void testAppRecoverPath() throws IOException {
LOG.info("--- START: testAppRecoverPath ---");
testCreateAppSubmittedRecovery(null);
}
@Test (timeout = 30000)
public void testAppNewKill() throws IOException { public void testAppNewKill() throws IOException {
LOG.info("--- START: testAppNewKill ---"); LOG.info("--- START: testAppNewKill ---");
@ -402,11 +434,38 @@ public class TestRMAppTransitions {
assertFailed(application, rejectedText); assertFailed(application, rejectedText);
} }
@Test @Test (timeout = 30000)
public void testAppNewSavingKill() throws IOException {
LOG.info("--- START: testAppNewSavingKill ---");
RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => KILLED event RMAppEventType.KILL
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
assertKilled(application);
}
@Test (timeout = 30000)
public void testAppNewSavingReject() throws IOException {
LOG.info("--- START: testAppNewSavingReject ---");
RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected";
RMAppEvent event =
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
application.handle(event);
rmDispatcher.await();
assertFailed(application, rejectedText);
}
@Test (timeout = 30000)
public void testAppSubmittedRejected() throws IOException { public void testAppSubmittedRejected() throws IOException {
LOG.info("--- START: testAppSubmittedRejected ---"); LOG.info("--- START: testAppSubmittedRejected ---");
RMApp application = testCreateAppSubmitted(null); RMApp application = testCreateAppSubmittedNoRecovery(null);
// SUBMITTED => FAILED event RMAppEventType.APP_REJECTED // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "app rejected"; String rejectedText = "app rejected";
RMAppEvent event = RMAppEvent event =
@ -419,7 +478,7 @@ public class TestRMAppTransitions {
@Test @Test
public void testAppSubmittedKill() throws IOException, InterruptedException { public void testAppSubmittedKill() throws IOException, InterruptedException {
LOG.info("--- START: testAppSubmittedKill---"); LOG.info("--- START: testAppSubmittedKill---");
RMApp application = testCreateAppSubmitted(null); RMApp application = testCreateAppSubmittedNoRecovery(null);
// SUBMITTED => KILLED event RMAppEventType.KILL // SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEvent event = new RMAppEvent(application.getApplicationId(),
RMAppEventType.KILL); RMAppEventType.KILL);
@ -570,7 +629,37 @@ public class TestRMAppTransitions {
"", diag.toString()); "", diag.toString());
} }
@Test @Test (timeout = 30000)
public void testAppFailedFailed() throws IOException {
LOG.info("--- START: testAppFailedFailed ---");
RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
RMAppEvent event =
new RMAppRejectedEvent(application.getApplicationId(), "");
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
// FAILED => FAILED event RMAppEventType.KILL
event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
// FAILED => FAILED event RMAppEventType.APP_SAVED
event = new RMAppStoredEvent(application.getApplicationId(), null);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application);
}
@Test (timeout = 30000)
public void testAppKilledKilled() throws IOException { public void testAppKilledKilled() throws IOException {
LOG.info("--- START: testAppKilledKilled ---"); LOG.info("--- START: testAppKilledKilled ---");
@ -616,6 +705,13 @@ public class TestRMAppTransitions {
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application); assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.APP_SAVED
event = new RMAppStoredEvent(application.getApplicationId(), null);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
} }
@Test @Test