YARN-1913. With Fair Scheduler, cluster can logjam when all resources are consumed by AMs (Wei Yan via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1599400 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-06-03 00:56:48 +00:00
parent a29d2d3371
commit 16caa3fd18
14 changed files with 299 additions and 41 deletions

View File

@ -132,6 +132,9 @@ Release 2.5.0 - UNRELEASED
YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)
YARN-1913. With Fair Scheduler, cluster can logjam when all resources are
consumed by AMs (Wei Yan via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -76,6 +77,8 @@ public class SchedulerApplicationAttempt {
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
private Resource amResource;
private boolean unmanagedAM = true;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@ -106,6 +109,19 @@ public class SchedulerApplicationAttempt {
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager);
this.queue = queue;
if (rmContext != null && rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
ApplicationSubmissionContext appSubmissionContext =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
.getApplicationSubmissionContext();
if (appSubmissionContext != null) {
amResource = appSubmissionContext.getResource();
unmanagedAM = appSubmissionContext.getUnmanagedAM();
}
}
}
/**
@ -168,6 +184,14 @@ public class SchedulerApplicationAttempt {
return appSchedulingInfo.getQueueName();
}
public Resource getAMResource() {
return amResource;
}
public boolean getUnmanagedAM() {
return unmanagedAM;
}
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}

View File

@ -53,6 +53,10 @@ public class AllocationConfiguration {
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
// Maximum resource share for each leaf queue that can be used to run AMs
final Map<String, Float> queueMaxAMShares;
private final float queueMaxAMShareDefault;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
@ -84,8 +88,9 @@ public class AllocationConfiguration {
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault,
Map<String, ResourceWeights> queueWeights,
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
int queueMaxAppsDefault, float queueMaxAMShareDefault,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
@ -97,9 +102,11 @@ public class AllocationConfiguration {
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
this.queueMaxAMShares = queueMaxAMShares;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@ -116,8 +123,10 @@ public class AllocationConfiguration {
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
queueMaxAMShares = new HashMap<String, Float>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAMShareDefault = 1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@ -184,6 +193,11 @@ public class AllocationConfiguration {
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
public float getQueueMaxAMShare(String queue) {
Float maxAMShare = queueMaxAMShares.get(queue);
return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
}
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.

View File

@ -209,6 +209,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@ -216,6 +217,7 @@ public class AllocationFileLoaderService extends AbstractService {
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = 1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@ -282,6 +284,11 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
} else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.min(val, 1.0f);
queueMaxAMShareDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
@ -306,8 +313,8 @@ public class AllocationFileLoaderService extends AbstractService {
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts, queueAcls,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
}
@ -322,8 +329,8 @@ public class AllocationFileLoaderService extends AbstractService {
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
newPlacementPolicy, configuredQueues);
@ -338,7 +345,8 @@ public class AllocationFileLoaderService extends AbstractService {
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@ -370,6 +378,11 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("maxAMShare".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.min(val, 1.0f);
queueMaxAMShares.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
@ -392,8 +405,9 @@ public class AllocationFileLoaderService extends AbstractService {
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts, queueAcls, configuredQueues);
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}

View File

@ -267,6 +267,12 @@ public class AppSchedulable extends Schedulable {
node.allocateContainer(app.getApplicationId(),
allocatedContainer);
// If this container is used to run AM, update the leaf queue's AM usage
if (app.getLiveContainers().size() == 1 &&
!app.getUnmanagedAM()) {
queue.addAMResourceUsage(container.getResource());
}
return container.getResource();
} else {
// The desired container won't fit here, so reserve
@ -297,6 +303,14 @@ public class AppSchedulable extends Schedulable {
app.addSchedulingOpportunity(priority);
// Check the AM resource usage for the leaf queue
if (app.getLiveContainers().size() == 0
&& !app.getUnmanagedAM()) {
if (!queue.canRunAppAM(app.getAMResource())) {
return Resources.none();
}
}
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,

View File

@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue {
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
// Track the AM resource usage for this queue
private Resource amResourceUsage;
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
@ -63,6 +66,7 @@ public class FSLeafQueue extends FSQueue {
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
amResourceUsage = Resource.newInstance(0, 0);
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@ -86,6 +90,10 @@ public class FSLeafQueue extends FSQueue {
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
// Update AM resource usage
if (app.getAMResource() != null) {
Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
@ -284,4 +292,26 @@ public class FSLeafQueue extends FSQueue {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
/**
* Check whether this queue can run this application master under the
* maxAMShare limit
*
* @param amResource
* @return true if this queue can run
*/
public boolean canRunAppAM(Resource amResource) {
float maxAMShare =
scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
return !policy
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
}
public void addAMResourceUsage(Resource amResource) {
if (amResource != null) {
Resources.addTo(amResourceUsage, amResource);
}
}
}

View File

@ -149,4 +149,15 @@ public abstract class SchedulingPolicy {
*/
public abstract boolean checkIfUsageOverFairShare(
Resource usage, Resource fairShare);
/**
* Check if a leaf queue's AM resource usage over its limit under this policy
*
* @param usage {@link Resource} the resource used by application masters
* @param maxAMResource {@link Resource} the maximum allowed resource for
* application masters
* @return true if AM resource usage is over the limit
*/
public abstract boolean checkIfAMResourceUsageOverLimit(
Resource usage, Resource maxAMResource);
}

View File

@ -74,6 +74,11 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
return !Resources.fitsIn(usage, fairShare);
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return !Resources.fitsIn(usage, maxAMResource);
}
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);

View File

