diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 701d1e15ee4..05bc3d34972 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -86,6 +86,9 @@ Release 2.7.2 - UNRELEASED YARN-4281. 2.7 RM app page is broken (Chang Li via jlowe) + YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart. + (Varun Saxena via jianhe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 4118f07b741..6ef69cc010a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -643,8 +643,9 @@ public class ClientRMService extends AbstractService implements return KillApplicationResponse.newInstance(true); } - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.KILL)); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed by user.")); // For UnmanagedAMs, return true so they don't retry return KillApplicationResponse.newInstance( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 17fbd73fda5..8ead54e7a70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -296,7 +295,8 @@ public class RMAppManager implements EventHandler, // scheduler about the existence of the application assert application.getState() == RMAppState.NEW; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, e.getMessage())); throw RPCUtil.getRemoteException(e); } } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index f5ecbaa247b..e706cad5a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -255,8 +254,8 @@ public class AMLauncher implements Runnable { String message = "Error launching " + application.getAppAttemptId() + ". Got exception: " + StringUtils.stringifyException(ie); LOG.info(message); - handler.handle(new RMAppAttemptLaunchFailedEvent(application - .getAppAttemptId(), message)); + handler.handle(new RMAppAttemptEvent(application + .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message)); } break; case CLEANUP: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java index a1c234cad23..64964020721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java @@ -24,13 +24,24 @@ import org.apache.hadoop.yarn.event.AbstractEvent; public class RMAppEvent extends AbstractEvent{ private final ApplicationId appId; + private final String diagnosticMsg; public RMAppEvent(ApplicationId appId, RMAppEventType type) { + this(appId, type, ""); + } + + public RMAppEvent(ApplicationId appId, RMAppEventType type, + String diagnostic) { super(type); this.appId = appId; + this.diagnosticMsg = diagnostic; } public ApplicationId getApplicationId() { return this.appId; } + + public String getDiagnosticMsg() { + return this.diagnosticMsg; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java index f5c0f0f886b..835322a37e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java @@ -22,20 +22,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; public class RMAppFailedAttemptEvent extends RMAppEvent { - private final String diagnostics; private final boolean transferStateFromPreviousAttempt; public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, String diagnostics, boolean transferStateFromPreviousAttempt) { - super(appId, event); - this.diagnostics = diagnostics; + super(appId, event, diagnostics); this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; } - public String getDiagnostics() { - return this.diagnostics; - } - public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFinishedAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFinishedAttemptEvent.java deleted file mode 100644 index f1a6340ba8e..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFinishedAttemptEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -public class RMAppFinishedAttemptEvent extends RMAppEvent { - - private final String diagnostics; - - public RMAppFinishedAttemptEvent(ApplicationId appId, String diagnostics) { - super(appId, RMAppEventType.ATTEMPT_FINISHED); - this.diagnostics = diagnostics; - } - - public String getDiagnostics() { - return this.diagnostics; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737af590..f1ebba922e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -957,12 +957,12 @@ public class RMAppImpl implements RMApp, Recoverable { if (this.submissionContext.getUnmanagedAM()) { // RM does not manage the AM. Do not retry msg = "Unmanaged application " + this.getApplicationId() - + " failed due to " + failedEvent.getDiagnostics() + + " failed due to " + failedEvent.getDiagnosticMsg() + ". Failing the application."; } else if (this.isNumAttemptsBeyondThreshold) { msg = "Application " + this.getApplicationId() + " failed " + this.maxAppAttempts + " times due to " - + failedEvent.getDiagnostics() + ". Failing the application."; + + failedEvent.getDiagnosticMsg() + ". Failing the application."; } return msg; } @@ -1003,21 +1003,14 @@ public class RMAppImpl implements RMApp, Recoverable { String diags = null; switch (event.getType()) { case APP_REJECTED: - RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event; - diags = rejectedEvent.getMessage(); - break; case ATTEMPT_FINISHED: - RMAppFinishedAttemptEvent finishedEvent = - (RMAppFinishedAttemptEvent) event; - diags = finishedEvent.getDiagnostics(); + case ATTEMPT_KILLED: + diags = event.getDiagnosticMsg(); break; case ATTEMPT_FAILED: RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; diags = getAppAttemptFailedDiagnostics(failedEvent); break; - case ATTEMPT_KILLED: - diags = getAppKilledDiagnostics(); - break; default: break; } @@ -1065,9 +1058,7 @@ public class RMAppImpl implements RMApp, Recoverable { } public void transition(RMAppImpl app, RMAppEvent event) { - RMAppFinishedAttemptEvent finishedEvent = - (RMAppFinishedAttemptEvent)event; - app.diagnostics.append(finishedEvent.getDiagnostics()); + app.diagnostics.append(event.getDiagnosticMsg()); super.transition(app, event); }; } @@ -1113,21 +1104,21 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.diagnostics.append(getAppKilledDiagnostics()); + app.diagnostics.append(event.getDiagnosticMsg()); super.transition(app, event); }; } - private static String getAppKilledDiagnostics() { - return "Application killed by user."; - } - private static class KillAttemptTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { app.stateBeforeKilling = app.getState(); - app.handler.handle(new RMAppAttemptEvent(app.currentAttempt - .getAppAttemptId(), RMAppAttemptEventType.KILL)); + // Forward app kill diagnostics in the event to kill app attempt. + // These diagnostics will be returned back in ATTEMPT_KILLED event sent by + // RMAppAttemptImpl. + app.handler.handle( + new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL, event.getDiagnosticMsg())); } } @@ -1138,8 +1129,7 @@ public class RMAppImpl implements RMApp, Recoverable { } public void transition(RMAppImpl app, RMAppEvent event) { - RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; - app.diagnostics.append(rejectedEvent.getMessage()); + app.diagnostics.append(event.getDiagnosticMsg()); super.transition(app, event); }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java deleted file mode 100644 index baaef238ca5..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRejectedEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -public class RMAppRejectedEvent extends RMAppEvent { - - private final String message; - - public RMAppRejectedEvent(ApplicationId appId, String message) { - super(appId, RMAppEventType.APP_REJECTED); - this.message = message; - } - - public String getMessage() { - return this.message; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java index ad5c28ae67b..6df6b19f978 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEvent.java @@ -24,14 +24,25 @@ import org.apache.hadoop.yarn.event.AbstractEvent; public class RMAppAttemptEvent extends AbstractEvent { private final ApplicationAttemptId appAttemptId; + private final String diagnosticMsg; public RMAppAttemptEvent(ApplicationAttemptId appAttemptId, RMAppAttemptEventType type) { + this(appAttemptId, type, ""); + } + + public RMAppAttemptEvent(ApplicationAttemptId appAttemptId, + RMAppAttemptEventType type, String diagnostics) { super(type); this.appAttemptId = appAttemptId; + this.diagnosticMsg = diagnostics; } public ApplicationAttemptId getApplicationAttemptId() { return this.appAttemptId; } + + public String getDiagnosticMsg() { + return diagnosticMsg; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 230bbeb736b..9b8bd88c71a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -80,11 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; @@ -1021,8 +1018,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { LOG.warn("Interrupted while waiting to resend the" + " ContainerAllocated Event."); } - appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( - appAttempt.applicationAttemptId)); + appAttempt.eventHandler.handle( + new RMAppAttemptEvent(appAttempt.applicationAttemptId, + RMAppAttemptEventType.CONTAINER_ALLOCATED)); } }.start(); } @@ -1124,9 +1122,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { int exitStatus = ContainerExitStatus.INVALID; switch (event.getType()) { case LAUNCH_FAILED: - RMAppAttemptLaunchFailedEvent launchFaileEvent = - (RMAppAttemptLaunchFailedEvent) event; - diags = launchFaileEvent.getMessage(); + diags = event.getDiagnosticMsg(); break; case REGISTERED: diags = getUnexpectedAMRegisteredDiagnostics(); @@ -1134,7 +1130,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { case UNREGISTERED: RMAppAttemptUnregistrationEvent unregisterEvent = (RMAppAttemptUnregistrationEvent) event; - diags = unregisterEvent.getDiagnostics(); + diags = unregisterEvent.getDiagnosticMsg(); // reset finalTrackingUrl to url sent by am finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); finalStatus = unregisterEvent.getFinalApplicationStatus(); @@ -1233,17 +1229,19 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { switch (finalAttemptState) { case FINISHED: { - appEvent = new RMAppFinishedAttemptEvent(applicationId, + appEvent = + new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHED, appAttempt.getDiagnostics()); } break; case KILLED: { appAttempt.invalidateAMHostAndPort(); + // Forward diagnostics received in attempt kill event. appEvent = new RMAppFailedAttemptEvent(applicationId, RMAppEventType.ATTEMPT_KILLED, - "Application killed by user.", false); + event.getDiagnosticMsg(), false); } break; case FAILED: @@ -1355,9 +1353,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMAppAttemptEvent event) { // Use diagnostic from launcher - RMAppAttemptLaunchFailedEvent launchFaileEvent - = (RMAppAttemptLaunchFailedEvent) event; - appAttempt.diagnostics.append(launchFaileEvent.getMessage()); + appAttempt.diagnostics.append(event.getDiagnosticMsg()); // Tell the app, scheduler super.transition(appAttempt, event); @@ -1612,7 +1608,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { progress = 1.0f; RMAppAttemptUnregistrationEvent unregisterEvent = (RMAppAttemptUnregistrationEvent) event; - diagnostics.append(unregisterEvent.getDiagnostics()); + diagnostics.append(unregisterEvent.getDiagnosticMsg()); originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); finalStatus = unregisterEvent.getFinalApplicationStatus(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java deleted file mode 100644 index 681f38c2c2d..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; - -public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent { - - public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) { - super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptLaunchFailedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptLaunchFailedEvent.java deleted file mode 100644 index d0b49b2d160..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptLaunchFailedEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; - -public class RMAppAttemptLaunchFailedEvent extends RMAppAttemptEvent { - - private final String message; - - public RMAppAttemptLaunchFailedEvent(ApplicationAttemptId appAttemptId, - String message) { - super(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED); - this.message = message; - } - - public String getMessage() { - return this.message; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java index 473946a9caa..1ce51507ee5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java @@ -27,14 +27,13 @@ public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent { private final String finalTrackingUrl; private final FinalApplicationStatus finalStatus; - private final String diagnostics; public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId, - String trackingUrl, FinalApplicationStatus finalStatus, String diagnostics) { - super(appAttemptId, RMAppAttemptEventType.UNREGISTERED); + String trackingUrl, FinalApplicationStatus finalStatus, + String diagnostics) { + super(appAttemptId, RMAppAttemptEventType.UNREGISTERED, diagnostics); this.finalTrackingUrl = trackingUrl; this.finalStatus = finalStatus; - this.diagnostics = diagnostics; } public String getFinalTrackingUrl() { @@ -45,8 +44,4 @@ public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent { return this.finalStatus; } - public String getDiagnostics() { - return this.diagnostics; - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 82b2ced1a69..5e3e62a4bae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -44,7 +44,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -479,8 +480,8 @@ public class RMContainerImpl implements RMContainer { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( - container.appAttemptId)); + container.eventHandler.handle(new RMAppAttemptEvent( + container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 968a767f517..16bfcfcdcdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -534,7 +534,9 @@ public abstract class AbstractYarnScheduler this.rmContext .getDispatcher() .getEventHandler() - .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); + .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, + "Application killed due to expiry of reservation queue " + + queueName + ".")); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInvalidException.java similarity index 90% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInvalidException.java index 35a1d66b50d..7c6be4f0ab5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInvalidException.java @@ -22,11 +22,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @Private -public class QueueNotFoundException extends YarnRuntimeException { +public class QueueInvalidException extends YarnRuntimeException { private static final long serialVersionUID = 187239430L; - public QueueNotFoundException(String message) { + public QueueInvalidException(String message) { super(message); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0daa4b1b3f7..22ac7752982 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationCons import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -87,8 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; @@ -676,9 +675,8 @@ public class CapacityScheduler extends return null; } - private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { - + private String getQueueMappings(ApplicationId applicationId, String queueName, + String user) { if (mappings != null && mappings.size() > 0) { try { String mappedQueue = getMappedQueue(user); @@ -698,50 +696,115 @@ public class CapacityScheduler extends String message = "Failed to submit application " + applicationId + " submitted by user " + user + " reason: " + ioex.getMessage(); this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); + return null; } } + return queueName; + } + private synchronized void addApplicationOnRecovery( + ApplicationId applicationId, String queueName, String user) { + queueName = getQueueMappings(applicationId, queueName, user); + if (queueName == null) { + // Exception encountered while getting queue mappings. + return; + } // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { //During a restart, this indicates a queue was removed, which is //not presently supported - if (isAppRecovering) { + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery as it was submitted to queue " + + queueName + " which no longer exists after restart.")); + return; + } else { String queueErrorMsg = "Queue named " + queueName - + " missing during application recovery." - + " Queue removal during recovery is not presently supported by the" - + " capacity scheduler, please restart with all queues configured" - + " which were present before shutdown/restart."; + + " missing during application recovery." + + " Queue removal during recovery is not presently supported by the" + + " capacity scheduler, please restart with all queues configured" + + " which were present before shutdown/restart."; LOG.fatal(queueErrorMsg); - throw new QueueNotFoundException(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); } + } + if (!(queue instanceof LeafQueue)) { + // During RM restart, this means leaf queue was converted to a parent + // queue, which is not supported for running apps. + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery as it was submitted to queue " + + queueName + " which is no longer a leaf queue after restart.")); + return; + } else { + String queueErrorMsg = "Queue named " + queueName + + " is no longer a leaf queue during application recovery." + + " Changing a leaf queue to a parent queue during recovery is" + + " not presently supported by the capacity scheduler. Please" + + " restart with leaf queues before shutdown/restart continuing" + + " as leaf queues."; + LOG.fatal(queueErrorMsg); + throw new QueueInvalidException(queueErrorMsg); + } + } + // Submit to the queue + try { + queue.submitApplication(applicationId, user, queueName); + } catch (AccessControlException ace) { + // Ignore the exception for recovered app as the app was previously + // accepted. + } + queue.getMetrics().submitApp(user); + SchedulerApplication application = + new SchedulerApplication(queue, user); + applications.put(applicationId, application); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queueName); + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } + + private synchronized void addApplication(ApplicationId applicationId, + String queueName, String user) { + queueName = getQueueMappings(applicationId, queueName, user); + if (queueName == null) { + // Exception encountered while getting queue mappings. + return; + } + // sanity checks. + CSQueue queue = getQueue(queueName); + if (queue == null) { String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return; } if (!(queue instanceof LeafQueue)) { String message = "Application " + applicationId + " submitted by user " + user + " to non-leaf queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return; } // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); } catch (AccessControlException ace) { - // Ignore the exception for recovered app as the app was previously accepted - if (!isAppRecovering) { - LOG.info("Failed to submit application " + applicationId + " to queue " - + queueName + " from user " + user, ace); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, ace.toString())); - return; - } + LOG.info("Failed to submit application " + applicationId + " to queue " + + queueName + " from user " + user, ace); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, ace.toString())); + return; } // update the metrics queue.getMetrics().submitApp(user); @@ -750,14 +813,8 @@ public class CapacityScheduler extends applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - if (isAppRecovering) { - if (LOG.isDebugEnabled()) { - LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); - } - } else { - rmContext.getDispatcher().getEventHandler() + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); - } } private synchronized void addApplicationAttempt( @@ -766,6 +823,11 @@ public class CapacityScheduler extends boolean isAttemptRecovering) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); + if (application == null) { + LOG.warn("Application " + applicationAttemptId.getApplicationId() + + " cannot be found in scheduler."); + return; + } CSQueue queue = (CSQueue) application.getQueue(); FiCaSchedulerApp attempt = @@ -1170,10 +1232,13 @@ public class CapacityScheduler extends appAddedEvent.getApplicationId(), appAddedEvent.getReservationID()); if (queueName != null) { - addApplication(appAddedEvent.getApplicationId(), - queueName, - appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + if (!appAddedEvent.getIsAppRecovering()) { + addApplication(appAddedEvent.getApplicationId(), queueName, + appAddedEvent.getUser()); + } else { + addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, + appAddedEvent.getUser()); + } } } break; @@ -1473,7 +1538,8 @@ public class CapacityScheduler extends + " submitted to a reservation which is not yet currently active: " + resQName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return null; } if (!queue.getParent().getQueueName().equals(queueName)) { @@ -1482,7 +1548,8 @@ public class CapacityScheduler extends + resQName + " which does not belong to the specified queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return null; } // use the reservation queue to run the app diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5fb71ce22ab..2e7cb6ce041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -582,7 +581,8 @@ public class FairScheduler extends " submitted by user " + user + " with an empty queue name."; LOG.info(message); rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return; } @@ -593,7 +593,8 @@ public class FairScheduler extends + "The queue name cannot start/end with period."; LOG.info(message); rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return; } @@ -612,7 +613,8 @@ public class FairScheduler extends " cannot submit applications to queue " + queue.getName(); LOG.info(msg); rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, msg)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, msg)); return; } @@ -708,7 +710,8 @@ public class FairScheduler extends if (appRejectMsg != null && rmApp != null) { LOG.error(appRejectMsg); rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(rmApp.getApplicationId(), appRejectMsg)); + new RMAppEvent(rmApp.getApplicationId(), + RMAppEventType.APP_REJECTED, appRejectMsg)); return null; } @@ -1285,7 +1288,8 @@ public class FairScheduler extends + " submitted to a reservation which is not yet currently active: " + resQName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return null; } if (!queue.getParent().getQueueName().equals(queueName)) { @@ -1294,7 +1298,8 @@ public class FairScheduler extends + resQName + " which does not belong to the specified queue: " + queueName; this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); + .handle(new RMAppEvent(applicationId, + RMAppEventType.APP_REJECTED, message)); return null; } // use the reservation queue to run the app diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index e307645127c..c12fb81c948 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -855,7 +854,8 @@ public class DelegationTokenRenewer extends AbstractService { // RMApp is in NEW state and thus we havne't yet informed the // Scheduler about the existence of the application rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(event.getApplicationId(), t.getMessage())); + new RMAppEvent(event.getApplicationId(), + RMAppEventType.APP_REJECTED, t.getMessage())); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index e27e6c56346..b1ce0f1f774 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -531,7 +530,8 @@ public class MockRM extends ResourceManager { MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); am.waitForState(RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler() - .handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); + .handle(new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 0566f3d5f62..8283844269a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.FileWriter; @@ -43,16 +45,22 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -60,8 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -91,6 +99,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mortbay.log.Log; import com.google.common.base.Supplier; @@ -366,6 +375,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase private static final String R = "Default"; private static final String A = "QueueA"; private static final String B = "QueueB"; + private static final String B1 = "QueueB1"; + private static final String B2 = "QueueB2"; //don't ever create the below queue ;-) private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue"; private static final String USER_1 = "user1"; @@ -396,6 +407,24 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f); } + private void setupQueueConfigurationChildOfB(CapacitySchedulerConfiguration conf) { + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); + final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; + conf.setCapacity(Q_R, 100); + final String Q_A = Q_R + "." + A; + final String Q_B = Q_R + "." + B; + final String Q_B1 = Q_B + "." + B1; + final String Q_B2 = Q_B + "." + B2; + conf.setQueues(Q_R, new String[] {A, B}); + conf.setCapacity(Q_A, 50); + conf.setCapacity(Q_B, 50); + conf.setQueues(Q_B, new String[] {B1, B2}); + conf.setCapacity(Q_B1, 50); + conf.setCapacity(Q_B2, 50); + conf.setDouble(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); + } + // Test CS recovery with multi-level queues and multi-users: // 1. setup 2 NMs each with 8GB memory; // 2. setup 2 level queues: Default -> (QueueA, QueueB) @@ -518,18 +547,106 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), totalUsedResource.getVirtualCores()); } - - //Test that we receive a meaningful exit-causing exception if a queue - //is removed during recovery + + private void verifyAppRecoveryWithWrongQueueConfig( + CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics, + MemoryRMStateStore memStore, RMState state) throws Exception { + // Restart RM with fail-fast as false. App should be killed. + csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false); + rm2 = new MockRM(csConf, memStore); + rm2.start(); + // Wait for app to be killed. + rm2.waitForState(app.getApplicationId(), RMAppState.KILLED); + ApplicationReport report = rm2.getApplicationReport(app.getApplicationId()); + assertEquals(report.getFinalApplicationStatus(), + FinalApplicationStatus.KILLED); + assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED); + assertEquals(report.getDiagnostics(), diagnostics); + + // Remove updated app info(app being KILLED) from state store and reinstate + // state store to previous state i.e. which indicates app is RUNNING. + // This is to simulate app recovery with fail fast config as true. + for(Map.Entry entry : + state.getApplicationState().entrySet()) { + ApplicationStateData appState = mock(ApplicationStateData.class); + ApplicationSubmissionContext ctxt = + mock(ApplicationSubmissionContext.class); + when(appState.getApplicationSubmissionContext()).thenReturn(ctxt); + when(ctxt.getApplicationId()).thenReturn(entry.getKey()); + memStore.removeApplicationStateInternal(appState); + memStore.storeApplicationStateInternal( + entry.getKey(), entry.getValue()); + } + + // Now restart RM with fail-fast as true. QueueException should be thrown. + csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true); + MockRM rm = new MockRM(csConf, memStore); + try { + rm.start(); + Assert.fail("QueueException must have been thrown"); + } catch (QueueInvalidException e) { + } finally { + rm.close(); + } + } + + //Test behavior of an app if queue is changed from leaf to parent during + //recovery. Test case does following: + //1. Add an app to QueueB and start the attempt. + //2. Add 2 subqueues(QueueB1 and QueueB2) to QueueB, restart the RM, once with + // fail fast config as false and once with fail fast as true. + //3. Verify that app was killed if fail fast is false. + //4. Verify that QueueException was thrown if fail fast is true. + @Test (timeout = 30000) + public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception { + if (getSchedulerType() != SchedulerType.CAPACITY) { + return; + } + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfiguration(csConf); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(csConf); + rm1 = new MockRM(csConf, memStore); + rm1.start(); + MockNM nm = + new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + nm.registerNode(); + + // Submit an app to QueueB. + RMApp app = rm1.submitApp(1024, "app", USER_2, null, B); + MockRM.launchAndRegisterAM(app, rm1, nm); + assertEquals(rm1.getApplicationReport(app.getApplicationId()). + getYarnApplicationState(), YarnApplicationState.RUNNING); + + // Take a copy of state store so that it can be reset to this state. + RMState state = memStore.loadState(); + + // Change scheduler config with child queues added to QueueB. + csConf = new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationChildOfB(csConf); + + String diags = "Application killed on recovery as it was submitted to " + + "queue QueueB which is no longer a leaf queue after restart."; + verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags, memStore, state); + } + + //Test behavior of an app if queue is removed during recovery. Test case does + //following: //1. Add some apps to two queues, attempt to add an app to a non-existant // queue to verify that the new logic is not in effect during normal app // submission - //2. Remove one of the queues, restart the RM - //3. Verify that the expected exception was thrown - @Test (timeout = 30000, expected = QueueNotFoundException.class) + //2. Remove one of the queues, restart the RM, once with fail fast config as + // false and once with fail fast as true. + //3. Verify that app was killed if fail fast is false. + //4. Verify that QueueException was thrown if fail fast is true. + @Test (timeout = 30000) public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { if (getSchedulerType() != SchedulerType.CAPACITY) { - throw new QueueNotFoundException("Dummy"); + return; } conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, @@ -554,7 +671,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - + assertEquals(rm1.getApplicationReport(app2.getApplicationId()). + getYarnApplicationState(), YarnApplicationState.RUNNING); + //Submit an app with a non existant queue to make sure it does not //cause a fatal failure in the non-recovery case RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null, @@ -565,12 +684,16 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase rm1.clearQueueMetrics(app1_2); rm1.clearQueueMetrics(app2); - // Re-start RM - csConf = - new CapacitySchedulerConfiguration(conf); + // Take a copy of state store so that it can be reset to this state. + RMState state = memStore.loadState(); + + // Set new configuration with QueueB removed. + csConf = new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); - rm2 = new MockRM(csConf, memStore); - rm2.start(); + + String diags = "Application killed on recovery as it was submitted to " + + "queue QueueB which no longer exists after restart."; + verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags, memStore, state); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2e64d61b08a..b5ad74ac35b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -474,8 +474,8 @@ public class TestRMAppTransitions { application = testCreateAppFinishing(submissionContext); } // RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED - RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( - application.getApplicationId(), diagnostics); + RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED, diagnostics); application.handle(finishedEvent); assertAppState(RMAppState.FINISHED, application); assertTimesAtFinish(application); @@ -549,8 +549,9 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(null); // NEW => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -567,8 +568,8 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; - RMAppEvent event = - new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -584,8 +585,8 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; - RMAppEvent event = - new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -602,7 +603,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppNewSaving(null); // NEW_SAVING => KILLED event RMAppEventType.KILL RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -618,8 +620,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppNewSaving(null); // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; - RMAppEvent event = - new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -635,8 +637,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppSubmittedNoRecovery(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; - RMAppEvent event = - new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -650,8 +652,9 @@ public class TestRMAppTransitions { LOG.info("--- START: testAppSubmittedKill---"); RMApp application = testCreateAppSubmittedNoRecovery(null); // SUBMITTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -701,15 +704,16 @@ public class TestRMAppTransitions { LOG.info("--- START: testAppAcceptedKill ---"); RMApp application = testCreateAppAccepted(null); // ACCEPTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.KILLING, application); RMAppEvent appAttemptKilled = new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_KILLED); + RMAppEventType.ATTEMPT_KILLED, "Application killed by user."); application.handle(appAttemptKilled); assertAppState(RMAppState.FINAL_SAVING, application); sendAppUpdateSavedEvent(application); @@ -730,7 +734,7 @@ public class TestRMAppTransitions { // RUNNING. RMAppEvent event = new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_KILLED); + RMAppEventType.ATTEMPT_KILLED, "Application killed by user."); application.handle(event); rmDispatcher.await(); @@ -748,8 +752,9 @@ public class TestRMAppTransitions { RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); @@ -807,7 +812,9 @@ public class TestRMAppTransitions { assertAppFinalStateSaved(application); // FAILED => FAILED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); @@ -822,7 +829,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinishing(null); // FINISHING => FINISHED event RMAppEventType.KILL RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.FINISHING, application); @@ -839,8 +847,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinalSaving(null); final String diagMsg = "some diagnostics"; // attempt_finished event comes before attempt_saved event - RMAppEvent event = - new RMAppFinishedAttemptEvent(application.getApplicationId(), diagMsg); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED, diagMsg); application.handle(event); assertAppState(RMAppState.FINAL_SAVING, application); RMAppEvent appUpdated = @@ -861,8 +869,9 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinished(null, ""); // FINISHED => FINISHED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -880,8 +889,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppNewSaving(null); // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED - RMAppEvent event = - new RMAppRejectedEvent(application.getApplicationId(), ""); + RMAppEvent event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_REJECTED, ""); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -890,7 +899,8 @@ public class TestRMAppTransitions { // FAILED => FAILED event RMAppEventType.KILL event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -908,8 +918,9 @@ public class TestRMAppTransitions { RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); sendAttemptUpdateSavedEvent(application); @@ -918,8 +929,8 @@ public class TestRMAppTransitions { assertAppState(RMAppState.KILLED, application); // KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED - event = new RMAppFinishedAttemptEvent( - application.getApplicationId(), ""); + event = new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED, ""); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); @@ -936,7 +947,9 @@ public class TestRMAppTransitions { // KILLED => KILLED event RMAppEventType.KILL - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, + "Application killed by user."); application.handle(event); rmDispatcher.await(); assertTimesAtFinish(application); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index c8b6bd07b88..06d0ae15da9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -86,11 +87,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -123,6 +121,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -414,10 +413,16 @@ public class TestRMAppAttemptTransitions { // Check events verify(masterService). unregisterAttempt(applicationAttempt.getAppAttemptId()); - - // this works for unmanaged and managed AM's because this is actually doing - // verify(application).handle(anyObject()); - verify(application).handle(any(RMAppRejectedEvent.class)); + // ATTEMPT_FAILED should be notified to app if app attempt is submitted to + // failed state. + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object o) { + RMAppEvent event = (RMAppEvent) o; + return event.getType() == RMAppEventType.ATTEMPT_FAILED; + } + }; + verify(application).handle(argThat(matcher)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } @@ -654,8 +659,8 @@ public class TestRMAppAttemptTransitions { thenReturn(rmContainer); applicationAttempt.handle( - new RMAppAttemptContainerAllocatedEvent( - applicationAttempt.getAppAttemptId())); + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.CONTAINER_ALLOCATED)); assertEquals(RMAppAttemptState.ALLOCATED_SAVING, applicationAttempt.getAppAttemptState()); @@ -911,9 +916,8 @@ public class TestRMAppAttemptTransitions { Container amContainer = allocateApplicationAttempt(); String diagnostics = "Launch Failed"; applicationAttempt.handle( - new RMAppAttemptLaunchFailedEvent( - applicationAttempt.getAppAttemptId(), - diagnostics)); + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.LAUNCH_FAILED, diagnostics)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); testAppAttemptFailedState(amContainer, diagnostics); @@ -932,8 +936,9 @@ public class TestRMAppAttemptTransitions { // verify for both launched and launch_failed transitions in final_saving applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); - applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( - applicationAttempt.getAppAttemptId(), "Launch Failed")); + applicationAttempt.handle( + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.LAUNCH_FAILED, "Launch Failed")); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); @@ -943,8 +948,9 @@ public class TestRMAppAttemptTransitions { // verify for both launched and launch_failed transitions in killed applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); - applicationAttempt.handle(new RMAppAttemptLaunchFailedEvent( - applicationAttempt.getAppAttemptId(), "Launch Failed")); + applicationAttempt.handle(new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.LAUNCH_FAILED, "Launch Failed")); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); }