YARN-2037. Add work preserving restart support for Unmanaged AMs. (Botong Huang via Subru).
This commit is contained in:
parent
015abcd8ce
commit
d4d2fd1acd
|
@ -55,27 +55,32 @@ public interface ApplicationMasterProtocol {
|
||||||
* The interface used by a new <code>ApplicationMaster</code> to register with
|
* The interface used by a new <code>ApplicationMaster</code> to register with
|
||||||
* the <code>ResourceManager</code>.
|
* the <code>ResourceManager</code>.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The <code>ApplicationMaster</code> needs to provide details such as RPC
|
* The <code>ApplicationMaster</code> needs to provide details such as RPC
|
||||||
* Port, HTTP tracking url etc. as specified in
|
* Port, HTTP tracking url etc. as specified in
|
||||||
* {@link RegisterApplicationMasterRequest}.
|
* {@link RegisterApplicationMasterRequest}.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The <code>ResourceManager</code> responds with critical details such as
|
* The <code>ResourceManager</code> responds with critical details such as
|
||||||
* maximum resource capabilities in the cluster as specified in
|
* maximum resource capabilities in the cluster as specified in
|
||||||
* {@link RegisterApplicationMasterResponse}.
|
* {@link RegisterApplicationMasterResponse}.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* @param request
|
* <p>
|
||||||
* registration request
|
* Re-register is only allowed for <code>Unmanaged Application Master</code>
|
||||||
|
* (UAM) HA, with
|
||||||
|
* {@link org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#getKeepContainersAcrossApplicationAttempts()}
|
||||||
|
* set to true.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param request registration request
|
||||||
* @return registration respose
|
* @return registration respose
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws InvalidApplicationMasterRequestException
|
* @throws InvalidApplicationMasterRequestException The exception is thrown
|
||||||
* The exception is thrown when an ApplicationMaster tries to
|
* when an ApplicationMaster tries to register more then once.
|
||||||
* register more then once.
|
|
||||||
* @see RegisterApplicationMasterRequest
|
* @see RegisterApplicationMasterRequest
|
||||||
* @see RegisterApplicationMasterResponse
|
* @see RegisterApplicationMasterResponse
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -395,15 +395,18 @@ public abstract class ApplicationSubmissionContext {
|
||||||
* Set the flag which indicates whether to keep containers across application
|
* Set the flag which indicates whether to keep containers across application
|
||||||
* attempts.
|
* attempts.
|
||||||
* <p>
|
* <p>
|
||||||
* If the flag is true, running containers will not be killed when application
|
* For managed AM, if the flag is true, running containers will not be killed
|
||||||
* attempt fails and these containers will be retrieved by the new application
|
* when application attempt fails and these containers will be retrieved by
|
||||||
* attempt on registration via
|
* the new application attempt on registration via
|
||||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
|
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
* <p>
|
||||||
* @param keepContainers
|
* For unmanaged AM, if the flag is true, RM allows re-register and returns
|
||||||
* the flag which indicates whether to keep containers across
|
* the running containers in the same attempt back to the UAM for HA.
|
||||||
* application attempts.
|
* </p>
|
||||||
|
*
|
||||||
|
* @param keepContainers the flag which indicates whether to keep containers
|
||||||
|
* across application attempts.
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||||
|
@ -65,8 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
.AMRMTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||||
|
@ -213,14 +213,20 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
AllocateResponse lastResponse = lock.getAllocateResponse();
|
AllocateResponse lastResponse = lock.getAllocateResponse();
|
||||||
if (hasApplicationMasterRegistered(applicationAttemptId)) {
|
if (hasApplicationMasterRegistered(applicationAttemptId)) {
|
||||||
String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
|
// allow UAM re-register if work preservation is enabled
|
||||||
LOG.warn(message);
|
ApplicationSubmissionContext appContext =
|
||||||
RMAuditLogger.logFailure(
|
rmContext.getRMApps().get(appID).getApplicationSubmissionContext();
|
||||||
this.rmContext.getRMApps()
|
if (!(appContext.getUnmanagedAM()
|
||||||
.get(appID).getUser(),
|
&& appContext.getKeepContainersAcrossApplicationAttempts())) {
|
||||||
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
|
String message =
|
||||||
appID, applicationAttemptId);
|
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID;
|
||||||
throw new InvalidApplicationMasterRequestException(message);
|
LOG.warn(message);
|
||||||
|
RMAuditLogger.logFailure(
|
||||||
|
this.rmContext.getRMApps().get(appID).getUser(),
|
||||||
|
AuditConstants.REGISTER_AM, "", "ApplicationMasterService",
|
||||||
|
message, appID, applicationAttemptId);
|
||||||
|
throw new InvalidApplicationMasterRequestException(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||||
|
|
|
@ -148,6 +148,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
.getTransferredContainers(applicationAttemptId);
|
.getTransferredContainers(applicationAttemptId);
|
||||||
if (!transferredContainers.isEmpty()) {
|
if (!transferredContainers.isEmpty()) {
|
||||||
response.setContainersFromPreviousAttempts(transferredContainers);
|
response.setContainersFromPreviousAttempts(transferredContainers);
|
||||||
|
// Clear the node set remembered by the secret manager. Necessary
|
||||||
|
// for UAM restart because we use the same attemptId.
|
||||||
|
rmContext.getNMTokenSecretManager()
|
||||||
|
.clearNodeSetForAttempt(applicationAttemptId);
|
||||||
|
|
||||||
List<NMToken> nmTokens = new ArrayList<NMToken>();
|
List<NMToken> nmTokens = new ArrayList<NMToken>();
|
||||||
for (Container container : transferredContainers) {
|
for (Container container : transferredContainers) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -363,7 +363,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
// Transitions from RUNNING State
|
// Transitions from RUNNING State
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
||||||
RMAppAttemptEventType.LAUNCHED)
|
EnumSet.of(
|
||||||
|
RMAppAttemptEventType.LAUNCHED,
|
||||||
|
// Valid only for UAM restart
|
||||||
|
RMAppAttemptEventType.REGISTERED))
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
|
||||||
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
||||||
|
@ -1236,7 +1239,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
if (appAttempt.submissionContext
|
if (appAttempt.submissionContext
|
||||||
.getKeepContainersAcrossApplicationAttempts()
|
.getKeepContainersAcrossApplicationAttempts()
|
||||||
&& !appAttempt.submissionContext.getUnmanagedAM()
|
|
||||||
&& rmApp.getCurrentAppAttempt() != appAttempt) {
|
&& rmApp.getCurrentAppAttempt() != appAttempt) {
|
||||||
appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
|
appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
|
||||||
}
|
}
|
||||||
|
|
|
@ -243,17 +243,18 @@ public abstract class AbstractYarnScheduler
|
||||||
ApplicationId appId = currentAttempt.getApplicationId();
|
ApplicationId appId = currentAttempt.getApplicationId();
|
||||||
SchedulerApplication<T> app = applications.get(appId);
|
SchedulerApplication<T> app = applications.get(appId);
|
||||||
List<Container> containerList = new ArrayList<Container>();
|
List<Container> containerList = new ArrayList<Container>();
|
||||||
RMApp appImpl = this.rmContext.getRMApps().get(appId);
|
|
||||||
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
|
|
||||||
return containerList;
|
|
||||||
}
|
|
||||||
if (app == null) {
|
if (app == null) {
|
||||||
return containerList;
|
return containerList;
|
||||||
}
|
}
|
||||||
Collection<RMContainer> liveContainers =
|
Collection<RMContainer> liveContainers =
|
||||||
app.getCurrentAppAttempt().getLiveContainers();
|
app.getCurrentAppAttempt().getLiveContainers();
|
||||||
ContainerId amContainerId = rmContext.getRMApps().get(appId)
|
ContainerId amContainerId = null;
|
||||||
.getCurrentAppAttempt().getMasterContainer().getId();
|
// For UAM, amContainer would be null
|
||||||
|
if (rmContext.getRMApps().get(appId).getCurrentAppAttempt()
|
||||||
|
.getMasterContainer() != null) {
|
||||||
|
amContainerId = rmContext.getRMApps().get(appId).getCurrentAppAttempt()
|
||||||
|
.getMasterContainer().getId();
|
||||||
|
}
|
||||||
for (RMContainer rmContainer : liveContainers) {
|
for (RMContainer rmContainer : liveContainers) {
|
||||||
if (!rmContainer.getContainerId().equals(amContainerId)) {
|
if (!rmContainer.getContainerId().equals(amContainerId)) {
|
||||||
containerList.add(rmContainer.getContainer());
|
containerList.add(rmContainer.getContainer());
|
||||||
|
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test UAM handling in RM.
|
||||||
|
*/
|
||||||
|
public class TestWorkPreservingUnmanagedAM
|
||||||
|
extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
|
public TestWorkPreservingUnmanagedAM(SchedulerType type) throws IOException {
|
||||||
|
super(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
|
conf = getConf();
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
DefaultMetricsSystem.setMiniClusterMode(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test UAM work preserving restart. When the keepContainersAcrossAttempt flag
|
||||||
|
* is on, we allow UAM to directly register again and move on without getting
|
||||||
|
* the applicationAlreadyRegistered exception.
|
||||||
|
*/
|
||||||
|
protected void testUAMRestart(boolean keepContainers) throws Exception {
|
||||||
|
// start RM
|
||||||
|
MockRM rm = new MockRM();
|
||||||
|
rm.start();
|
||||||
|
MockNM nm =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||||
|
nm.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the UAM
|
||||||
|
boolean unamanged = true;
|
||||||
|
int maxAttempts = 1;
|
||||||
|
boolean waitForAccepted = true;
|
||||||
|
RMApp app = rm.submitApp(200, "",
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName(), null,
|
||||||
|
unamanged, null, maxAttempts, null, null, waitForAccepted,
|
||||||
|
keepContainers);
|
||||||
|
|
||||||
|
MockAM am = MockRM.launchUAM(app, rm, nm);
|
||||||
|
|
||||||
|
// Register for the first time
|
||||||
|
am.registerAppAttempt();
|
||||||
|
|
||||||
|
// Allocate two containers to UAM
|
||||||
|
int numContainers = 3;
|
||||||
|
List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||||
|
while (conts.size() < numContainers) {
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers());
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release one container
|
||||||
|
List<ContainerId> releaseList =
|
||||||
|
Collections.singletonList(conts.get(0).getId());
|
||||||
|
List<ContainerStatus> finishedConts =
|
||||||
|
am.allocate(new ArrayList<ResourceRequest>(), releaseList)
|
||||||
|
.getCompletedContainersStatuses();
|
||||||
|
while (finishedConts.size() < releaseList.size()) {
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
finishedConts
|
||||||
|
.addAll(am
|
||||||
|
.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>())
|
||||||
|
.getCompletedContainersStatuses());
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register for the second time
|
||||||
|
RegisterApplicationMasterResponse response = null;
|
||||||
|
try {
|
||||||
|
response = am.registerAppAttempt(false);
|
||||||
|
} catch (InvalidApplicationMasterRequestException e) {
|
||||||
|
Assert.assertEquals(false, keepContainers);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Assert.assertEquals("RM should not allow second register"
|
||||||
|
+ " for UAM without keep container flag ", true, keepContainers);
|
||||||
|
|
||||||
|
// Expecting the two running containers previously
|
||||||
|
Assert.assertEquals(2, response.getContainersFromPreviousAttempts().size());
|
||||||
|
Assert.assertEquals(1, response.getNMTokensFromPreviousAttempts().size());
|
||||||
|
|
||||||
|
// Allocate one more containers to UAM, just to be safe
|
||||||
|
numContainers = 1;
|
||||||
|
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
conts = am.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||||
|
while (conts.size() < numContainers) {
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers());
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testUAMRestartKeepContainers() throws Exception {
|
||||||
|
testUAMRestart(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testUAMRestartNoKeepContainers() throws Exception {
|
||||||
|
testUAMRestart(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue