From 3e4b280de793030539ea8f01a8bbe80795a3ae6b Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 4 Nov 2014 17:44:59 -0800 Subject: [PATCH] YARN-2010. Handle app-recovery failures gracefully. (Jian He and Karthik Kambatla via kasha) (cherry picked from commit b2cd2698028118b6384904732dbf94942f644732) --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/RMAppManager.java | 55 +++------------- .../resourcemanager/rmapp/RMAppImpl.java | 47 ++++++++++++++ .../rmapp/RMAppRecoverEvent.java | 36 ++++++++++ .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../scheduler/QueueNotFoundException.java | 32 +++++++++ .../scheduler/capacity/CapacityScheduler.java | 7 +- .../TestWorkPreservingRMRestart.java | 18 ++--- .../rmapp/TestRMAppTransitions.java | 65 ++++++++++++++++--- 9 files changed, 196 insertions(+), 73 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 00f9e9cdfaf..044cf9c65dd 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -829,6 +829,9 @@ Release 2.6.0 - UNRELEASED YARN-2752. Made ContainerExecutor append "nice -n" arg only when priority adjustment flag is set. (Xuan Gong via zjshen) + YARN-2010. Handle app-recovery failures gracefully. + (Jian He and Karthik Kambatla via kasha) + Release 2.5.2 - UNRELEASED diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 63333b8f62e..02c6d2f0036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -274,12 +275,11 @@ public class RMAppManager implements EventHandler, ApplicationId appId = submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; try { - credentials = parseCredentials(submissionContext); this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - credentials, submissionContext.getCancelTokensWhenComplete(), - application.getUser()); + parseCredentials(submissionContext), + submissionContext.getCancelTokensWhenComplete(), + application.getUser()); } catch (Exception e) { LOG.warn("Unable to parse credentials.", e); // Sending APP_REJECTED is fine, since we assume that the @@ -299,10 +299,8 @@ public class RMAppManager implements EventHandler, } } - @SuppressWarnings("unchecked") - protected void - recoverApplication(ApplicationState appState, RMState rmState) - throws Exception { + protected void recoverApplication(ApplicationState appState, RMState rmState) + throws Exception { ApplicationSubmissionContext appContext = appState.getApplicationSubmissionContext(); ApplicationId appId = appState.getAppId(); @@ -311,33 +309,7 @@ public class RMAppManager implements EventHandler, RMAppImpl application = createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), appState.getUser()); - application.recover(rmState); - if (isApplicationInFinalState(appState.getState())) { - // We are synchronously moving the application into final state so that - // momentarily client will not see this application in NEW state. Also - // for finished applications we will avoid renewing tokens. - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - return; - } - - if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = null; - try { - credentials = parseCredentials(appContext); - // synchronously renew delegation token on recovery. - rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete(), - application.getUser()); - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } catch (Exception e) { - LOG.warn("Unable to parse and renew delegation tokens.", e); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(appId, e.getMessage())); - throw e; - } - } else { - application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); - } + application.handle(new RMAppRecoverEvent(appId, rmState)); } private RMAppImpl createAndPopulateNewRMApp( @@ -416,18 +388,9 @@ public class RMAppManager implements EventHandler, return null; } - - private boolean isApplicationInFinalState(RMAppState rmAppState) { - if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED - || rmAppState == RMAppState.KILLED) { - return true; - } else { - return false; - } - } - protected Credentials parseCredentials(ApplicationSubmissionContext application) - throws IOException { + protected Credentials parseCredentials( + ApplicationSubmissionContext application) throws IOException { Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); ByteBuffer tokens = application.getAMContainerSpec().getTokens(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 1994b36b723..9b10872641d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -36,6 +38,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -825,6 +829,15 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event; + try { + app.recover(recoverEvent.getRMState()); + } catch (Exception e) { + String msg = app.applicationId + " failed to recover. " + e.getMessage(); + failToRecoverApp(app, event, msg, e); + return RMAppState.FINAL_SAVING; + } + // The app has completed. if (app.recoveredFinalState != null) { app.recoverAppAttempts(); @@ -832,6 +845,20 @@ public class RMAppImpl implements RMApp, Recoverable { return app.recoveredFinalState; } + if (UserGroupInformation.isSecurityEnabled()) { + // synchronously renew delegation token on recovery. + try { + app.rmContext.getDelegationTokenRenewer().addApplicationSync( + app.getApplicationId(), app.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); + } catch (Exception e) { + String msg = "Failed to renew delegation token on recovery for " + + app.applicationId + e.getMessage(); + failToRecoverApp(app, event, msg, e); + return RMAppState.FINAL_SAVING; + } + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { @@ -865,6 +892,14 @@ public class RMAppImpl implements RMApp, Recoverable { // Thus we return ACCECPTED state on recovery. return RMAppState.ACCEPTED; } + + private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg, + Exception e) { + app.diagnostics.append(msg); + LOG.error(msg, e); + app.rememberTargetTransitionsAndStoreState(event, new FinalTransition( + RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED); + } } private static final class AddApplicationToSchedulerTransition extends @@ -1296,4 +1331,16 @@ public class RMAppImpl implements RMApp, Recoverable { public ReservationId getReservationId() { return submissionContext.getReservationID(); } + + protected Credentials parseCredentials() throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + return credentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java new file mode 100644 index 00000000000..b8c91a930bf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; + +public class RMAppRecoverEvent extends RMAppEvent { + + private final RMState state; + + public RMAppRecoverEvent(ApplicationId appId, RMState state) { + super(appId, RMAppEventType.RECOVER); + this.state = state; + } + + public RMState getRMState() { + return state; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index b5a6237ec01..ae11b07f868 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -833,8 +833,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { if (UserGroupInformation.isSecurityEnabled()) { byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); - clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() - .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + if (clientTokenMasterKeyBytes != null) { + clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() + .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); + } } this.amrmToken = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java new file mode 100644 index 00000000000..35a1d66b50d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +@Private +public class QueueNotFoundException extends YarnRuntimeException { + + private static final long serialVersionUID = 187239430L; + + public QueueNotFoundException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 93322281727..c383e432029 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; @@ -676,15 +677,13 @@ public class CapacityScheduler extends //During a restart, this indicates a queue was removed, which is //not presently supported if (isAppRecovering) { - //throwing RuntimeException because some other exceptions are caught - //(including YarnRuntimeException) and we want this to force an exit - String queueErrorMsg = "Queue named " + queueName + String queueErrorMsg = "Queue named " + queueName + " missing during application recovery." + " Queue removal during recovery is not presently supported by the" + " capacity scheduler, please restart with all queues configured" + " which were present before shutdown/restart."; LOG.fatal(queueErrorMsg); - throw new RuntimeException(queueErrorMsg); + throw new QueueNotFoundException(queueErrorMsg); } String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 85d38950eac..536dbd77d31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -37,6 +37,7 @@ import java.util.Set; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -570,10 +572,10 @@ public class TestWorkPreservingRMRestart { // submission //2. Remove one of the queues, restart the RM //3. Verify that the expected exception was thrown - @Test (timeout = 30000) + @Test (timeout = 30000, expected = QueueNotFoundException.class) public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { if (!schedulerClass.equals(CapacityScheduler.class)) { - return; + throw new QueueNotFoundException("Dummy"); } conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, @@ -614,17 +616,7 @@ public class TestWorkPreservingRMRestart { new CapacitySchedulerConfiguration(conf); setupQueueConfigurationOnlyA(csConf); rm2 = new MockRM(csConf, memStore); - boolean runtimeThrown = false; - try { - rm2.start(); - } catch (RuntimeException e) { - //we're catching it because we want to verify the message - //and we don't want to set it as an expected exception for the - //test because we only want it to happen here - assertTrue(e.getMessage().contains(B + " missing")); - runtimeThrown = true; - } - assertTrue(runtimeThrown); + rm2.start(); } private void checkParentQueue(ParentQueue parentQueue, int numContainers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 6a66385f3d4..ecb6b5caff6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -35,6 +36,8 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; @@ -73,9 +77,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -199,10 +205,11 @@ public class TestRMAppTransitions { AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); writer = mock(RMApplicationHistoryWriter.class); + DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class); RMContext realRMContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, new AMRMTokenSecretManager(conf, this.rmContext), + renewer, new AMRMTokenSecretManager(conf, this.rmContext), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), @@ -387,8 +394,12 @@ public class TestRMAppTransitions { ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.RECOVER + RMState state = new RMState(); + ApplicationState appState = new ApplicationState(123, 123, null, "user"); + state.getApplicationState().put(application.getApplicationId(), appState); RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER); + new RMAppRecoverEvent(application.getApplicationId(), state); + application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); @@ -514,7 +525,46 @@ public class TestRMAppTransitions { @Test (timeout = 30000) public void testAppRecoverPath() throws IOException { LOG.info("--- START: testAppRecoverPath ---"); - testCreateAppSubmittedRecovery(null); + ApplicationSubmissionContext sub = + Records.newRecord(ApplicationSubmissionContext.class); + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); + Credentials credentials = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(securityTokens); + sub.setAMContainerSpec(clc); + testCreateAppSubmittedRecovery(sub); + } + + @Test (timeout = 30000) + public void testAppRecoverToFailed() throws IOException { + LOG.info("--- START: testAppRecoverToFailed ---"); + ApplicationSubmissionContext sub = + Records.newRecord(ApplicationSubmissionContext.class); + ContainerLaunchContext clc = + Records.newRecord(ContainerLaunchContext.class); + Credentials credentials = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(securityTokens); + sub.setAMContainerSpec(clc); + + RMApp application = createNewTestApp(sub); + // NEW => FINAL_SAVING, event RMAppEventType.RECOVER + RMState state = new RMState(); + RMAppEvent event = + new RMAppRecoverEvent(application.getApplicationId(), state); + // NPE will throw on recovery. + application.handle(event); + assertAppState(RMAppState.FINAL_SAVING, application); + sendAppUpdateSavedEvent(application); + rmDispatcher.await(); + assertAppState(RMAppState.FAILED, application); } @Test (timeout = 30000) @@ -917,7 +967,6 @@ public class TestRMAppTransitions { } } - @SuppressWarnings("deprecation") public void testRecoverApplication(ApplicationState appState, RMState rmState) throws Exception { ApplicationSubmissionContext submissionContext = @@ -932,15 +981,15 @@ public class TestRMAppTransitions { RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, submissionContext.getResource(), 1)); Assert.assertEquals(RMAppState.NEW, application.getState()); - application.recover(rmState); + RMAppEvent recoverEvent = + new RMAppRecoverEvent(application.getApplicationId(), rmState); + // Trigger RECOVER event. + application.handle(recoverEvent); // Application final status looked from recoveredFinalStatus Assert.assertTrue("Application is not in recoveredFinalStatus.", RMAppImpl.isAppInFinalState(application)); - // Trigger RECOVER event. - application.handle(new RMAppEvent(appState.getAppId(), - RMAppEventType.RECOVER)); rmDispatcher.await(); RMAppState finalState = appState.getState(); Assert.assertEquals("Application is not in finalState.", finalState,