YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart. Contributed by Varun Saxena

(cherry picked from commit cf23f2c2b5)
This commit is contained in:
Jian He 2015-10-15 17:12:36 -07:00
parent 6774a1732a
commit 2862057f11
25 changed files with 397 additions and 380 deletions

View File

@ -893,6 +893,9 @@ Release 2.8.0 - UNRELEASED
YARN-4250. NPE in AppSchedulingInfo#isRequestLabelChanged. (Brahma Reddy Battula via rohithsharmaks) YARN-4250. NPE in AppSchedulingInfo#isRequestLabelChanged. (Brahma Reddy Battula via rohithsharmaks)
YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart.
(Varun Saxena via jianhe)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -141,7 +141,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
@ -675,11 +676,8 @@ public class ClientRMService extends AbstractService implements
} }
} }
this.rmContext this.rmContext.getDispatcher().getEventHandler().handle(
.getDispatcher() new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL,
.getEventHandler()
.handle(
new RMAppAttemptFailedEvent(attemptId,
"Attempt failed by user.")); "Attempt failed by user."));
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
@ -734,8 +732,9 @@ public class ClientRMService extends AbstractService implements
return KillApplicationResponse.newInstance(true); return KillApplicationResponse.newInstance(true);
} }
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler().handle(
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL)); new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed by user."));
// For UnmanagedAMs, return true so they don't retry // For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance( return KillApplicationResponse.newInstance(

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@ -304,7 +303,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// scheduler about the existence of the application // scheduler about the existence of the application
assert application.getState() == RMAppState.NEW; assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage())); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, e.getMessage()));
throw RPCUtil.getRemoteException(e); throw RPCUtil.getRemoteException(e);
} }
} }

View File

@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -260,8 +259,8 @@ public class AMLauncher implements Runnable {
String message = "Error launching " + application.getAppAttemptId() String message = "Error launching " + application.getAppAttemptId()
+ ". Got exception: " + StringUtils.stringifyException(ie); + ". Got exception: " + StringUtils.stringifyException(ie);
LOG.info(message); LOG.info(message);
handler.handle(new RMAppAttemptLaunchFailedEvent(application handler.handle(new RMAppAttemptEvent(application
.getAppAttemptId(), message)); .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
} }
break; break;
case CLEANUP: case CLEANUP:

View File

@ -24,13 +24,24 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
public class RMAppEvent extends AbstractEvent<RMAppEventType>{ public class RMAppEvent extends AbstractEvent<RMAppEventType>{
private final ApplicationId appId; private final ApplicationId appId;
private final String diagnosticMsg;
public RMAppEvent(ApplicationId appId, RMAppEventType type) { public RMAppEvent(ApplicationId appId, RMAppEventType type) {
this(appId, type, "");
}
public RMAppEvent(ApplicationId appId, RMAppEventType type,
String diagnostic) {
super(type); super(type);
this.appId = appId; this.appId = appId;
this.diagnosticMsg = diagnostic;
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
return this.appId; return this.appId;
} }
public String getDiagnosticMsg() {
return this.diagnosticMsg;
}
} }

View File

@ -22,20 +22,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppFailedAttemptEvent extends RMAppEvent { public class RMAppFailedAttemptEvent extends RMAppEvent {
private final String diagnostics;
private final boolean transferStateFromPreviousAttempt; private final boolean transferStateFromPreviousAttempt;
public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
String diagnostics, boolean transferStateFromPreviousAttempt) { String diagnostics, boolean transferStateFromPreviousAttempt) {
super(appId, event); super(appId, event, diagnostics);
this.diagnostics = diagnostics;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
} }
public String getDiagnostics() {
return this.diagnostics;
}
public boolean getTransferStateFromPreviousAttempt() { public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt; return transferStateFromPreviousAttempt;
} }

View File

@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppFinishedAttemptEvent extends RMAppEvent {
private final String diagnostics;
public RMAppFinishedAttemptEvent(ApplicationId appId, String diagnostics) {
super(appId, RMAppEventType.ATTEMPT_FINISHED);
this.diagnostics = diagnostics;
}
public String getDiagnostics() {
return this.diagnostics;
}
}

View File

