YARN-1913. With Fair Scheduler, cluster can logjam when all resources are consumed by AMs (Wei Yan via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1599401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-06-03 00:58:09 +00:00
parent b712ed2b23
commit f9c5c308e4
14 changed files with 299 additions and 41 deletions

View File

@ -117,6 +117,9 @@ Release 2.5.0 - UNRELEASED
YARN-1474. Make sechedulers services. (Tsuyoshi Ozawa via kasha)
YARN-1913. With Fair Scheduler, cluster can logjam when all resources are
consumed by AMs (Wei Yan via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -76,6 +77,8 @@ public class SchedulerApplicationAttempt {
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
private Resource amResource;
private boolean unmanagedAM = true;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@ -106,6 +109,19 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager);
this.queue = queue;
if (rmContext != null && rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
ApplicationSubmissionContext appSubmissionContext =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
.getApplicationSubmissionContext();
if (appSubmissionContext != null) {
amResource = appSubmissionContext.getResource();
unmanagedAM = appSubmissionContext.getUnmanagedAM();
}
}
}
/**
@ -168,6 +184,14 @@ public String getQueueName() {
return appSchedulingInfo.getQueueName();
}
public Resource getAMResource() {
return amResource;
}
public boolean getUnmanagedAM() {
return unmanagedAM;
}
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}

View File

@ -53,6 +53,10 @@ public class AllocationConfiguration {
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
// Maximum resource share for each leaf queue that can be used to run AMs
final Map<String, Float> queueMaxAMShares;
private final float queueMaxAMShareDefault;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
@ -84,8 +88,9 @@ public class AllocationConfiguration {
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault,
Map<String, ResourceWeights> queueWeights,
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
int queueMaxAppsDefault, float queueMaxAMShareDefault,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
@ -97,9 +102,11 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
this.queueMaxAMShares = queueMaxAMShares;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@ -116,8 +123,10 @@ public AllocationConfiguration(Configuration conf) {
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
queueMaxAMShares = new HashMap<String, Float>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAMShareDefault = 1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@ -184,6 +193,11 @@ public int getQueueMaxApps(String queue) {
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
public float getQueueMaxAMShare(String queue) {
Float maxAMShare = queueMaxAMShares.get(queue);
return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
}
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.

View File

@ -209,6 +209,7 @@ public synchronized void reloadAllocations() throws IOException,
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@ -216,6 +217,7 @@ public synchronized void reloadAllocations() throws IOException,
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = 1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@ -282,6 +284,11 @@ public synchronized void reloadAllocations() throws IOException,
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
} else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.min(val, 1.0f);
queueMaxAMShareDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
@ -306,8 +313,8 @@ public synchronized void reloadAllocations() throws IOException,
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts, queueAcls,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
}
@ -322,8 +329,8 @@ public synchronized void reloadAllocations() throws IOException,
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
newPlacementPolicy, configuredQueues);
@ -338,7 +345,8 @@ public synchronized void reloadAllocations() throws IOException,
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@ -370,6 +378,11 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("maxAMShare".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.min(val, 1.0f);
queueMaxAMShares.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
@ -392,8 +405,9 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts, queueAcls, configuredQueues);
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}

View File

@ -267,6 +267,12 @@ private Resource assignContainer(FSSchedulerNode node,
node.allocateContainer(app.getApplicationId(),
allocatedContainer);
// If this container is used to run AM, update the leaf queue's AM usage
if (app.getLiveContainers().size() == 1 &&
!app.getUnmanagedAM()) {
queue.addAMResourceUsage(container.getResource());
}
return container.getResource();
} else {
// The desired container won't fit here, so reserve
@ -297,6 +303,14 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
app.addSchedulingOpportunity(priority);
// Check the AM resource usage for the leaf queue
if (app.getLiveContainers().size() == 0
&& !app.getUnmanagedAM()) {
if (!queue.canRunAppAM(app.getAMResource())) {
return Resources.none();
}
}
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,

View File

@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue {
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
// Track the AM resource usage for this queue
private Resource amResourceUsage;
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
@ -63,6 +66,7 @@ public FSLeafQueue(String name, FairScheduler scheduler,
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
amResourceUsage = Resource.newInstance(0, 0);
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@ -86,6 +90,10 @@ void addAppSchedulable(AppSchedulable appSched) {
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
// Update AM resource usage
if (app.getAMResource() != null) {
Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
@ -284,4 +292,26 @@ public int getNumRunnableApps() {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
/**
* Check whether this queue can run this application master under the
* maxAMShare limit
*
* @param amResource
* @return true if this queue can run
*/
public boolean canRunAppAM(Resource amResource) {
float maxAMShare =
scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
return !policy
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
}
public void addAMResourceUsage(Resource amResource) {
if (amResource != null) {
Resources.addTo(amResourceUsage, amResource);
}
}
}

View File

@ -149,4 +149,15 @@ public abstract void computeShares(
*/
public abstract boolean checkIfUsageOverFairShare(
Resource usage, Resource fairShare);
/**
* Check if a leaf queue's AM resource usage over its limit under this policy
*
* @param usage {@link Resource} the resource used by application masters
* @param maxAMResource {@link Resource} the maximum allowed resource for
* application masters
* @return true if AM resource usage is over the limit
*/
public abstract boolean checkIfAMResourceUsageOverLimit(
Resource usage, Resource maxAMResource);
}

View File

@ -74,6 +74,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare);
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return !Resources.fitsIn(usage, maxAMResource);
}
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);

View File

@ -124,6 +124,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;

View File

@ -94,6 +94,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
"as FifoPolicy only works for FSLeafQueue.");
}
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;

View File

