From d4d2fd1acd2fdddf04f45e67897804eea30d79a1 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 2 Oct 2017 18:14:44 -0700 Subject: [PATCH] YARN-2037. Add work preserving restart support for Unmanaged AMs. (Botong Huang via Subru). --- .../yarn/api/ApplicationMasterProtocol.java | 21 ++- .../records/ApplicationSubmissionContext.java | 17 +- .../ApplicationMasterService.java | 26 +-- .../resourcemanager/DefaultAMSProcessor.java | 5 + .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../scheduler/AbstractYarnScheduler.java | 13 +- .../TestWorkPreservingUnmanagedAM.java | 159 ++++++++++++++++++ 7 files changed, 214 insertions(+), 33 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index 4d7896115e7..eb40fc7f3e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -55,27 +55,32 @@ public interface ApplicationMasterProtocol { * The interface used by a new ApplicationMaster to register with * the ResourceManager. *

- * + * *

* The ApplicationMaster needs to provide details such as RPC * Port, HTTP tracking url etc. as specified in * {@link RegisterApplicationMasterRequest}. *

- * + * *

* The ResourceManager responds with critical details such as * maximum resource capabilities in the cluster as specified in * {@link RegisterApplicationMasterResponse}. *

- * - * @param request - * registration request + * + *

+ * Re-register is only allowed for Unmanaged Application Master + * (UAM) HA, with + * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()} + * set to true. + *

+ * + * @param request registration request * @return registration respose * @throws YarnException * @throws IOException - * @throws InvalidApplicationMasterRequestException - * The exception is thrown when an ApplicationMaster tries to - * register more then once. + * @throws InvalidApplicationMasterRequestException The exception is thrown + * when an ApplicationMaster tries to register more than once. * @see RegisterApplicationMasterRequest * @see RegisterApplicationMasterResponse */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 4f1d147791c..a6bbca7c56f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -395,15 +395,18 @@ public static ApplicationSubmissionContext newInstance( * Set the flag which indicates whether to keep containers across application * attempts. *

- * If the flag is true, running containers will not be killed when application - * attempt fails and these containers will be retrieved by the new application - * attempt on registration via + * For managed AM, if the flag is true, running containers will not be killed + * when application attempt fails and these containers will be retrieved by + * the new application attempt on registration via * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}. *

- * - * @param keepContainers - * the flag which indicates whether to keep containers across - * application attempts. + *

+ * For unmanaged AM, if the flag is true, RM allows re-register and returns + * the running containers in the same attempt back to the UAM for HA. + *