@ -1046,7 +1046,7 @@ public class RMAppImpl implements RMApp, Recoverable {
if (this.submissionContext.getUnmanagedAM()) { if (this.submissionContext.getUnmanagedAM()) {
// RM does not manage the AM. Do not retry // RM does not manage the AM. Do not retry
msg = "Unmanaged application " + this.getApplicationId() msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics() + " failed due to " + failedEvent.getDiagnosticMsg()
+ ". Failing the application."; + ". Failing the application.";
} else if (this.isNumAttemptsBeyondThreshold) { } else if (this.isNumAttemptsBeyondThreshold) {
int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -1061,7 +1061,7 @@ public class RMAppImpl implements RMApp, Recoverable {
(globalLimit == maxAppAttempts) ? "" (globalLimit == maxAppAttempts) ? ""
: (" (global limit =" + globalLimit : (" (global limit =" + globalLimit
+ "; local limit is =" + maxAppAttempts + ")"), + "; local limit is =" + maxAppAttempts + ")"),
failedEvent.getDiagnostics()); failedEvent.getDiagnosticMsg());
} }
return msg; return msg;
} }
@ -1102,21 +1102,14 @@ public class RMAppImpl implements RMApp, Recoverable {
String diags = null; String diags = null;
switch (event.getType()) { switch (event.getType()) {
case APP_REJECTED: case APP_REJECTED:
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
diags = rejectedEvent.getMessage();
break;
case ATTEMPT_FINISHED: case ATTEMPT_FINISHED:
RMAppFinishedAttemptEvent finishedEvent = case ATTEMPT_KILLED:
(RMAppFinishedAttemptEvent) event; diags = event.getDiagnosticMsg();
diags = finishedEvent.getDiagnostics();
break; break;
case ATTEMPT_FAILED: case ATTEMPT_FAILED:
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent); diags = getAppAttemptFailedDiagnostics(failedEvent);
break; break;
case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default: default:
break; break;
} }
@ -1164,9 +1157,7 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent = app.diagnostics.append(event.getDiagnosticMsg());
(RMAppFinishedAttemptEvent)event;
app.diagnostics.append(finishedEvent.getDiagnostics());
super.transition(app, event); super.transition(app, event);
}; };
} }
@ -1212,21 +1203,21 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append(getAppKilledDiagnostics()); app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event); super.transition(app, event);
}; };
} }
private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}
private static class KillAttemptTransition extends RMAppTransition { private static class KillAttemptTransition extends RMAppTransition {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.stateBeforeKilling = app.getState(); app.stateBeforeKilling = app.getState();
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt // Forward app kill diagnostics in the event to kill app attempt.
.getAppAttemptId(), RMAppAttemptEventType.KILL)); // These diagnostics will be returned back in ATTEMPT_KILLED event sent by
// RMAppAttemptImpl.
app.handler.handle(
new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL, event.getDiagnosticMsg()));
} }
} }
@ -1237,8 +1228,7 @@ public class RMAppImpl implements RMApp, Recoverable {
} }
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; app.diagnostics.append(event.getDiagnosticMsg());
app.diagnostics.append(rejectedEvent.getMessage());
super.transition(app, event); super.transition(app, event);
}; };
} }

View File

@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppRejectedEvent extends RMAppEvent {
private final String message;
public RMAppRejectedEvent(ApplicationId appId, String message) {
super(appId, RMAppEventType.APP_REJECTED);
this.message = message;
}
public String getMessage() {
return this.message;
}
}

View File

@ -24,14 +24,25 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> { public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> {
private final ApplicationAttemptId appAttemptId; private final ApplicationAttemptId appAttemptId;
private final String diagnosticMsg;
public RMAppAttemptEvent(ApplicationAttemptId appAttemptId, public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
RMAppAttemptEventType type) { RMAppAttemptEventType type) {
this(appAttemptId, type, "");
}
public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
RMAppAttemptEventType type, String diagnostics) {
super(type); super(type);
this.appAttemptId = appAttemptId; this.appAttemptId = appAttemptId;
this.diagnosticMsg = diagnostics;
} }
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.appAttemptId; return this.appAttemptId;
} }
public String getDiagnosticMsg() {
return diagnosticMsg;
}
} }

View File

@ -82,12 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@ -1085,8 +1081,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
LOG.warn("Interrupted while waiting to resend the" LOG.warn("Interrupted while waiting to resend the"
+ " ContainerAllocated Event."); + " ContainerAllocated Event.");
} }
appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( appAttempt.eventHandler.handle(
appAttempt.applicationAttemptId)); new RMAppAttemptEvent(appAttempt.applicationAttemptId,
RMAppAttemptEventType.CONTAINER_ALLOCATED));
} }
}.start(); }.start();
} }
@ -1195,9 +1192,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
int exitStatus = ContainerExitStatus.INVALID; int exitStatus = ContainerExitStatus.INVALID;
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_FAILED: case LAUNCH_FAILED:
RMAppAttemptLaunchFailedEvent launchFaileEvent = diags = event.getDiagnosticMsg();
(RMAppAttemptLaunchFailedEvent) event;
diags = launchFaileEvent.getMessage();
break; break;
case REGISTERED: case REGISTERED:
diags = getUnexpectedAMRegisteredDiagnostics(); diags = getUnexpectedAMRegisteredDiagnostics();
@ -1205,7 +1200,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
case UNREGISTERED: case UNREGISTERED:
RMAppAttemptUnregistrationEvent unregisterEvent = RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event; (RMAppAttemptUnregistrationEvent) event;
diags = unregisterEvent.getDiagnostics(); diags = unregisterEvent.getDiagnosticMsg();
// reset finalTrackingUrl to url sent by am // reset finalTrackingUrl to url sent by am
finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus(); finalStatus = unregisterEvent.getFinalApplicationStatus();
@ -1219,9 +1214,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
case KILL: case KILL:
break; break;
case FAIL: case FAIL:
RMAppAttemptFailedEvent failEvent = diags = event.getDiagnosticMsg();
(RMAppAttemptFailedEvent) event;
diags = failEvent.getDiagnostics();
break; break;
case EXPIRE: case EXPIRE:
diags = getAMExpiredDiagnostics(event); diags = getAMExpiredDiagnostics(event);
@ -1309,17 +1302,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
switch (finalAttemptState) { switch (finalAttemptState) {
case FINISHED: case FINISHED:
{ {
appEvent = new RMAppFinishedAttemptEvent(applicationId, appEvent =
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHED,
appAttempt.getDiagnostics()); appAttempt.getDiagnostics());
} }
break; break;
case KILLED: case KILLED:
{ {
appAttempt.invalidateAMHostAndPort(); appAttempt.invalidateAMHostAndPort();
// Forward diagnostics received in attempt kill event.
appEvent = appEvent =
new RMAppFailedAttemptEvent(applicationId, new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.ATTEMPT_KILLED,
"Application killed by user.", false); event.getDiagnosticMsg(), false);
} }
break; break;
case FAILED: case FAILED:
@ -1377,9 +1372,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override @Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
RMAppAttemptFailedEvent failedEvent = (RMAppAttemptFailedEvent) event; if (event.getDiagnosticMsg() != null) {
if (failedEvent.getDiagnostics() != null) { appAttempt.diagnostics.append(event.getDiagnosticMsg());
appAttempt.diagnostics.append(failedEvent.getDiagnostics());
} }
super.transition(appAttempt, event); super.transition(appAttempt, event);
} }
@ -1451,9 +1445,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
// Use diagnostic from launcher // Use diagnostic from launcher
RMAppAttemptLaunchFailedEvent launchFaileEvent appAttempt.diagnostics.append(event.getDiagnosticMsg());
= (RMAppAttemptLaunchFailedEvent) event;
appAttempt.diagnostics.append(launchFaileEvent.getMessage());
// Tell the app, scheduler // Tell the app, scheduler
super.transition(appAttempt, event); super.transition(appAttempt, event);
@ -1708,7 +1700,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
progress = 1.0f; progress = 1.0f;
RMAppAttemptUnregistrationEvent unregisterEvent = RMAppAttemptUnregistrationEvent unregisterEvent =
(RMAppAttemptUnregistrationEvent) event; (RMAppAttemptUnregistrationEvent) event;
diagnostics.append(unregisterEvent.getDiagnostics()); diagnostics.append(unregisterEvent.getDiagnosticMsg());
originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
finalStatus = unregisterEvent.getFinalApplicationStatus(); finalStatus = unregisterEvent.getFinalApplicationStatus();
} }

