From b5acbfef486161b7db6a2310e24cd0ca4b48dbf0 Mon Sep 17 00:00:00 2001
From: Subru Krishnan
Date: Mon, 2 Oct 2017 18:14:44 -0700
Subject: [PATCH] YARN-2037. Add work preserving restart support for Unmanaged
AMs. (Botong Huang via Subru).
(cherry picked from commit d4d2fd1acd2fdddf04f45e67897804eea30d79a1)
---
.../yarn/api/ApplicationMasterProtocol.java | 21 ++-
.../records/ApplicationSubmissionContext.java | 17 +-
.../ApplicationMasterService.java | 26 +--
.../resourcemanager/DefaultAMSProcessor.java | 5 +
.../rmapp/attempt/RMAppAttemptImpl.java | 6 +-
.../scheduler/AbstractYarnScheduler.java | 13 +-
.../TestWorkPreservingUnmanagedAM.java | 159 ++++++++++++++++++
7 files changed, 214 insertions(+), 33 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
index 4d7896115e7..eb40fc7f3e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
@@ -55,27 +55,32 @@ public interface ApplicationMasterProtocol {
* The interface used by a new <code>ApplicationMaster</code> to register with
* the <code>ResourceManager</code>.
*
- *
+ *
*
* The <code>ApplicationMaster</code> needs to provide details such as RPC
* Port, HTTP tracking url etc. as specified in
* {@link RegisterApplicationMasterRequest}.
*
- *
+ *
*
* The <code>ResourceManager</code> responds with critical details such as
* maximum resource capabilities in the cluster as specified in
* {@link RegisterApplicationMasterResponse}.
*
- *
- * @param request
- * registration request
+ *
+ *
+ * Re-register is only allowed for Unmanaged Application Master
+ * (UAM) HA, with
+ * {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()}
+ * set to true.
+ *
+ *
+ * @param request registration request
* @return registration respose
* @throws YarnException
* @throws IOException
- * @throws InvalidApplicationMasterRequestException
- * The exception is thrown when an ApplicationMaster tries to
- * register more then once.
+ * @throws InvalidApplicationMasterRequestException The exception is thrown
+ * when an ApplicationMaster tries to register more than once.
* @see RegisterApplicationMasterRequest
* @see RegisterApplicationMasterResponse
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 3a601e2ea52..38db60cd9e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -395,15 +395,18 @@ public abstract class ApplicationSubmissionContext {
* Set the flag which indicates whether to keep containers across application
* attempts.
*
- * If the flag is true, running containers will not be killed when application
- * attempt fails and these containers will be retrieved by the new application
- * attempt on registration via
+ * For managed AM, if the flag is true, running containers will not be killed
+ * when application attempt fails and these containers will be retrieved by
+ * the new application attempt on registration via
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
*
- *
- * @param keepContainers
- * the flag which indicates whether to keep containers across
- * application attempts.
+ *
+ * For unmanaged AM, if the flag is true, RM allows re-register and returns
+ * the running containers in the same attempt back to the UAM for HA.
+ *
+ *
+ * @param keepContainers the flag which indicates whether to keep containers
+ * across application attempts.
*/
@Public
@Stable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 479aa43df08..90c42be8d44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -65,8 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
- .AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
@@ -213,14 +213,20 @@ public class ApplicationMasterService extends AbstractService implements
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (hasApplicationMasterRegistered(applicationAttemptId)) {
- String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
- LOG.warn(message);
- RMAuditLogger.logFailure(
- this.rmContext.getRMApps()
- .get(appID).getUser(),
- AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
- appID, applicationAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
+ // allow UAM re-register if work preservation is enabled
+ ApplicationSubmissionContext appContext =
+ rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
+ if (!(appContext.getUnmanagedAM()
+ && appContext.getKeepContainersAcrossApplicationAttempts())) {
+ String message =
+ AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
+ LOG.warn(message);
+ RMAuditLogger.logFailure(
+ this.rmContext.getRMApps().get(appID).getUser(),
+ AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
+ message, appID, applicationAttemptId);
+ throw new InvalidApplicationMasterRequestException(message);
+ }
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index a993d694e28..d5444b48091 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -144,6 +144,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
.getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
+ // Clear the node set remembered by the secret manager. Necessary
+ // for UAM restart because we use the same attemptId.
+ rmContext.getNMTokenSecretManager()
+ .clearNodeSetForAttempt(applicationAttemptId);
+
List nmTokens = new ArrayList();
for (Container container : transferredContainers) {
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 65412df808c..cccb3107dad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -363,7 +363,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
- RMAppAttemptEventType.LAUNCHED)
+ EnumSet.of(
+ RMAppAttemptEventType.LAUNCHED,
+ // Valid only for UAM restart
+ RMAppAttemptEventType.REGISTERED))
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
@@ -1240,7 +1243,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
- && !appAttempt.submissionContext.getUnmanagedAM()
&& rmApp.getCurrentAppAttempt() != appAttempt) {
appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d15c95fe51a..5b6fdc65feb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -241,17 +241,18 @@ public abstract class AbstractYarnScheduler
ApplicationId appId = currentAttempt.getApplicationId();
SchedulerApplication app = applications.get(appId);
List containerList = new ArrayList();
- RMApp appImpl = this.rmContext.getRMApps().get(appId);
- if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
- return containerList;
- }
if (app == null) {
return containerList;
}
Collection liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
- ContainerId amContainerId = rmContext.getRMApps().get(appId)
- .getCurrentAppAttempt().getMasterContainer().getId();
+ ContainerId amContainerId = null;
+ // For UAM, amContainer would be null
+ if (rmContext.getRMApps().get(appId).getCurrentAppAttempt()
+ .getMasterContainer() != null) {
+ amContainerId = rmContext.getRMApps().get(appId).getCurrentAppAttempt()
+ .getMasterContainer().getId();
+ }
for (RMContainer rmContainer : liveContainers) {
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
new file mode 100644
index 00000000000..80f7319bdae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingUnmanagedAM.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test UAM handling in RM.
+ */
+public class TestWorkPreservingUnmanagedAM
+    extends ParameterizedSchedulerTestBase {
+
+  private YarnConfiguration conf;
+
+  public TestWorkPreservingUnmanagedAM(SchedulerType type) throws IOException {
+    super(type);
+  }
+
+  @Before
+  public void setup() {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    conf = getConf();
+    UserGroupInformation.setConfiguration(conf);
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  /**
+   * Test UAM work preserving restart. With the keepContainersAcrossAttempt flag
+   * on, a second register on the same attempt must succeed and return the live
+   * containers; with it off the RM must reject it. The RM is always stopped.
+   */
+  protected void testUAMRestart(boolean keepContainers) throws Exception {
+    // NOTE(review): setup()'s conf is not passed to MockRM; confirm intended
+    MockRM rm = new MockRM();
+    rm.start();
+    try {
+      MockNM nm =
+          new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+      nm.registerNode();
+
+      // create app and launch the UAM
+      boolean unmanaged = true;
+      int maxAttempts = 1;
+      boolean waitForAccepted = true;
+      RMApp app = rm.submitApp(200, "",
+          UserGroupInformation.getCurrentUser().getShortUserName(), null,
+          unmanaged, null, maxAttempts, null, null, waitForAccepted,
+          keepContainers);
+
+      MockAM am = MockRM.launchUAM(app, rm, nm);
+
+      // Register for the first time
+      am.registerAppAttempt();
+
+      // Allocate three containers to UAM
+      int numContainers = 3;
+      List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      while (conts.size() < numContainers) {
+        nm.nodeHeartbeat(true);
+        conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>()).getAllocatedContainers());
+        Thread.sleep(100);
+      }
+
+      // Release one container
+      List<ContainerId> releaseList =
+          Collections.singletonList(conts.get(0).getId());
+      List<ContainerStatus> finishedConts =
+          am.allocate(new ArrayList<ResourceRequest>(), releaseList)
+              .getCompletedContainersStatuses();
+      while (finishedConts.size() < releaseList.size()) {
+        nm.nodeHeartbeat(true);
+        finishedConts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>()).getCompletedContainersStatuses());
+        Thread.sleep(100);
+      }
+
+      // Register for the second time
+      RegisterApplicationMasterResponse response = null;
+      try {
+        response = am.registerAppAttempt(false);
+      } catch (InvalidApplicationMasterRequestException e) {
+        Assert.assertFalse("RM rejected UAM re-register: " + e, keepContainers);
+        return;
+      }
+      Assert.assertTrue("RM should not allow second register"
+          + " for UAM without keep container flag ", keepContainers);
+
+      // Expecting the two running containers previously
+      Assert.assertEquals(2,
+          response.getContainersFromPreviousAttempts().size());
+      Assert.assertEquals(1, response.getNMTokensFromPreviousAttempts().size());
+
+      // Allocate one more container to UAM, just to be safe
+      numContainers = 1;
+      am.allocate("127.0.0.1", 1000, numContainers,
+          new ArrayList<ContainerId>());
+      nm.nodeHeartbeat(true);
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      while (conts.size() < numContainers) {
+        nm.nodeHeartbeat(true);
+        conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>()).getAllocatedContainers());
+        Thread.sleep(100);
+      }
+    } finally {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartKeepContainers() throws Exception {
+    testUAMRestart(true);
+  }
+
+  @Test(timeout = 600000)
+  public void testUAMRestartNoKeepContainers() throws Exception {
+    testUAMRestart(false);
+  }
+}