@ -20,14 +20,21 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
@ -169,4 +176,20 @@ protected void createSchedulingRequestExistingApplication(
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
}
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource) {
RMContext rmContext = resourceManager.getRMContext();
RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, amResource, null), null, null,
0, null, null);
rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
attId.getApplicationId(), queue, user);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}
}

View File

@ -174,9 +174,10 @@ public void testAllocationFileParsing() throws Exception {
out.println("<queue name=\"queueC\">");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</queue>");
// Give queue D a limit of 3 running apps
// Give queue D a limit of 3 running apps and 0.4f maxAMShare
out.println("<queue name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("<maxAMShare>0.4</maxAMShare>");
out.println("</queue>");
// Give queue E a preemption timeout of one minute
out.println("<queue name=\"queueE\">");
@ -194,6 +195,8 @@ public void testAllocationFileParsing() throws Exception {
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Set default limit of AMResourceShare to 0.5f
out.println("<queueMaxAMShareDefault>0.5f</queueMaxAMShareDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
@ -240,6 +243,13 @@ public void testAllocationFileParsing() throws Exception {
assertEquals(10, queueConf.getUserMaxApps("user1"));
assertEquals(5, queueConf.getUserMaxApps("user2"));
assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);
// Root should get * ACL
assertEquals("*", queueConf.getQueueAcl("root",
QueueACL.ADMINISTER_QUEUE).getAclString());

View File

@ -64,7 +64,6 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@ -73,12 +72,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
@ -510,26 +509,14 @@ public void testUserAsDefaultQueue() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
"user1");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
createApplicationWithAMResource(appAttemptId, "default", "user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getRunnableAppSchedulables().size());
assertEquals("root.user1", rmApp.getQueue());
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
.get(appAttemptId.getApplicationId()).getQueue());
}
@Test
@ -538,21 +525,8 @@ public void testNotUserAsDefaultQueue() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMContext rmContext = resourceManager.getRMContext();
Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
null, null, null, false, false, 0, null, null), null, null, 0, null, null);
appsMap.put(appAttemptId.getApplicationId(), rmApp);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
"user2");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
createApplicationWithAMResource(appAttemptId, "default", "user2", null);
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
@ -2329,6 +2303,121 @@ public void testUserAndQueueMaxRunningApps() throws Exception {
verifyQueueNumRunnable("queue1", 2, 1);
}
@Test
public void testQueueMaxAMShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println("<maxAMShare>0.2</maxAMShare>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
0, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
scheduler.update();
assertEquals("Queue queue1's fair share should be 10240",
10240, scheduler.getQueueManager().getLeafQueue("queue1", true)
.getFairShare().getMemory());
Resource amResource1 = Resource.newInstance(1024, 1);
Resource amResource2 = Resource.newInstance(2048, 2);
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
// Exceeds no limits
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1's AM requests 1024 MB memory",
1024, app1.getAMResource().getMemory());
assertEquals("Application1's AM should be running",
1, app1.getLiveContainers().size());
// Exceeds no limits
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM requests 1024 MB memory",
1024, app2.getAMResource().getMemory());
assertEquals("Application2's AM should be running",
1, app2.getLiveContainers().size());
// Exceeds queue limit
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application3's AM requests 1024 MB memory",
1024, app3.getAMResource().getMemory());
assertEquals("Application3's AM should not be running",
0, app3.getLiveContainers().size());
// Still can run non-AM container
createSchedulingRequestExistingApplication(1024, 1, attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1 should have two running containers",
2, app1.getLiveContainers().size());
// Remove app1, app3's AM should become running
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
scheduler.update();
scheduler.handle(appRemovedEvent1);
scheduler.handle(updateEvent);
assertEquals("Application1's AM should be finished",
0, app1.getLiveContainers().size());
assertEquals("Application3's AM should be running",
1, app3.getLiveContainers().size());
// Exceeds queue limit
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application4's AM requests 2048 MB memory",
2048, app4.getAMResource().getMemory());
assertEquals("Application4's AM should not be running",
0, app4.getLiveContainers().size());
// Remove app2 and app3, app4's AM should become running
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent2);
scheduler.handle(appRemovedEvent3);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM should be finished",
0, app2.getLiveContainers().size());
assertEquals("Application3's AM should be finished",
0, app3.getLiveContainers().size());
assertEquals("Application4's AM should be running",
1, app4.getLiveContainers().size());
}
@Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

View File

@ -237,6 +237,11 @@ Allocation file format
* maxRunningApps: limit the number of apps from the queue to run at once
* maxAMShare: limit the fraction of the queue's fair share that can be used
to run application masters. This property can only be used for leaf queues.
Default value is 1.0f, which means AMs in the leaf queue can take up to 100%
of both the memory and CPU fair share.
* weight: to share the cluster non-proportionally with other queues. Weights
default to 1, and a queue with weight 2 should receive approximately twice
as many resources as a queue with the default weight.
@ -279,6 +284,9 @@ Allocation file format
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overriden by maxRunningApps element in each queue.
* <<A queueMaxAMShareDefault element>>, which sets the default AM resource
limit for queue; overriden by maxAMShare element in each queue.
* <<A defaultQueueSchedulingPolicy element>>, which sets the default scheduling
policy for queues; overriden by the schedulingPolicy element in each queue
if specified. Defaults to "fair".
@ -328,6 +336,7 @@ Allocation file format
<minResources>10000 mb,0vcores</minResources>
<maxResources>90000 mb,0vcores</maxResources>
<maxRunningApps>50</maxRunningApps>
<maxAMShare>0.1</maxAMShare>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="sample_sub_queue">
@ -336,6 +345,8 @@ Allocation file format
</queue>
</queue>
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<!—- Queue secondary_group_queue is a parent queue and may have
user queues under it -—>
<queue name=“secondary_group_queue” type=“parent”>