YARN-2037. Add work preserving restart support for Unmanaged AMs. (Botong Huang via Subru).

(cherry picked from commit d4d2fd1acd)
This commit is contained in:
Subru Krishnan 2017-10-02 18:14:44 -07:00
parent c8c2c89282
commit b5acbfef48
7 changed files with 214 additions and 33 deletions

View File

@ -55,27 +55,32 @@ public interface ApplicationMasterProtocol {
* The interface used by a new <code>ApplicationMaster</code> to register with
* the <code>ResourceManager</code>.
* </p>
*
*
* <p>
* The <code>ApplicationMaster</code> needs to provide details such as RPC
* Port, HTTP tracking url etc. as specified in
* {@link RegisterApplicationMasterRequest}.
* </p>
*
*
* <p>
* The <code>ResourceManager</code> responds with critical details such as
* maximum resource capabilities in the cluster as specified in
* {@link RegisterApplicationMasterResponse}.
* </p>
*
* @param request
* registration request
*
* <p>
* Re-register is only allowed for <code>Unmanaged Application Master</code>
* (UAM) HA, with
* {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()}
* set to true.
* </p>
*
* @param request registration request
* @return registration respose
* @throws YarnException
* @throws IOException
* @throws InvalidApplicationMasterRequestException
* The exception is thrown when an ApplicationMaster tries to
* register more then once.
* @throws InvalidApplicationMasterRequestException The exception is thrown
* when an ApplicationMaster tries to register more then once.
* @see RegisterApplicationMasterRequest
* @see RegisterApplicationMasterResponse
*/

View File

@ -395,15 +395,18 @@ public abstract class ApplicationSubmissionContext {
* Set the flag which indicates whether to keep containers across application
* attempts.
* <p>
* If the flag is true, running containers will not be killed when application
* attempt fails and these containers will be retrieved by the new application
* attempt on registration via
* For managed AM, if the flag is true, running containers will not be killed
* when application attempt fails and these containers will be retrieved by
* the new application attempt on registration via
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
* </p>
*
* @param keepContainers
* the flag which indicates whether to keep containers across
* application attempts.
* <p>
* For unmanaged AM, if the flag is true, RM allows re-register and returns
* the running containers in the same attempt back to the UAM for HA.
* </p>
*
* @param keepContainers the flag which indicates whether to keep containers
* across application attempts.
*/
@Public
@Stable

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@ -65,8 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security
.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
@ -213,14 +213,20 @@ public class ApplicationMasterService extends AbstractService implements
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
.get(appID).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
appID, applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
// allow UAM re-register if work preservation is enabled
ApplicationSubmissionContext appContext =
rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
if (!(appContext.getUnmanagedAM()
&& appContext.getKeepContainersAcrossApplicationAttempts())) {
String message =
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appID).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
message, appID, applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);

View File

@ -144,6 +144,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
.getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
// Clear the node set remembered by the secret manager. Necessary
// for UAM restart because we use the same attemptId.
rmContext.getNMTokenSecretManager()
.clearNodeSetForAttempt(applicationAttemptId);
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {

View File

@ -363,7 +363,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.LAUNCHED)
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
// Valid only for UAM restart
RMAppAttemptEventType.REGISTERED))
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
@ -1240,7 +1243,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.submissionContext.getUnmanagedAM()
&& rmApp.getCurrentAppAttempt() != appAttempt) {
appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
}

View File

@ -241,17 +241,18 @@ public abstract class AbstractYarnScheduler
ApplicationId appId = currentAttempt.getApplicationId();
SchedulerApplication<T> app = applications.get(appId);
List<Container> containerList = new ArrayList<Container>();
RMApp appImpl = this.rmContext.getRMApps().get(appId);
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
return containerList;
}
if (app == null) {
return containerList;
}
Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
ContainerId amContainerId = rmContext.getRMApps().get(appId)
.getCurrentAppAttempt().getMasterContainer().getId();
ContainerId amContainerId = null;
// For UAM, amContainer would be null
if (rmContext.getRMApps().get(appId).getCurrentAppAttempt()
.getMasterContainer() != null) {
amContainerId = rmContext.getRMApps().get(appId).getCurrentAppAttempt()
.getMasterContainer().getId();
}
for (RMContainer rmContainer : liveContainers) {
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test UAM handling in RM.
*/
public class TestWorkPreservingUnmanagedAM
extends ParameterizedSchedulerTestBase {
private YarnConfiguration conf;
public TestWorkPreservingUnmanagedAM(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
conf = getConf();
UserGroupInformation.setConfiguration(conf);
DefaultMetricsSystem.setMiniClusterMode(true);
}
/**
* Test UAM work preserving restart. When the keepContainersAcrossAttempt flag
* is on, we allow UAM to directly register again and move on without getting
* the applicationAlreadyRegistered exception.
*/
protected void testUAMRestart(boolean keepContainers) throws Exception {
// start RM
MockRM rm = new MockRM();
rm.start();
MockNM nm =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm.registerNode();
// create app and launch the UAM
boolean unamanged = true;
int maxAttempts = 1;
boolean waitForAccepted = true;
RMApp app = rm.submitApp(200, "",
UserGroupInformation.getCurrentUser().getShortUserName(), null,
unamanged, null, maxAttempts, null, null, waitForAccepted,
keepContainers);
MockAM am = MockRM.launchUAM(app, rm, nm);
// Register for the first time
am.registerAppAttempt();
// Allocate two containers to UAM
int numContainers = 3;
List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(100);
}
// Release one container
List<ContainerId> releaseList =
Collections.singletonList(conts.get(0).getId());
List<ContainerStatus> finishedConts =
am.allocate(new ArrayList<ResourceRequest>(), releaseList)
.getCompletedContainersStatuses();
while (finishedConts.size() < releaseList.size()) {
nm.nodeHeartbeat(true);
finishedConts
.addAll(am
.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>())
.getCompletedContainersStatuses());
Thread.sleep(100);
}
// Register for the second time
RegisterApplicationMasterResponse response = null;
try {
response = am.registerAppAttempt(false);
} catch (InvalidApplicationMasterRequestException e) {
Assert.assertEquals(false, keepContainers);
return;
}
Assert.assertEquals("RM should not allow second register"
+ " for UAM without keep container flag ", true, keepContainers);
// Expecting the two running containers previously
Assert.assertEquals(2, response.getContainersFromPreviousAttempts().size());
Assert.assertEquals(1, response.getNMTokensFromPreviousAttempts().size());
// Allocate one more containers to UAM, just to be safe
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(100);
}
rm.stop();
}
@Test(timeout = 600000)
public void testUAMRestartKeepContainers() throws Exception {
testUAMRestart(true);
}
@Test(timeout = 600000)
public void testUAMRestartNoKeepContainers() throws Exception {
testUAMRestart(false);
}
}