View File

@ -1,31 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent {
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED);
}
}

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptFailedEvent extends RMAppAttemptEvent {
private final String diagnostics;
public RMAppAttemptFailedEvent(ApplicationAttemptId appAttemptId,
String diagnostics) {
super(appAttemptId, RMAppAttemptEventType.FAIL);
this.diagnostics = diagnostics;
}
public String getDiagnostics() {
return this.diagnostics;
}
}

View File

@ -1,38 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptLaunchFailedEvent extends RMAppAttemptEvent {
private final String message;
public RMAppAttemptLaunchFailedEvent(ApplicationAttemptId appAttemptId,
String message) {
super(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED);
this.message = message;
}
public String getMessage() {
return this.message;
}
}

View File

@ -27,14 +27,13 @@ public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent {
private final String finalTrackingUrl; private final String finalTrackingUrl;
private final FinalApplicationStatus finalStatus; private final FinalApplicationStatus finalStatus;
private final String diagnostics;
public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId, public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId,
String trackingUrl, FinalApplicationStatus finalStatus, String diagnostics) { String trackingUrl, FinalApplicationStatus finalStatus,
super(appAttemptId, RMAppAttemptEventType.UNREGISTERED); String diagnostics) {
super(appAttemptId, RMAppAttemptEventType.UNREGISTERED, diagnostics);
this.finalTrackingUrl = trackingUrl; this.finalTrackingUrl = trackingUrl;
this.finalStatus = finalStatus; this.finalStatus = finalStatus;
this.diagnostics = diagnostics;
} }
public String getFinalTrackingUrl() { public String getFinalTrackingUrl() {
@ -45,8 +44,4 @@ public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent {
return this.finalStatus; return this.finalStatus;
} }
public String getDiagnostics() {
return this.diagnostics;
}
} }

View File

@ -45,7 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
@ -511,8 +512,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
@Override @Override
public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId)); container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
} }
} }

View File

@ -651,7 +651,9 @@ public abstract class AbstractYarnScheduler
this.rmContext this.rmContext
.getDispatcher() .getDispatcher()
.getEventHandler() .getEventHandler()
.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
"Application killed due to expiry of reservation queue " +
queueName + "."));
} }
} }

View File

@ -22,11 +22,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@Private @Private
public class QueueNotFoundException extends YarnRuntimeException { public class QueueInvalidException extends YarnRuntimeException {
private static final long serialVersionUID = 187239430L; private static final long serialVersionUID = 187239430L;
public QueueNotFoundException(String message) { public QueueInvalidException(String message) {
super(message); super(message);
} }
} }

View File

