YARN-1041. Added the ApplicationMasterProtocol API for applications to use the ability in ResourceManager to optionally not kill containers when the ApplicationMaster exits. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557318 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2014-01-11 07:07:17 +00:00
parent f677175f35
commit 25bc68d15e
16 changed files with 2196 additions and 49 deletions
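
A quick orientation before the per-file diffs: after this change, AM registration also reports the containers the ResourceManager kept alive from the previous attempt instead of killing them. Below is a minimal, illustrative AM-side sketch (not part of the commit) using the public AMRMClient wrapper; the host, port, and tracking-URL arguments are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class AmRegistrationSketch {
  public static void main(String[] args) throws Exception {
    AMRMClient<ContainerRequest> amRmClient = AMRMClient.createAMRMClient();
    amRmClient.init(new Configuration());
    amRmClient.start();
    // Registration now also returns containers transferred from the
    // previous attempt (empty for first attempts and unmanaged AMs).
    RegisterApplicationMasterResponse response =
        amRmClient.registerApplicationMaster("am-host", 0, "");
    for (Container inherited : response.getContainersFromPreviousAttempt()) {
      // Resume managing these containers instead of re-requesting them.
      System.out.println("Inherited running container: " + inherited.getId());
    }
    amRmClient.stop();
  }
}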


@@ -63,6 +63,10 @@ Release 2.4.0 - UNRELEASED
YARN-1033. Expose RM active/standby state to Web UI and REST API (kasha)
YARN-1041. Added the ApplicationMasterProtocol API for applications to use the
ability in ResourceManager to optionally not kill containers when the
ApplicationMaster exits. (Jian He via vinodkv)
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)


@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -27,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@@ -47,16 +49,19 @@ import org.apache.hadoop.yarn.util.Records;
@Public
@Stable
public abstract class RegisterApplicationMasterResponse {
@Private
@Unstable
public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability,
Map<ApplicationAccessType, String> acls, ByteBuffer key) {
Map<ApplicationAccessType, String> acls, ByteBuffer key,
List<Container> containersFromPreviousAttempt) {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls);
response.setClientToAMTokenMasterKey(key);
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
return response;
}
@@ -105,4 +110,30 @@ public abstract class RegisterApplicationMasterResponse {
@Public
@Stable
public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
/**
* <p>
* Get the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt.
* </p>
*
* @return the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt
*/
@Public
@Unstable
public abstract List<Container> getContainersFromPreviousAttempt();
/**
* Set the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt.
*
* @param containersFromPreviousAttempt
* the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt.
*/
@Private
@Unstable
public abstract void setContainersFromPreviousAttempt(
List<Container> containersFromPreviousAttempt);
}


@@ -44,6 +44,7 @@ message RegisterApplicationMasterResponseProto {
optional ResourceProto maximumCapability = 1;
optional bytes client_to_am_token_master_key = 2;
repeated ApplicationACLMapProto application_ACLs = 3;
repeated ContainerProto containers_from_previous_attempt = 4;
}
message FinishApplicationMasterRequestProto {


@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -29,10 +30,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
@@ -52,6 +56,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
private Resource maximumResourceCapability;
private Map<ApplicationAccessType, String> applicationACLS = null;
private List<Container> containersFromPreviousAttempt = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -105,6 +110,9 @@ public class RegisterApplicationMasterResponsePBImpl extends
if (this.applicationACLS != null) {
addApplicationACLs();
}
if (this.containersFromPreviousAttempt != null) {
addRunningContainersToProto();
}
}
@@ -226,6 +234,43 @@ public class RegisterApplicationMasterResponsePBImpl extends
ByteBuffer.wrap(builder.getClientToAmTokenMasterKey().toByteArray());
return key;
}
@Override
public List<Container> getContainersFromPreviousAttempt() {
if (this.containersFromPreviousAttempt != null) {
return this.containersFromPreviousAttempt;
}
initRunningContainersList();
return this.containersFromPreviousAttempt;
}
@Override
public void setContainersFromPreviousAttempt(final List<Container> containers) {
if (containers == null) {
return;
}
this.containersFromPreviousAttempt = new ArrayList<Container>();
this.containersFromPreviousAttempt.addAll(containers);
}
private void initRunningContainersList() {
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
containersFromPreviousAttempt = new ArrayList<Container>();
for (ContainerProto c : list) {
containersFromPreviousAttempt.add(convertFromProtoFormat(c));
}
}
private void addRunningContainersToProto() {
maybeInitBuilder();
builder.clearContainersFromPreviousAttempt();
List<ContainerProto> list = new ArrayList<ContainerProto>();
for (Container c : containersFromPreviousAttempt) {
list.add(convertToProtoFormat(c));
}
builder.addAllContainersFromPreviousAttempt(list);
}
private Resource convertFromProtoFormat(ResourceProto resource) {
return new ResourcePBImpl(resource);
@@ -235,4 +280,11 @@ public class RegisterApplicationMasterResponsePBImpl extends
return ((ResourcePBImpl)resource).getProto();
}
private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
return new ContainerPBImpl(p);
}
private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto();
}
}
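
A hedged sketch of the round trip through this record (illustrative, not part of the commit): setContainersFromPreviousAttempt() only buffers the list locally, and getProto(), via mergeLocalToBuilder() above, copies it into the repeated containers_from_previous_attempt field, from which a re-parsed record reads it back. Creating the Container via Records assumes the default PB record factory is in use.

import java.util.Collections;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.util.Records;

public class RoundTripSketch {
  public static void main(String[] args) {
    RegisterApplicationMasterResponsePBImpl response =
        new RegisterApplicationMasterResponsePBImpl();
    // Buffered in the local list; the proto builder is untouched so far.
    response.setContainersFromPreviousAttempt(
        Collections.singletonList(Records.newRecord(Container.class)));
    // getProto() merges the local list into the repeated proto field, so a
    // record rebuilt from the proto sees the same containers.
    RegisterApplicationMasterResponsePBImpl reparsed =
        new RegisterApplicationMasterResponsePBImpl(response.getProto());
    System.out.println(reparsed.getContainersFromPreviousAttempt().size()); // 1
  }
}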


@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
@@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -271,6 +273,11 @@ public class ApplicationMasterService extends AbstractService implements
.getClientToAMTokenSecretManager()
.getMasterKey(applicationAttemptId).getEncoded()));
}
List<Container> containerList =
((AbstractYarnScheduler) rScheduler)
.getTransferredContainers(applicationAttemptId);
response.setContainersFromPreviousAttempt(containerList);
return response;
}
}


@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class AbstractYarnScheduler {
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications;
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
SchedulerApplication app = applications.get(appId);
List<Container> containerList = new ArrayList<Container>();
RMApp appImpl = this.rmContext.getRMApps().get(appId);
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
return containerList;
}
Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
ContainerId amContainerId =
rmContext.getRMApps().get(appId).getCurrentAppAttempt()
.getMasterContainer().getId();
for (RMContainer rmContainer : liveContainers) {
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());
}
}
return containerList;
}
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
return applications;
}
}
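
The three schedulers below each extend this new base class. As a hedged sketch of the contract (a hypothetical subclass, not part of the commit): a concrete scheduler must populate the protected rmContext and applications fields during (re)initialization for the inherited getTransferredContainers() to work, which is exactly what the CapacityScheduler, FairScheduler, and FifoScheduler changes that follow do.

import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;

// Hypothetical scheduler, for illustration only.
public class SketchScheduler extends AbstractYarnScheduler {
  public void reinitialize(RMContext context) {
    // Both fields are read by the inherited getTransferredContainers().
    this.rmContext = context;
    this.applications =
        new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
  }
}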


@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -95,7 +96,7 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
public class CapacityScheduler
public class CapacityScheduler extends AbstractYarnScheduler
implements PreemptableResourceScheduler, CapacitySchedulerContext,
Configurable {
@@ -177,7 +178,6 @@ public class CapacityScheduler
private CapacitySchedulerConfiguration conf;
private Configuration yarnConf;
private RMContext rmContext;
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
@@ -191,10 +191,6 @@ public class CapacityScheduler
private Resource minimumAllocation;
private Resource maximumAllocation;
@VisibleForTesting
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
private boolean initialized = false;
private ResourceCalculator calculator;
@@ -271,9 +267,10 @@ public class CapacityScheduler
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
this.rmContext = rmContext;
initializeQueues(this.conf);
initialized = true;


@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -120,10 +121,10 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
public class FairScheduler implements ResourceScheduler {
public class FairScheduler extends AbstractYarnScheduler implements
ResourceScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
private RMContext rmContext;
private Resource minimumAllocation;
private Resource maximumAllocation;
private Resource incrAllocation;
@@ -157,11 +158,6 @@ public class FairScheduler implements ResourceScheduler {
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
// This stores per-application scheduling information,
@VisibleForTesting
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -1235,6 +1231,9 @@ public class FairScheduler implements ResourceScheduler {
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
// This stores per-application scheduling information
this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1357,5 +1356,4 @@ public class FairScheduler implements ResourceScheduler {
queue.collectSchedulerApplications(apps);
return apps;
}
}


@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -104,7 +105,8 @@ import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
public class FifoScheduler implements ResourceScheduler, Configurable {
public class FifoScheduler extends AbstractYarnScheduler implements
ResourceScheduler, Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -115,7 +117,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
private RMContext rmContext;
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
@@ -124,11 +125,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private Resource maximumAllocation;
private boolean usePortForNodeName;
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -243,6 +239,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
if (!this.initialized) {
validateConf(conf);
this.rmContext = rmContext;
//Use ConcurrentSkipListMap because applications need to be ordered
this.applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,


@@ -24,6 +24,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -150,7 +151,29 @@ public class TestAMRestart {
ApplicationAttemptId newAttemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
// launch the new AM
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
RegisterApplicationMasterResponse registerResponse =
am2.registerAppAttempt();
// Assert two containers are running: container2 and container3;
Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
.size());
boolean containerId2Exists = false, containerId3Exists = false;
for (Container container : registerResponse
.getContainersFromPreviousAttempt()) {
if (container.getId().equals(containerId2)) {
containerId2Exists = true;
}
if (container.getId().equals(containerId3)) {
containerId3Exists = true;
}
}
Assert.assertTrue(containerId2Exists && containerId3Exists);
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// complete container by sending the container complete event which has earlier
// attempt's attemptId


@@ -642,7 +642,7 @@ public class TestCapacityScheduler {
SchedulerApplication app =
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
cs.applications, cs, "a1");
cs.getSchedulerApplications(), cs, "a1");
Assert.assertEquals("a1", app.getQueue().getQueueName());
}
}


@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -260,7 +259,7 @@ public class TestFairScheduler {
scheduler.addApplication(id.getApplicationId(), queueId, userId);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.applications.containsKey(id.getApplicationId())) {
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false);
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
@@ -2546,6 +2545,6 @@ public class TestFairScheduler {
FairScheduler scheduler =
(FairScheduler) resourceManager.getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.applications, scheduler, "default");
scheduler.getSchedulerApplications(), scheduler, "default");
}
}


@@ -591,8 +591,8 @@ public class TestFifoScheduler {
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications,
fs, "queue");
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
fs.getSchedulerApplications(), fs, "queue");
}
private void checkApplicationResourceUsage(int expected,


@@ -0,0 +1,615 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private final int GB = 1024;
private ResourceManager resourceManager = null;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@Before
public void setUp() throws Exception {
resourceManager = new ResourceManager();
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
resourceManager.init(conf);
}
@After
public void tearDown() throws Exception {
resourceManager.stop();
}
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
registerNode(String hostName, int containerManagerPort, int nmHttpPort,
String rackName, Resource capability) throws IOException,
YarnException {
return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
hostName, containerManagerPort, nmHttpPort, rackName, capability,
resourceManager);
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers) {
ResourceRequest request = recordFactory
.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setResourceName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
return request;
}
@Test(timeout=5000)
public void testFifoSchedulerCapacityWhenNoNMs() {
FifoScheduler scheduler = new FifoScheduler();
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
}
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null);
FifoScheduler schedular = new FifoScheduler();
schedular.reinitialize(new Configuration(), rmContext);
QueueMetrics metrics = schedular.getRootQueueMetrics();
int beforeAppsSubmitted = metrics.getAppsSubmitted();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
schedular.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
}
@Test(timeout=2000)
public void testNodeLocalAssignment() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null);
FifoScheduler scheduler = new FifoScheduler();
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(1024 * 64), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
int _appId = 1;
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
int memory = 64;
int nConts = 3;
int priority = 20;
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest nodeLocal = createResourceRequest(memory,
node0.getHostName(), priority, nConts);
ResourceRequest rackLocal = createResourceRequest(memory,
node0.getRackName(), priority, nConts);
ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
nConts);
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
// Before the node update event, there are 3 local requests outstanding
Assert.assertEquals(3, nodeLocal.getNumContainers());
scheduler.handle(node0Update);
// After the node update event, check that there are no more local requests
// outstanding
Assert.assertEquals(0, nodeLocal.getNumContainers());
//Also check that the containers were scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(3, info.getLiveContainers().size());
}
@Test(timeout=2000)
public void testUpdateResourceOnNode() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null);
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
public Map<NodeId, FiCaSchedulerNode> getNodes(){
return nodes;
}
};
scheduler.reinitialize(new Configuration(), rmContext);
RMNode node0 = MockNodes.newNodeInfo(1,
Resources.createResource(2048, 4), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
Method method = scheduler.getClass().getDeclaredMethod("getNodes");
@SuppressWarnings("unchecked")
Map<NodeId, FiCaSchedulerNode> schedulerNodes =
(Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
assertEquals(schedulerNodes.values().size(), 1);
// set resource of RMNode to 1024 and verify it works.
node0.setResourceOption(ResourceOption.newInstance(
Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
assertEquals(node0.getTotalCapability().getMemory(), 1024);
// verify that SchedulerNode's resource hasn't been changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 2048);
// now, NM heartbeat comes.
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
scheduler.handle(node0Update);
// SchedulerNode's available resource is changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 1024);
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
int _appId = 1;
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
int memory = 1024;
int priority = 1;
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest nodeLocal = createResourceRequest(memory,
node0.getHostName(), priority, 1);
ResourceRequest rackLocal = createResourceRequest(memory,
node0.getRackName(), priority, 1);
ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority,
1);
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
// Before the node update event, there is one local request
Assert.assertEquals(1, nodeLocal.getNumContainers());
// Now schedule.
scheduler.handle(node0Update);
// After the node update event, check no local request
Assert.assertEquals(0, nodeLocal.getNumContainers());
// Also check that one container was scheduled
SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId);
Assert.assertEquals(1, info.getLiveContainers().size());
// And check the default Queue now is full.
queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity());
}
// @Test
public void testFifoScheduler() throws Exception {
LOG.info("--- START: testFifoScheduler ---");
final int GB = 1024;
// Register node1
String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1));
nm_0.heartbeat();
// Register node2
String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1));
nm_1.heartbeat();
// ResourceRequest priorities
Priority priority_0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0);
Priority priority_1 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
// Submit an application
Application application_0 = new Application("user_0", resourceManager);
application_0.submit();
application_0.addNodeManager(host_0, 1234, nm_0);
application_0.addNodeManager(host_1, 1234, nm_1);
Resource capability_0_0 = Resources.createResource(GB);
application_0.addResourceRequestSpec(priority_1, capability_0_0);
Resource capability_0_1 = Resources.createResource(2 * GB);
application_0.addResourceRequestSpec(priority_0, capability_0_1);
Task task_0_0 = new Task(application_0, priority_1,
new String[] {host_0, host_1});
application_0.addTask(task_0_0);
// Submit another application
Application application_1 = new Application("user_1", resourceManager);
application_1.submit();
application_1.addNodeManager(host_0, 1234, nm_0);
application_1.addNodeManager(host_1, 1234, nm_1);
Resource capability_1_0 = Resources.createResource(3 * GB);
application_1.addResourceRequestSpec(priority_1, capability_1_0);
Resource capability_1_1 = Resources.createResource(4 * GB);
application_1.addResourceRequestSpec(priority_0, capability_1_1);
Task task_1_0 = new Task(application_1, priority_1,
new String[] {host_0, host_1});
application_1.addTask(task_1_0);
// Send resource requests to the scheduler
LOG.info("Send resource requests to the scheduler");
application_0.schedule();
application_1.schedule();
// Send a heartbeat to kick the tires on the Scheduler
LOG.info("Send a heartbeat to kick the tires on the Scheduler... " +
"nm0 -> task_0_0 and task_1_0 allocated, used=4G " +
"nm1 -> nothing allocated");
nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G
nm_1.heartbeat(); // nothing allocated
// Get allocations from the scheduler
application_0.schedule(); // task_0_0
checkApplicationResourceUsage(GB, application_0);
application_1.schedule(); // task_1_0
checkApplicationResourceUsage(3 * GB, application_1);
nm_0.heartbeat();
nm_1.heartbeat();
checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G)
checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available
LOG.info("Adding new tasks...");
Task task_1_1 = new Task(application_1, priority_1,
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_1);
Task task_1_2 = new Task(application_1, priority_1,
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_2);
Task task_1_3 = new Task(application_1, priority_0,
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_3);
application_1.schedule();
Task task_0_1 = new Task(application_0, priority_1,
new String[] {host_0, host_1});
application_0.addTask(task_0_1);
Task task_0_2 = new Task(application_0, priority_1,
new String[] {host_0, host_1});
application_0.addTask(task_0_2);
Task task_0_3 = new Task(application_0, priority_0,
new String[] {ResourceRequest.ANY});
application_0.addTask(task_0_3);
application_0.schedule();
// Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from " + nm_0.getHostName());
nm_0.heartbeat(); // nothing new, used=4G
LOG.info("Sending hb from " + nm_1.getHostName());
nm_1.heartbeat(); // task_0_3, used=2G
// Get allocations from the scheduler
LOG.info("Trying to allocate...");
application_0.schedule();
checkApplicationResourceUsage(3 * GB, application_0);
application_1.schedule();
checkApplicationResourceUsage(3 * GB, application_1);
nm_0.heartbeat();
nm_1.heartbeat();
checkNodeResourceUsage(4*GB, nm_0);
checkNodeResourceUsage(2*GB, nm_1);
// Complete tasks
LOG.info("Finishing up task_0_0");
application_0.finishTask(task_0_0); // Now task_0_1
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(3 * GB, application_0);
checkApplicationResourceUsage(3 * GB, application_1);
checkNodeResourceUsage(4*GB, nm_0);
checkNodeResourceUsage(2*GB, nm_1);
LOG.info("Finishing up task_1_0");
application_1.finishTask(task_1_0); // Now task_0_2
application_0.schedule(); // final overcommit for app0 caused here
application_1.schedule();
nm_0.heartbeat(); // final overcommit for app0 occurs here
nm_1.heartbeat();
checkApplicationResourceUsage(4 * GB, application_0);
checkApplicationResourceUsage(0 * GB, application_1);
//checkNodeResourceUsage(1*GB, nm_0); // final over-commit -> rm.node->1G, test.node=2G
checkNodeResourceUsage(2*GB, nm_1);
LOG.info("Finishing up task_0_3");
application_0.finishTask(task_0_3); // No more
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(2 * GB, application_0);
checkApplicationResourceUsage(0 * GB, application_1);
//checkNodeResourceUsage(2*GB, nm_0); // final over-commit, rm.node->1G, test.node->2G
checkNodeResourceUsage(0*GB, nm_1);
LOG.info("Finishing up task_0_1");
application_0.finishTask(task_0_1);
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(1 * GB, application_0);
checkApplicationResourceUsage(0 * GB, application_1);
LOG.info("Finishing up task_0_2");
application_0.finishTask(task_0_2); // now task_1_3 can go!
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(0 * GB, application_0);
checkApplicationResourceUsage(4 * GB, application_1);
LOG.info("Finishing up task_1_3");
application_1.finishTask(task_1_3); // now task_1_1
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(0 * GB, application_0);
checkApplicationResourceUsage(3 * GB, application_1);
LOG.info("Finishing up task_1_1");
application_1.finishTask(task_1_1);
application_0.schedule();
application_1.schedule();
nm_0.heartbeat();
nm_1.heartbeat();
checkApplicationResourceUsage(0 * GB, application_0);
checkApplicationResourceUsage(3 * GB, application_1);
LOG.info("--- END: testFifoScheduler ---");
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
fs.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId, "default",
"user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
fs.handle(attemptEvent);
// Verify the blacklist can be updated independent of requesting containers
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", resourceManager);
application_0.submit();
Application application_1 = new Application("user_0", resourceManager);
application_1.submit();
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default");
assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId()));
assertEquals(2, appsInDefault.size());
Assert.assertNull(scheduler.getAppsInQueue("someotherqueue"));
}
@Test
public void testAddAndRemoveAppFromFiFoScheduler() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler();
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
fs.getSchedulerApplications(), fs, "queue");
}
private void checkApplicationResourceUsage(int expected,
Application application) {
Assert.assertEquals(expected, application.getUsedResources().getMemory());
}
private void checkNodeResourceUsage(int expected,
org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemory());
node.checkResourceUsage();
}
public static void main(String[] arg) throws Exception {
TestFifoScheduler t = new TestFifoScheduler();
t.setUp();
t.testFifoScheduler();
t.tearDown();
}
}


@@ -41,9 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -1387,31 +1384,30 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.stop();
}
@Test
@Test (timeout = 20000)
public void testMultipleAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192);
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.ALLOCATED);
MockAM am = MockRM.launchAM(app1, rm, amNodeManager);
int maxAppAttempts = rm.getConfig().getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
assertTrue(maxAppAttempts > 1);
int retriesLeft = maxAppAttempts;
while (--retriesLeft > 0) {
RMAppEvent event =
new RMAppFailedAttemptEvent(app1.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "", false);
app1.handle(event);
int numAttempt = 1;
while (true) {
// fail the AM by sending CONTAINER_FINISHED event without registering.
amNodeManager.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FAILED);
if (numAttempt == maxAppAttempts) {
rm.waitForState(app1.getApplicationId(), RMAppState.FAILED);
break;
}
// wait for app to start a new attempt.
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
amNodeManager.nodeHeartbeat(true);
am = MockRM.launchAM(app1, rm, amNodeManager);
numAttempt++;
}
// kick the scheduler to allocate the am container.
amNodeManager.nodeHeartbeat(true);
rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.ALLOCATED);
assertEquals("incorrect number of attempts", maxAppAttempts,
app1.getAppAttempts().values().size());
testAppAttemptsHelper(app1.getApplicationId().toString(), app1,