@ -124,6 +124,11 @@ public class FairSharePolicy extends SchedulingPolicy {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;

View File

@ -94,6 +94,11 @@ public class FifoPolicy extends SchedulingPolicy {
"as FifoPolicy only works for FSLeafQueue.");
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;

View File

@ -20,14 +20,21 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
@ -169,4 +176,20 @@ public class FairSchedulerTestBase {
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
}
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource) {
RMContext rmContext = resourceManager.getRMContext();
RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, amResource, null), null, null,
0, null, null);
rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
attId.getApplicationId(), queue, user);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}
}

View File

@ -174,9 +174,10 @@ public class TestAllocationFileLoaderService {
out.println("<queue name=\"queueC\">");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</queue>");
// Give queue D a limit of 3 running apps
// Give queue D a limit of 3 running apps and 0.4f maxAMShare
out.println("<queue name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("<maxAMShare>0.4</maxAMShare>");
out.println("</queue>");
// Give queue E a preemption timeout of one minute
out.println("<queue name=\"queueE\">");
@ -194,6 +195,8 @@ public class TestAllocationFileLoaderService {
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Set default limit of AMResourceShare to 0.5f
out.println("<queueMaxAMShareDefault>0.5f</queueMaxAMShareDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
@ -240,6 +243,13 @@ public class TestAllocationFileLoaderService {
assertEquals(10, queueConf.getUserMaxApps("user1"));
assertEquals(5, queueConf.getUserMaxApps("user2"));
assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);
// Root should get * ACL
assertEquals("*", queueConf.getQueueAcl("root",
QueueACL.ADMINISTER_QUEUE).getAclString());

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@ -73,12 +72,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
@ -510,26 +509,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
"user1");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
createApplicationWithAMResource(appAttemptId, "default", "user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getRunnableAppSchedulables().size());
assertEquals("root.user1", rmApp.getQueue());
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
.get(appAttemptId.getApplicationId()).getQueue());
}
@Test
@ -538,21 +525,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
"user2");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
createApplicationWithAMResource(appAttemptId, "default", "user2", null);
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
@ -2329,6 +2303,121 @@ public class TestFairScheduler extends FairSchedulerTestBase {
verifyQueueNumRunnable("queue1", 2, 1);
}
@Test
public void testQueueMaxAMShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println("<maxAMShare>0.2</maxAMShare>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
0, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
scheduler.update();
assertEquals("Queue queue1's fair share should be 10240",
10240, scheduler.getQueueManager().getLeafQueue("queue1", true)
.getFairShare().getMemory());
Resource amResource1 = Resource.newInstance(1024, 1);
Resource amResource2 = Resource.newInstance(2048, 2);
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
// Exceeds no limits
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1's AM requests 1024 MB memory",
1024, app1.getAMResource().getMemory());
assertEquals("Application1's AM should be running",
1, app1.getLiveContainers().size());
// Exceeds no limits
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM requests 1024 MB memory",
1024, app2.getAMResource().getMemory());
assertEquals("Application2's AM should be running",
1, app2.getLiveContainers().size());
// Exceeds queue limit
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application3's AM requests 1024 MB memory",
1024, app3.getAMResource().getMemory());
assertEquals("Application3's AM should not be running",
0, app3.getLiveContainers().size());
// Still can run non-AM container
createSchedulingRequestExistingApplication(1024, 1, attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1 should have two running containers",
2, app1.getLiveContainers().size());
// Remove app1, app3's AM should become running
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
scheduler.update();
scheduler.handle(appRemovedEvent1);
scheduler.handle(updateEvent);
assertEquals("Application1's AM should be finished",
0, app1.getLiveContainers().size());
assertEquals("Application3's AM should be running",
1, app3.getLiveContainers().size());
// Exceeds queue limit
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application4's AM requests 2048 MB memory",
2048, app4.getAMResource().getMemory());
assertEquals("Application4's AM should not be running",
0, app4.getLiveContainers().size());
// Remove app2 and app3, app4's AM should become running
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent2);
scheduler.handle(appRemovedEvent3);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM should be finished",
0, app2.getLiveContainers().size());
assertEquals("Application3's AM should be finished",
0, app3.getLiveContainers().size());
assertEquals("Application4's AM should be running",
1, app4.getLiveContainers().size());
}
@Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

View File

@ -237,6 +237,11 @@ Allocation file format
* maxRunningApps: limit the number of apps from the queue to run at once
* maxAMShare: limit the fraction of the queue's fair share that can be used
to run application masters. This property can only be used for leaf queues.
Default value is 1.0f, which means AMs in the leaf queue can take up to 100%
of both the memory and CPU fair share.
* weight: to share the cluster non-proportionally with other queues. Weights
default to 1, and a queue with weight 2 should receive approximately twice
as many resources as a queue with the default weight.
@ -279,6 +284,9 @@ Allocation file format
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overriden by maxRunningApps element in each queue.
* <<A queueMaxAMShareDefault element>>, which sets the default AM resource
limit for queue; overriden by maxAMShare element in each queue.
* <<A defaultQueueSchedulingPolicy element>>, which sets the default scheduling
policy for queues; overriden by the schedulingPolicy element in each queue
if specified. Defaults to "fair".
@ -328,6 +336,7 @@ Allocation file format
<minResources>10000 mb,0vcores</minResources>
<maxResources>90000 mb,0vcores</maxResources>
<maxRunningApps>50</maxRunningApps>
<maxAMShare>0.1</maxAMShare>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="sample_sub_queue">
@ -336,6 +345,8 @@ Allocation file format
</queue>
</queue>
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<!—- Queue secondary_group_queue is a parent queue and may have
user queues under it -—>
<queue name=“secondary_group_queue” type=“parent”>