@ -80,7 +80,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationCons
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -97,8 +96,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@ -666,47 +665,97 @@ public class CapacityScheduler extends
return queues.get(queueName); return queues.get(queueName);
} }
private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplicationOnRecovery(
String queueName, String user, boolean isAppRecovering, Priority priority) { ApplicationId applicationId, String queueName, String user,
// sanity checks. Priority priority) {
CSQueue queue = getQueue(queueName); CSQueue queue = getQueue(queueName);
if (queue == null) { if (queue == null) {
//During a restart, this indicates a queue was removed, which is //During a restart, this indicates a queue was removed, which is
//not presently supported //not presently supported
if (isAppRecovering) { if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed on recovery as it was submitted to queue " +
queueName + " which no longer exists after restart."));
return;
} else {
String queueErrorMsg = "Queue named " + queueName String queueErrorMsg = "Queue named " + queueName
+ " missing during application recovery." + " missing during application recovery."
+ " Queue removal during recovery is not presently supported by the" + " Queue removal during recovery is not presently supported by the"
+ " capacity scheduler, please restart with all queues configured" + " capacity scheduler, please restart with all queues configured"
+ " which were present before shutdown/restart."; + " which were present before shutdown/restart.";
LOG.fatal(queueErrorMsg); LOG.fatal(queueErrorMsg);
throw new QueueNotFoundException(queueErrorMsg); throw new QueueInvalidException(queueErrorMsg);
} }
}
if (!(queue instanceof LeafQueue)) {
// During RM restart, this means leaf queue was converted to a parent
// queue, which is not supported for running apps.
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed on recovery as it was submitted to queue " +
queueName + " which is no longer a leaf queue after restart."));
return;
} else {
String queueErrorMsg = "Queue named " + queueName
+ " is no longer a leaf queue during application recovery."
+ " Changing a leaf queue to a parent queue during recovery is"
+ " not presently supported by the capacity scheduler. Please"
+ " restart with leaf queues before shutdown/restart continuing"
+ " as leaf queues.";
LOG.fatal(queueErrorMsg);
throw new QueueInvalidException(queueErrorMsg);
}
}
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);
} catch (AccessControlException ace) {
// Ignore the exception for recovered app as the app was previously
// accepted.
}
queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
}
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, Priority priority) {
// Sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId + String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName; " submitted by user " + user + " to unknown queue: " + queueName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return; return;
} }
if (!(queue instanceof LeafQueue)) { if (!(queue instanceof LeafQueue)) {
String message = "Application " + applicationId + String message = "Application " + applicationId +
" submitted by user " + user + " to non-leaf queue: " + queueName; " submitted by user " + user + " to non-leaf queue: " + queueName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return; return;
} }
// Submit to the queue // Submit to the queue
try { try {
queue.submitApplication(applicationId, user, queueName); queue.submitApplication(applicationId, user, queueName);
} catch (AccessControlException ace) { } catch (AccessControlException ace) {
// Ignore the exception for recovered app as the app was previously accepted LOG.info("Failed to submit application " + applicationId + " to queue "
if (!isAppRecovering) { + queueName + " from user " + user, ace);
LOG.info("Failed to submit application " + applicationId + " to queue " this.rmContext.getDispatcher().getEventHandler()
+ queueName + " from user " + user, ace); .handle(new RMAppEvent(applicationId,
this.rmContext.getDispatcher().getEventHandler() RMAppEventType.APP_REJECTED, ace.toString()));
.handle(new RMAppRejectedEvent(applicationId, ace.toString())); return;
return;
}
} }
// update the metrics // update the metrics
queue.getMetrics().submitApp(user); queue.getMetrics().submitApp(user);
@ -715,14 +764,8 @@ public class CapacityScheduler extends
applications.put(applicationId, application); applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName); + ", in queue: " + queueName);
if (isAppRecovering) { rmContext.getDispatcher().getEventHandler()
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
} }
private synchronized void addApplicationAttempt( private synchronized void addApplicationAttempt(
@ -731,6 +774,11 @@ public class CapacityScheduler extends
boolean isAttemptRecovering) { boolean isAttemptRecovering) {
SchedulerApplication<FiCaSchedulerApp> application = SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
if (application == null) {
LOG.warn("Application " + applicationAttemptId.getApplicationId() +
" cannot be found in scheduler.");
return;
}
CSQueue queue = (CSQueue) application.getQueue(); CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
@ -1277,11 +1325,13 @@ public class CapacityScheduler extends
appAddedEvent.getApplicationId(), appAddedEvent.getApplicationId(),
appAddedEvent.getReservationID()); appAddedEvent.getReservationID());
if (queueName != null) { if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(), if (!appAddedEvent.getIsAppRecovering()) {
queueName, addApplication(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
appAddedEvent.getIsAppRecovering(), } else {
appAddedEvent.getApplicatonPriority()); addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
}
} }
} }
break; break;
@ -1631,7 +1681,8 @@ public class CapacityScheduler extends
+ " submitted to a reservation which is not yet currently active: " + " submitted to a reservation which is not yet currently active: "
+ resQName; + resQName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return null; return null;
} }
if (!queue.getParent().getQueueName().equals(queueName)) { if (!queue.getParent().getQueueName().equals(queueName)) {
@ -1640,7 +1691,8 @@ public class CapacityScheduler extends
+ resQName + " which does not belong to the specified queue: " + resQName + " which does not belong to the specified queue: "
+ queueName; + queueName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return null; return null;
} }
// use the reservation queue to run the app // use the reservation queue to run the app

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -614,7 +613,8 @@ public class FairScheduler extends
" submitted by user " + user + " with an empty queue name."; " submitted by user " + user + " with an empty queue name.";
LOG.info(message); LOG.info(message);
rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return; return;
} }
@ -625,7 +625,8 @@ public class FairScheduler extends
+ "The queue name cannot start/end with period."; + "The queue name cannot start/end with period.";
LOG.info(message); LOG.info(message);
rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return; return;
} }
@ -644,7 +645,8 @@ public class FairScheduler extends
" cannot submit applications to queue " + queue.getName(); " cannot submit applications to queue " + queue.getName();
LOG.info(msg); LOG.info(msg);
rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, msg)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, msg));
return; return;
} }
@ -742,7 +744,8 @@ public class FairScheduler extends
if (appRejectMsg != null && rmApp != null) { if (appRejectMsg != null && rmApp != null) {
LOG.error(appRejectMsg); LOG.error(appRejectMsg);
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(rmApp.getApplicationId(), appRejectMsg)); new RMAppEvent(rmApp.getApplicationId(),
RMAppEventType.APP_REJECTED, appRejectMsg));
return null; return null;
} }
@ -1302,7 +1305,8 @@ public class FairScheduler extends
+ " submitted to a reservation which is not yet currently active: " + " submitted to a reservation which is not yet currently active: "
+ resQName; + resQName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return null; return null;
} }
if (!queue.getParent().getQueueName().equals(queueName)) { if (!queue.getParent().getQueueName().equals(queueName)) {
@ -1311,7 +1315,8 @@ public class FairScheduler extends
+ resQName + " which does not belong to the specified queue: " + resQName + " which does not belong to the specified queue: "
+ queueName; + queueName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message)); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
return null; return null;
} }
// use the reservation queue to run the app // use the reservation queue to run the app

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -872,7 +871,8 @@ public class DelegationTokenRenewer extends AbstractService {
// RMApp is in NEW state and thus we havne't yet informed the // RMApp is in NEW state and thus we havne't yet informed the
// Scheduler about the existence of the application // Scheduler about the existence of the application
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(event.getApplicationId(), t.getMessage())); new RMAppEvent(event.getApplicationId(),
RMAppEventType.APP_REJECTED, t.getMessage()));
} }
} }
} }