+ * + * @param keepContainers the flag which indicates whether to keep containers + * across application attempts. */ @Public @Stable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 479aa43df08..90c42be8d44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; @@ -65,8 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.security - .AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import 
org.apache.hadoop.yarn.server.utils.AMRMClientUtils; @@ -213,14 +213,20 @@ public RegisterApplicationMasterResponse registerApplicationMaster( synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (hasApplicationMasterRegistered(applicationAttemptId)) { - String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID; - LOG.warn(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(appID).getUser(), - AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - appID, applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + // allow UAM re-register if work preservation is enabled + ApplicationSubmissionContext appContext = + rmContext.getRMApps().get(appID).getApplicationSubmissionContext(); + if (!(appContext.getUnmanagedAM() + && appContext.getKeepContainersAcrossApplicationAttempts())) { + String message = + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID; + LOG.warn(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps().get(appID).getUser(), + AuditConstants.REGISTER_AM, "", "ApplicationMasterService", + message, appID, applicationAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } } this.amLivelinessMonitor.receivedPing(applicationAttemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 5632efecaa0..3eef270933a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -148,6 +148,11 @@ public void registerApplicationMaster( .getTransferredContainers(applicationAttemptId); if (!transferredContainers.isEmpty()) { response.setContainersFromPreviousAttempts(transferredContainers); + // Clear the node set remembered by the secret manager. Necessary + // for UAM restart because we use the same attemptId. + rmContext.getNMTokenSecretManager() + .clearNodeSetForAttempt(applicationAttemptId); + List nmTokens = new ArrayList(); for (Container container : transferredContainers) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index fa29e008cb3..1b4d6135cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -363,7 +363,10 @@ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) // Transitions from RUNNING State .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, - RMAppAttemptEventType.LAUNCHED) + EnumSet.of( + RMAppAttemptEventType.LAUNCHED, + // Valid only for UAM restart + RMAppAttemptEventType.REGISTERED)) .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) 
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, @@ -1236,7 +1239,6 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, if (appAttempt.submissionContext .getKeepContainersAcrossApplicationAttempts() - && !appAttempt.submissionContext.getUnmanagedAM() && rmApp.getCurrentAppAttempt() != appAttempt) { appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4896ab0e76a..9d6037e2c41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -243,17 +243,18 @@ public List getTransferredContainers( ApplicationId appId = currentAttempt.getApplicationId(); SchedulerApplication app = applications.get(appId); List containerList = new ArrayList(); - RMApp appImpl = this.rmContext.getRMApps().get(appId); - if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { - return containerList; - } if (app == null) { return containerList; } Collection liveContainers = app.getCurrentAppAttempt().getLiveContainers(); - ContainerId amContainerId = rmContext.getRMApps().get(appId) - .getCurrentAppAttempt().getMasterContainer().getId(); + ContainerId amContainerId = null; + // For UAM, amContainer would be null + if (rmContext.getRMApps().get(appId).getCurrentAppAttempt() + .getMasterContainer() != null) { + amContainerId = 
rmContext.getRMApps().get(appId).getCurrentAppAttempt() + .getMasterContainer().getId(); + } for (RMContainer rmContainer : liveContainers) { if (!rmContainer.getContainerId().equals(amContainerId)) { containerList.add(rmContainer.getContainer()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java new file mode 100644 index 00000000000..80f7319bdae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test UAM handling in RM. + */ +public class TestWorkPreservingUnmanagedAM + extends ParameterizedSchedulerTestBase { + + private YarnConfiguration conf; + + public TestWorkPreservingUnmanagedAM(SchedulerType type) throws IOException { + super(type); + } + + @Before + public void setup() { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = getConf(); + UserGroupInformation.setConfiguration(conf); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + /** + * Test UAM work preserving restart. When the keepContainersAcrossAttempt flag + * is on, we allow UAM to directly register again and move on without getting + * the applicationAlreadyRegistered exception. 
+ */ + protected void testUAMRestart(boolean keepContainers) throws Exception { + // start RM + MockRM rm = new MockRM(); + rm.start(); + MockNM nm = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm.registerNode(); + + // create app and launch the UAM + boolean unamanged = true; + int maxAttempts = 1; + boolean waitForAccepted = true; + RMApp app = rm.submitApp(200, "", + UserGroupInformation.getCurrentUser().getShortUserName(), null, + unamanged, null, maxAttempts, null, null, waitForAccepted, + keepContainers); + + MockAM am = MockRM.launchUAM(app, rm, nm); + + // Register for the first time + am.registerAppAttempt(); + + // Allocate three containers to UAM + int numContainers = 3; + List conts = am.allocate("127.0.0.1", 1000, numContainers, + new ArrayList()).getAllocatedContainers(); + while (conts.size() < numContainers) { + nm.nodeHeartbeat(true); + conts.addAll(am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(100); + } + + // Release one container + List releaseList = + Collections.singletonList(conts.get(0).getId()); + List finishedConts = + am.allocate(new ArrayList(), releaseList) + .getCompletedContainersStatuses(); + while (finishedConts.size() < releaseList.size()) { + nm.nodeHeartbeat(true); + finishedConts + .addAll(am + .allocate(new ArrayList(), + new ArrayList()) + .getCompletedContainersStatuses()); + Thread.sleep(100); + } + + // Register for the second time + RegisterApplicationMasterResponse response = null; + try { + response = am.registerAppAttempt(false); + } catch (InvalidApplicationMasterRequestException e) { + Assert.assertEquals(false, keepContainers); + return; + } + Assert.assertEquals("RM should not allow second register" + + " for UAM without keep container flag ", true, keepContainers); + + // Expecting the two running containers previously + Assert.assertEquals(2, response.getContainersFromPreviousAttempts().size()); + Assert.assertEquals(1, 
response.getNMTokensFromPreviousAttempts().size()); + + // Allocate one more container to UAM, just to be safe + numContainers = 1; + am.allocate("127.0.0.1", 1000, numContainers, new ArrayList()); + nm.nodeHeartbeat(true); + conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() < numContainers) { + nm.nodeHeartbeat(true); + conts.addAll(am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(100); + } + + rm.stop(); + } + + @Test(timeout = 600000) + public void testUAMRestartKeepContainers() throws Exception { + testUAMRestart(true); + } + + @Test(timeout = 600000) + public void testUAMRestartNoKeepContainers() throws Exception { + testUAMRestart(false); + } + +}