YARN-4779. Fix AM container allocation logic in SLS. Contributed by Wangda Tan.

This commit is contained in:
Sunil G 2017-02-24 21:39:25 +05:30
parent e8694deb6a
commit b32ffa2753
5 changed files with 305 additions and 161 deletions

View File

@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -55,12 +56,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
@ -119,10 +122,10 @@ public class SLSRunner {
this.printSimulation = printsimulation;
metricsOutputDir = outputDir;
nmMap = new HashMap<NodeId, NMSimulator>();
queueAppNumMap = new HashMap<String, Integer>();
amMap = new HashMap<String, AMSimulator>();
amClassMap = new HashMap<String, Class>();
nmMap = new HashMap<>();
queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>();
// runner configuration
conf = new Configuration(false);
@ -179,7 +182,14 @@ public class SLSRunner {
}
rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
rm = new ResourceManager();
final SLSRunner se = this;
rm = new ResourceManager() {
@Override
protected ApplicationMasterLauncher createAMLauncher() {
return new MockAMLauncher(se, this.rmContext, amMap);
}
};
rm.init(rmConf);
rm.start();
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@ -107,11 +109,19 @@ public abstract class AMSimulator extends TaskRunner.Task {
// progress
protected int totalContainers;
protected int finishedContainers;
// waiting for AM container
volatile boolean isAMContainerRunning = false;
volatile Container amContainer;
protected final Logger LOG = Logger.getLogger(AMSimulator.class);
// resource for AM container
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
this.responseQueue = new LinkedBlockingQueue<>();
}
public void init(int id, int heartbeatInterval,
@ -142,23 +152,30 @@ public abstract class AMSimulator extends TaskRunner.Task {
// submit application, waiting until ACCEPTED
submitApp();
// register application master
registerAM();
// track app metrics
trackApp();
}
public synchronized void notifyAMContainerLaunched(Container masterContainer)
throws Exception {
this.amContainer = masterContainer;
this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
registerAM();
isAMContainerRunning = true;
}
@Override
public void middleStep() throws Exception {
// process responses in the queue
processResponseQueue();
// send out request
sendContainerRequest();
// check whether finish
checkStop();
if (isAMContainerRunning) {
// process responses in the queue
processResponseQueue();
// send out request
sendContainerRequest();
// check whether finish
checkStop();
}
}
@Override
@ -168,6 +185,22 @@ public abstract class AMSimulator extends TaskRunner.Task {
if (isTracked) {
untrackApp();
}
// Finish AM container
if (amContainer != null) {
LOG.info("AM container = " + amContainer.getId() + " reported to finish");
se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
amContainer.getId());
} else {
LOG.info("AM container is null");
}
if (null == appAttemptId) {
// If appAttemptId == null, AM is not launched from RM's perspective, so
// it's unnecessary to finish am as well
return;
}
// unregister application master
final FinishApplicationMasterRequest finishAMRequest = recordFactory
.newRecordInstance(FinishApplicationMasterRequest.class);
@ -256,7 +289,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
conLauContext.setLocalResources(new HashMap<String, LocalResource>());
conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setUnmanagedAM(true);
appSubContext.setResource(Resources
.createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
MR_AM_CONTAINER_RESOURCE_VCORES));
subAppRequest.setApplicationSubmissionContext(appSubContext);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@ -267,22 +302,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
});
LOG.info(MessageFormat.format("Submit a new application {0}", appId));
// waiting until application ACCEPTED
RMApp app = rm.getRMContext().getRMApps().get(appId);
while(app.getState() != RMAppState.ACCEPTED) {
Thread.sleep(10);
}
// Waiting until application attempt reach LAUNCHED
// "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt().getAppAttemptId();
RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt();
while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
Thread.sleep(10);
}
}
private void registerAM()
@ -335,7 +354,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
for (ContainerSimulator cs : csList) {
String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = rackHostNames[0];
String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
@ -383,4 +402,12 @@ public abstract class AMSimulator extends TaskRunner.Task {
public int getNumTasks() {
return totalContainers;
}
public ApplicationId getApplicationId() {
return appId;
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
}

View File

@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.avro.Protocol;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
@ -63,10 +64,10 @@ public class MRAMSimulator extends AMSimulator {
private static final int PRIORITY_REDUCE = 10;
private static final int PRIORITY_MAP = 20;
// pending maps
private LinkedList<ContainerSimulator> pendingMaps =
new LinkedList<ContainerSimulator>();
new LinkedList<>();
// pending failed maps
private LinkedList<ContainerSimulator> pendingFailedMaps =
@ -107,14 +108,9 @@ public class MRAMSimulator extends AMSimulator {
private int mapTotal = 0;
private int reduceFinished = 0;
private int reduceTotal = 0;
// waiting for AM container
private boolean isAMContainerRunning = false;
private Container amContainer;
// finished
private boolean isFinished = false;
// resource for AM container
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
@ -131,83 +127,34 @@ public class MRAMSimulator extends AMSimulator {
for (ContainerSimulator cs : containerList) {
if (cs.getType().equals("map")) {
cs.setPriority(PRIORITY_MAP);
pendingMaps.add(cs);
allMaps.add(cs);
} else if (cs.getType().equals("reduce")) {
cs.setPriority(PRIORITY_REDUCE);
pendingReduces.add(cs);
allReduces.add(cs);
}
}
allMaps.addAll(pendingMaps);
allReduces.addAll(pendingReduces);
mapTotal = pendingMaps.size();
reduceTotal = pendingReduces.size();
LOG.info(MessageFormat
.format("Added new job with {0} mapper and {1} reducers",
allMaps.size(), allReduces.size()));
mapTotal = allMaps.size();
reduceTotal = allReduces.size();
totalContainers = mapTotal + reduceTotal;
}
@Override
public void firstStep() throws Exception {
super.firstStep();
requestAMContainer();
}
/**
* send out request for AM container
*/
protected void requestAMContainer()
throws YarnException, IOException, InterruptedException {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest amRequest = createResourceRequest(
BuilderUtils.newResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
MR_AM_CONTAINER_RESOURCE_VCORES),
ResourceRequest.ANY, 1, 1);
ask.add(amRequest);
LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
"request for its AM", appId));
final AllocateRequest request = this.createAllocateRequest(ask);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
.get(appAttemptId.getApplicationId())
.getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
AllocateResponse response = ugi.doAs(
new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return rm.getApplicationMasterService().allocate(request);
}
});
if (response != null) {
responseQueue.put(response);
public synchronized void notifyAMContainerLaunched(Container masterContainer)
throws Exception {
if (null != masterContainer) {
restart();
super.notifyAMContainerLaunched(masterContainer);
}
}
@Override
@SuppressWarnings("unchecked")
protected void processResponseQueue()
throws InterruptedException, YarnException, IOException {
// Check whether receive the am container
if (!isAMContainerRunning) {
if (!responseQueue.isEmpty()) {
AllocateResponse response = responseQueue.take();
if (response != null
&& !response.getAllocatedContainers().isEmpty()) {
// Get AM container
Container container = response.getAllocatedContainers().get(0);
se.getNmMap().get(container.getNodeId())
.addNewContainer(container, -1L);
// Start AM container
amContainer = container;
LOG.debug(MessageFormat.format("Application {0} starts its " +
"AM container ({1}).", appId, amContainer.getId()));
isAMContainerRunning = true;
}
}
return;
}
protected void processResponseQueue() throws Exception {
while (! responseQueue.isEmpty()) {
AllocateResponse response = responseQueue.take();
@ -228,12 +175,16 @@ public class MRAMSimulator extends AMSimulator {
assignedReduces.remove(containerId);
reduceFinished ++;
finishedContainers ++;
} else {
} else if (amContainer.getId().equals(containerId)){
// am container released event
isFinished = true;
LOG.info(MessageFormat.format("Application {0} goes to " +
"finish.", appId));
}
if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
lastStep();
}
} else {
// container to be killed
if (assignedMaps.containsKey(containerId)) {
@ -244,10 +195,9 @@ public class MRAMSimulator extends AMSimulator {
LOG.debug(MessageFormat.format("Application {0} has one " +
"reducer killed ({1}).", appId, containerId));
pendingFailedReduces.add(assignedReduces.remove(containerId));
} else {
} else if (amContainer.getId().equals(containerId)){
LOG.info(MessageFormat.format("Application {0}'s AM is " +
"going to be killed. Restarting...", appId));
restart();
"going to be killed. Waiting for rescheduling...", appId));
}
}
}
@ -255,11 +205,8 @@ public class MRAMSimulator extends AMSimulator {
// check finished
if (isAMContainerRunning &&
(mapFinished == mapTotal) &&
(reduceFinished == reduceTotal)) {
// to release the AM container
se.getNmMap().get(amContainer.getNodeId())
.cleanupContainer(amContainer.getId());
(mapFinished >= mapTotal) &&
(reduceFinished >= reduceTotal)) {
isAMContainerRunning = false;
LOG.debug(MessageFormat.format("Application {0} sends out event " +
"to clean up its AM container.", appId));
@ -293,21 +240,38 @@ public class MRAMSimulator extends AMSimulator {
*/
private void restart()
throws YarnException, IOException, InterruptedException {
// clear
finishedContainers = 0;
// clear
isFinished = false;
mapFinished = 0;
reduceFinished = 0;
pendingFailedMaps.clear();
pendingMaps.clear();
pendingReduces.clear();
pendingFailedReduces.clear();
pendingMaps.addAll(allMaps);
pendingReduces.addAll(pendingReduces);
isAMContainerRunning = false;
// Only add totalMaps - finishedMaps
int added = 0;
for (ContainerSimulator cs : allMaps) {
if (added >= mapTotal - mapFinished) {
break;
}
pendingMaps.add(cs);
}
// And same, only add totalReduces - finishedReduces
added = 0;
for (ContainerSimulator cs : allReduces) {
if (added >= reduceTotal - reduceFinished) {
break;
}
pendingReduces.add(cs);
}
amContainer = null;
// resent am container request
requestAMContainer();
}
private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left, List<ContainerSimulator> right) {
List<ContainerSimulator> list = new ArrayList<>();
list.addAll(left);
list.addAll(right);
return list;
}
@Override
@ -319,44 +283,48 @@ public class MRAMSimulator extends AMSimulator {
// send out request
List<ResourceRequest> ask = null;
if (isAMContainerRunning) {
if (mapFinished != mapTotal) {
// map phase
if (! pendingMaps.isEmpty()) {
ask = packageRequests(pendingMaps, PRIORITY_MAP);
LOG.debug(MessageFormat.format("Application {0} sends out " +
"request for {1} mappers.", appId, pendingMaps.size()));
scheduledMaps.addAll(pendingMaps);
pendingMaps.clear();
} else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
LOG.debug(MessageFormat.format("Application {0} sends out " +
"requests for {1} failed mappers.", appId,
pendingFailedMaps.size()));
scheduledMaps.addAll(pendingFailedMaps);
pendingFailedMaps.clear();
}
} else if (reduceFinished != reduceTotal) {
// reduce phase
if (! pendingReduces.isEmpty()) {
ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
LOG.debug(MessageFormat.format("Application {0} sends out " +
"requests for {1} reducers.", appId, pendingReduces.size()));
scheduledReduces.addAll(pendingReduces);
pendingReduces.clear();
} else if (! pendingFailedReduces.isEmpty()
&& scheduledReduces.isEmpty()) {
ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
LOG.debug(MessageFormat.format("Application {0} sends out " +
"request for {1} failed reducers.", appId,
pendingFailedReduces.size()));
scheduledReduces.addAll(pendingFailedReduces);
pendingFailedReduces.clear();
}
if (mapFinished != mapTotal) {
// map phase
if (!pendingMaps.isEmpty()) {
ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
PRIORITY_MAP);
LOG.debug(MessageFormat
.format("Application {0} sends out " + "request for {1} mappers.",
appId, pendingMaps.size()));
scheduledMaps.addAll(pendingMaps);
pendingMaps.clear();
} else if (!pendingFailedMaps.isEmpty()) {
ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
PRIORITY_MAP);
LOG.debug(MessageFormat.format(
"Application {0} sends out " + "requests for {1} failed mappers.",
appId, pendingFailedMaps.size()));
scheduledMaps.addAll(pendingFailedMaps);
pendingFailedMaps.clear();
}
} else if (reduceFinished != reduceTotal) {
// reduce phase
if (!pendingReduces.isEmpty()) {
ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
PRIORITY_REDUCE);
LOG.debug(MessageFormat
.format("Application {0} sends out " + "requests for {1} reducers.",
appId, pendingReduces.size()));
scheduledReduces.addAll(pendingReduces);
pendingReduces.clear();
} else if (!pendingFailedReduces.isEmpty()) {
ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
PRIORITY_REDUCE);
LOG.debug(MessageFormat.format(
"Application {0} sends out " + "request for {1} failed reducers.",
appId, pendingFailedReduces.size()));
scheduledReduces.addAll(pendingFailedReduces);
pendingFailedReduces.clear();
}
}
if (ask == null) {
ask = new ArrayList<ResourceRequest>();
ask = new ArrayList<>();
}
final AllocateRequest request = createAllocateRequest(ask);

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import java.util.Map;
public class MockAMLauncher extends ApplicationMasterLauncher
implements EventHandler<AMLauncherEvent> {
private static final Log LOG = LogFactory.getLog(
MockAMLauncher.class);
Map<String, AMSimulator> amMap;
SLSRunner se;
public MockAMLauncher(SLSRunner se, RMContext rmContext,
Map<String, AMSimulator> amMap) {
super(rmContext);
this.amMap = amMap;
this.se = se;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Do nothing
}
@Override
protected void serviceStart() throws Exception {
// Do nothing
}
private void setupAMRMToken(RMAppAttempt appAttempt) {
// Setup AMRMToken
Token<AMRMTokenIdentifier> amrmToken =
super.context.getAMRMTokenSecretManager().createAndGetAMRMToken(
appAttempt.getAppAttemptId());
((RMAppAttemptImpl) appAttempt).setAMRMToken(amrmToken);
}
@Override
@SuppressWarnings("unchecked")
public void handle(AMLauncherEvent event) {
if (AMLauncherEventType.LAUNCH == event.getType()) {
ApplicationId appId =
event.getAppAttempt().getAppAttemptId().getApplicationId();
// find AMSimulator
for (AMSimulator ams : amMap.values()) {
if (ams.getApplicationId() != null && ams.getApplicationId().equals(
appId)) {
try {
Container amContainer = event.getAppAttempt().getMasterContainer();
setupAMRMToken(event.getAppAttempt());
// Notify RMAppAttempt to change state
super.context.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
ams.notifyAMContainerLaunched(
event.getAppAttempt().getMasterContainer());
LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, 100000000L);
return;
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
}
throw new YarnRuntimeException(
"Didn't find any AMSimulator for applicationId=" + appId);
}
}
}

View File

@ -556,6 +556,30 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
}
}
);
metrics.register("variable.cluster.reserved.memory",
new Gauge<Long>() {
@Override
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0L;
} else {
return getRootQueueMetrics().getReservedMB();
}
}
}
);
metrics.register("variable.cluster.reserved.vcores",
new Gauge<Integer>() {
@Override
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0;
} else {
return getRootQueueMetrics().getReservedVirtualCores();
}
}
}
);
}
private void registerContainerAppNumMetrics() {