View File

@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -622,7 +621,8 @@ public class MockRM extends ResourceManager {
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
am.waitForState(RMAppAttemptState.ALLOCATED); am.waitForState(RMAppAttemptState.ALLOCATED);
getRMContext().getDispatcher().getEventHandler() getRMContext().getDispatcher().getEventHandler()
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); .handle(new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
} }
@Override @Override

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -39,16 +41,22 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -56,8 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -86,6 +94,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mortbay.log.Log;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
@ -361,6 +370,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
private static final String R = "Default"; private static final String R = "Default";
private static final String A = "QueueA"; private static final String A = "QueueA";
private static final String B = "QueueB"; private static final String B = "QueueB";
private static final String B1 = "QueueB1";
private static final String B2 = "QueueB2";
//don't ever create the below queue ;-) //don't ever create the below queue ;-)
private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue"; private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
private static final String USER_1 = "user1"; private static final String USER_1 = "user1";
@ -391,6 +402,24 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f); .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
} }
private void setupQueueConfigurationChildOfB(CapacitySchedulerConfiguration conf) {
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
conf.setCapacity(Q_R, 100);
final String Q_A = Q_R + "." + A;
final String Q_B = Q_R + "." + B;
final String Q_B1 = Q_B + "." + B1;
final String Q_B2 = Q_B + "." + B2;
conf.setQueues(Q_R, new String[] {A, B});
conf.setCapacity(Q_A, 50);
conf.setCapacity(Q_B, 50);
conf.setQueues(Q_B, new String[] {B1, B2});
conf.setCapacity(Q_B1, 50);
conf.setCapacity(Q_B2, 50);
conf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
}
// Test CS recovery with multi-level queues and multi-users: // Test CS recovery with multi-level queues and multi-users:
// 1. setup 2 NMs each with 8GB memory; // 1. setup 2 NMs each with 8GB memory;
// 2. setup 2 level queues: Default -> (QueueA, QueueB) // 2. setup 2 level queues: Default -> (QueueA, QueueB)
@ -514,17 +543,105 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
totalUsedResource.getVirtualCores()); totalUsedResource.getVirtualCores());
} }
//Test that we receive a meaningful exit-causing exception if a queue private void verifyAppRecoveryWithWrongQueueConfig(
//is removed during recovery CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics,
MemoryRMStateStore memStore, RMState state) throws Exception {
// Restart RM with fail-fast as false. App should be killed.
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false);
rm2 = new MockRM(csConf, memStore);
rm2.start();
// Wait for app to be killed.
rm2.waitForState(app.getApplicationId(), RMAppState.KILLED);
ApplicationReport report = rm2.getApplicationReport(app.getApplicationId());
assertEquals(report.getFinalApplicationStatus(),
FinalApplicationStatus.KILLED);
assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED);
assertEquals(report.getDiagnostics(), diagnostics);
// Remove updated app info(app being KILLED) from state store and reinstate
// state store to previous state i.e. which indicates app is RUNNING.
// This is to simulate app recovery with fail fast config as true.
for(Map.Entry<ApplicationId, ApplicationStateData> entry :
state.getApplicationState().entrySet()) {
ApplicationStateData appState = mock(ApplicationStateData.class);
ApplicationSubmissionContext ctxt =
mock(ApplicationSubmissionContext.class);
when(appState.getApplicationSubmissionContext()).thenReturn(ctxt);
when(ctxt.getApplicationId()).thenReturn(entry.getKey());
memStore.removeApplicationStateInternal(appState);
memStore.storeApplicationStateInternal(
entry.getKey(), entry.getValue());
}
// Now restart RM with fail-fast as true. QueueException should be thrown.
csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
MockRM rm = new MockRM(csConf, memStore);
try {
rm.start();
Assert.fail("QueueException must have been thrown");
} catch (QueueInvalidException e) {
} finally {
rm.close();
}
}
//Test behavior of an app if queue is changed from leaf to parent during
//recovery. Test case does following:
//1. Add an app to QueueB and start the attempt.
//2. Add 2 subqueues(QueueB1 and QueueB2) to QueueB, restart the RM, once with
// fail fast config as false and once with fail fast as true.
//3. Verify that app was killed if fail fast is false.
//4. Verify that QueueException was thrown if fail fast is true.
@Test (timeout = 30000)
public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception {
if (getSchedulerType() != SchedulerType.CAPACITY) {
return;
}
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfiguration(csConf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(csConf);
rm1 = new MockRM(csConf, memStore);
rm1.start();
MockNM nm =
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
nm.registerNode();
// Submit an app to QueueB.
RMApp app = rm1.submitApp(1024, "app", USER_2, null, B);
MockRM.launchAndRegisterAM(app, rm1, nm);
assertEquals(rm1.getApplicationReport(app.getApplicationId()).
getYarnApplicationState(), YarnApplicationState.RUNNING);
// Take a copy of state store so that it can be reset to this state.
RMState state = memStore.loadState();
// Change scheduler config with child queues added to QueueB.
csConf = new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationChildOfB(csConf);
String diags = "Application killed on recovery as it was submitted to " +
"queue QueueB which is no longer a leaf queue after restart.";
verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, memStore, state);
}
//Test behavior of an app if queue is removed during recovery. Test case does
//following:
//1. Add some apps to two queues, attempt to add an app to a non-existant //1. Add some apps to two queues, attempt to add an app to a non-existant
// queue to verify that the new logic is not in effect during normal app // queue to verify that the new logic is not in effect during normal app
// submission // submission
//2. Remove one of the queues, restart the RM //2. Remove one of the queues, restart the RM, once with fail fast config as
//3. Verify that the expected exception was thrown // false and once with fail fast as true.
@Test (timeout = 30000, expected = QueueNotFoundException.class) //3. Verify that app was killed if fail fast is false.
//4. Verify that QueueException was thrown if fail fast is true.
@Test (timeout = 30000)
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
if (getSchedulerType() != SchedulerType.CAPACITY) { if (getSchedulerType() != SchedulerType.CAPACITY) {
throw new QueueNotFoundException("Dummy"); return;
} }
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
@ -549,6 +666,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B); RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
assertEquals(rm1.getApplicationReport(app2.getApplicationId()).
getYarnApplicationState(), YarnApplicationState.RUNNING);
//Submit an app with a non existant queue to make sure it does not //Submit an app with a non existant queue to make sure it does not
//cause a fatal failure in the non-recovery case //cause a fatal failure in the non-recovery case
@ -560,12 +679,16 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
rm1.clearQueueMetrics(app1_2); rm1.clearQueueMetrics(app1_2);
rm1.clearQueueMetrics(app2); rm1.clearQueueMetrics(app2);
// Re-start RM // Take a copy of state store so that it can be reset to this state.
csConf = RMState state = memStore.loadState();
new CapacitySchedulerConfiguration(conf);
// Set new configuration with QueueB removed.
csConf = new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationOnlyA(csConf); setupQueueConfigurationOnlyA(csConf);
rm2 = new MockRM(csConf, memStore);
rm2.start(); String diags = "Application killed on recovery as it was submitted to " +
"queue QueueB which no longer exists after restart.";
verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, memStore, state);
} }
private void checkParentQueue(ParentQueue parentQueue, int numContainers, private void checkParentQueue(ParentQueue parentQueue, int numContainers,

View File

@ -473,8 +473,8 @@ public class TestRMAppTransitions {
application = testCreateAppFinishing(submissionContext); application = testCreateAppFinishing(submissionContext);
} }
// RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED // RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(),
application.getApplicationId(), diagnostics); RMAppEventType.ATTEMPT_FINISHED, diagnostics);
application.handle(finishedEvent); application.handle(finishedEvent);
assertAppState(RMAppState.FINISHED, application); assertAppState(RMAppState.FINISHED, application);
assertTimesAtFinish(application); assertTimesAtFinish(application);
@ -549,7 +549,8 @@ public class TestRMAppTransitions {
RMApp application = createNewTestApp(null); RMApp application = createNewTestApp(null);
// NEW => KILLED event RMAppEventType.KILL // NEW => KILLED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -566,8 +567,8 @@ public class TestRMAppTransitions {
RMApp application = createNewTestApp(null); RMApp application = createNewTestApp(null);
// NEW => FAILED event RMAppEventType.APP_REJECTED // NEW => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected"; String rejectedText = "Test Application Rejected";
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppRejectedEvent(application.getApplicationId(), rejectedText); RMAppEventType.APP_REJECTED, rejectedText);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -583,8 +584,8 @@ public class TestRMAppTransitions {
RMApp application = createNewTestApp(null); RMApp application = createNewTestApp(null);
// NEW => FAILED event RMAppEventType.APP_REJECTED // NEW => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected"; String rejectedText = "Test Application Rejected";
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppRejectedEvent(application.getApplicationId(), rejectedText); RMAppEventType.APP_REJECTED, rejectedText);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -601,7 +602,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppNewSaving(null); RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => KILLED event RMAppEventType.KILL // NEW_SAVING => KILLED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -617,8 +619,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppNewSaving(null); RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected"; String rejectedText = "Test Application Rejected";
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppRejectedEvent(application.getApplicationId(), rejectedText); RMAppEventType.APP_REJECTED, rejectedText);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -634,8 +636,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppSubmittedNoRecovery(null); RMApp application = testCreateAppSubmittedNoRecovery(null);
// SUBMITTED => FAILED event RMAppEventType.APP_REJECTED // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "app rejected"; String rejectedText = "app rejected";
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppRejectedEvent(application.getApplicationId(), rejectedText); RMAppEventType.APP_REJECTED, rejectedText);
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -649,8 +651,9 @@ public class TestRMAppTransitions {
LOG.info("--- START: testAppSubmittedKill---"); LOG.info("--- START: testAppSubmittedKill---");
RMApp application = testCreateAppSubmittedNoRecovery(null); RMApp application = testCreateAppSubmittedNoRecovery(null);
// SUBMITTED => KILLED event RMAppEventType.KILL // SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEvent event =
RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -700,15 +703,16 @@ public class TestRMAppTransitions {
LOG.info("--- START: testAppAcceptedKill ---"); LOG.info("--- START: testAppAcceptedKill ---");
RMApp application = testCreateAppAccepted(null); RMApp application = testCreateAppAccepted(null);
// ACCEPTED => KILLED event RMAppEventType.KILL // ACCEPTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEvent event =
RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertAppState(RMAppState.KILLING, application); assertAppState(RMAppState.KILLING, application);
RMAppEvent appAttemptKilled = RMAppEvent appAttemptKilled =
new RMAppEvent(application.getApplicationId(), new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED); RMAppEventType.ATTEMPT_KILLED, "Application killed by user.");
application.handle(appAttemptKilled); application.handle(appAttemptKilled);
assertAppState(RMAppState.FINAL_SAVING, application); assertAppState(RMAppState.FINAL_SAVING, application);
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -729,7 +733,7 @@ public class TestRMAppTransitions {
// RUNNING. // RUNNING.
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED); RMAppEventType.ATTEMPT_KILLED, "Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
@ -748,7 +752,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppRunning(null); RMApp application = testCreateAppRunning(null);
// RUNNING => KILLED event RMAppEventType.KILL // RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
@ -806,7 +811,9 @@ public class TestRMAppTransitions {
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
// FAILED => FAILED event RMAppEventType.KILL // FAILED => FAILED event RMAppEventType.KILL
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertFailed(application, ".*Failing the application.*"); assertFailed(application, ".*Failing the application.*");
@ -821,7 +828,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppFinishing(null); RMApp application = testCreateAppFinishing(null);
// FINISHING => FINISHED event RMAppEventType.KILL // FINISHING => FINISHED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertAppState(RMAppState.FINISHING, application); assertAppState(RMAppState.FINISHING, application);
@ -838,8 +846,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppFinalSaving(null); RMApp application = testCreateAppFinalSaving(null);
final String diagMsg = "some diagnostics"; final String diagMsg = "some diagnostics";
// attempt_finished event comes before attempt_saved event // attempt_finished event comes before attempt_saved event
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppFinishedAttemptEvent(application.getApplicationId(), diagMsg); RMAppEventType.ATTEMPT_FINISHED, diagMsg);
application.handle(event); application.handle(event);
assertAppState(RMAppState.FINAL_SAVING, application); assertAppState(RMAppState.FINAL_SAVING, application);
RMAppEvent appUpdated = RMAppEvent appUpdated =
@ -861,7 +869,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppFinished(null, ""); RMApp application = testCreateAppFinished(null, "");
// FINISHED => FINISHED event RMAppEventType.KILL // FINISHED => FINISHED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
@ -879,8 +888,8 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppNewSaving(null); RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(),
new RMAppRejectedEvent(application.getApplicationId(), ""); RMAppEventType.APP_REJECTED, "");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
@ -889,7 +898,8 @@ public class TestRMAppTransitions {
// FAILED => FAILED event RMAppEventType.KILL // FAILED => FAILED event RMAppEventType.KILL
event = event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
@ -908,7 +918,8 @@ public class TestRMAppTransitions {
// RUNNING => KILLED event RMAppEventType.KILL // RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
sendAttemptUpdateSavedEvent(application); sendAttemptUpdateSavedEvent(application);
@ -917,8 +928,8 @@ public class TestRMAppTransitions {
assertAppState(RMAppState.KILLED, application); assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED // KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED
event = new RMAppFinishedAttemptEvent( event = new RMAppEvent(application.getApplicationId(),
application.getApplicationId(), ""); RMAppEventType.ATTEMPT_FINISHED, "");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
@ -935,7 +946,9 @@ public class TestRMAppTransitions {
// KILLED => KILLED event RMAppEventType.KILL // KILLED => KILLED event RMAppEventType.KILL
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
"Application killed by user.");
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -86,12 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -124,6 +121,7 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
@ -416,10 +414,16 @@ public class TestRMAppAttemptTransitions {
// Check events // Check events
verify(masterService). verify(masterService).
unregisterAttempt(applicationAttempt.getAppAttemptId()); unregisterAttempt(applicationAttempt.getAppAttemptId());
// ATTEMPT_FAILED should be notified to app if app attempt is submitted to
// this works for unmanaged and managed AM's because this is actually doing // failed state.
// verify(application).handle(anyObject()); ArgumentMatcher<RMAppEvent> matcher = new ArgumentMatcher<RMAppEvent>() {
verify(application).handle(any(RMAppRejectedEvent.class)); @Override
public boolean matches(Object o) {
RMAppEvent event = (RMAppEvent) o;
return event.getType() == RMAppEventType.ATTEMPT_FAILED;
}
};
verify(application).handle(argThat(matcher));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
@ -649,8 +653,8 @@ public class TestRMAppAttemptTransitions {
thenReturn(rmContainer); thenReturn(rmContainer);
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent( new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
applicationAttempt.getAppAttemptId())); RMAppAttemptEventType.CONTAINER_ALLOCATED));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING, assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
@ -906,9 +910,8 @@ public class TestRMAppAttemptTransitions {
Container amContainer = allocateApplicationAttempt(); Container amContainer = allocateApplicationAttempt();
String diagnostics = "Launch Failed"; String diagnostics = "Launch Failed";
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptLaunchFailedEvent( new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, diagnostics));
diagnostics));
assertEquals(YarnApplicationAttemptState.ALLOCATED, assertEquals(YarnApplicationAttemptState.ALLOCATED,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(amContainer, diagnostics); testAppAttemptFailedState(amContainer, diagnostics);
@ -927,8 +930,9 @@ public class TestRMAppAttemptTransitions {
// verify for both launched and launch_failed transitions in final_saving // verify for both launched and launch_failed transitions in final_saving
applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED));
applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( applicationAttempt.handle(
applicationAttempt.getAppAttemptId(), "Launch Failed")); new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCH_FAILED, "Launch Failed"));
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
@ -938,8 +942,9 @@ public class TestRMAppAttemptTransitions {
// verify for both launched and launch_failed transitions in killed // verify for both launched and launch_failed transitions in killed
applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED));
applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( applicationAttempt.handle(new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(), "Launch Failed")); applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCH_FAILED, "Launch Failed"));
assertEquals(RMAppAttemptState.KILLED, assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
} }
@ -1546,8 +1551,8 @@ public class TestRMAppAttemptTransitions {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testNewToFailed() { public void testNewToFailed() {
applicationAttempt.handle(new RMAppAttemptFailedEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), FAILED_DIAGNOSTICS)); .getAppAttemptId(), RMAppAttemptEventType.FAIL, FAILED_DIAGNOSTICS));
assertEquals(YarnApplicationAttemptState.NEW, assertEquals(YarnApplicationAttemptState.NEW,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(null, FAILED_DIAGNOSTICS); testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
@ -1557,8 +1562,8 @@ public class TestRMAppAttemptTransitions {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSubmittedToFailed() { public void testSubmittedToFailed() {
submitApplicationAttempt(); submitApplicationAttempt();
applicationAttempt.handle(new RMAppAttemptFailedEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), FAILED_DIAGNOSTICS)); .getAppAttemptId(), RMAppAttemptEventType.FAIL, FAILED_DIAGNOSTICS));
assertEquals(YarnApplicationAttemptState.SUBMITTED, assertEquals(YarnApplicationAttemptState.SUBMITTED,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(null, FAILED_DIAGNOSTICS); testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
@ -1567,8 +1572,8 @@ public class TestRMAppAttemptTransitions {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testScheduledToFailed() { public void testScheduledToFailed() {
scheduleApplicationAttempt(); scheduleApplicationAttempt();
applicationAttempt.handle(new RMAppAttemptFailedEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), FAILED_DIAGNOSTICS)); .getAppAttemptId(), RMAppAttemptEventType.FAIL, FAILED_DIAGNOSTICS));
assertEquals(YarnApplicationAttemptState.SCHEDULED, assertEquals(YarnApplicationAttemptState.SCHEDULED,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(null, FAILED_DIAGNOSTICS); testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
@ -1579,8 +1584,8 @@ public class TestRMAppAttemptTransitions {
Container amContainer = allocateApplicationAttempt(); Container amContainer = allocateApplicationAttempt();
assertEquals(YarnApplicationAttemptState.ALLOCATED, assertEquals(YarnApplicationAttemptState.ALLOCATED,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
applicationAttempt.handle(new RMAppAttemptFailedEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), FAILED_DIAGNOSTICS)); .getAppAttemptId(), RMAppAttemptEventType.FAIL, FAILED_DIAGNOSTICS));
testAppAttemptFailedState(amContainer, FAILED_DIAGNOSTICS); testAppAttemptFailedState(amContainer, FAILED_DIAGNOSTICS);
} }
@ -1589,8 +1594,8 @@ public class TestRMAppAttemptTransitions {
Container amContainer = allocateApplicationAttempt(); Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer); launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
applicationAttempt.handle(new RMAppAttemptFailedEvent(applicationAttempt applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), FAILED_DIAGNOSTICS)); .getAppAttemptId(), RMAppAttemptEventType.FAIL, FAILED_DIAGNOSTICS));
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());