YARN-9760. Support configuring application priorities on a workflow level. Contributed by Varun Saxena

(cherry picked from commit eebd313d76ed742fe82292bd8c0184970cdc5692)
This commit is contained in:
Jonathan Hung 2019-10-08 11:16:19 -07:00
parent 9b4aba49d1
commit ad3c98456d
10 changed files with 562 additions and 30 deletions

View File

@ -424,6 +424,32 @@ public class StringUtils {
return values;
}
/**
* Returns a collection of strings, trimming leading and trailing whitespace
* on each value. Duplicates are not removed.
*
* @param str
* String separated by delim.
* @param delim
* Delimiter to separate the values in str.
* @return Collection of string values.
*/
public static Collection<String> getTrimmedStringCollection(String str,
String delim) {
List<String> values = new ArrayList<String>();
if (str == null)
return values;
StringTokenizer tokenizer = new StringTokenizer(str, delim);
while (tokenizer.hasMoreTokens()) {
String next = tokenizer.nextToken();
if (next == null || next.trim().isEmpty()) {
continue;
}
values.add(next.trim());
}
return values;
}
/**
* Splits a comma separated value <code>String</code>, trimming leading and
* trailing whitespace on each value. Duplicate and empty values are removed.

View File

@ -91,10 +91,14 @@ import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
@ -450,6 +454,53 @@ public class TestMRJobs {
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
}
@Test(timeout = 300000)
public void testJobWithWorkflowPriority() throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
CapacityScheduler scheduler = (CapacityScheduler) mrCluster
.getResourceManager().getResourceScheduler();
CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
csConf.set(CapacitySchedulerConfiguration.WORKFLOW_PRIORITY_MAPPINGS,
WorkflowPriorityMappingsManager.getWorkflowPriorityMappingStr(
Arrays.asList(new WorkflowPriorityMapping(
"wf1", "root.default", Priority.newInstance(1)))));
csConf.setBoolean(CapacitySchedulerConfiguration.
ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
scheduler.reinitialize(csConf, scheduler.getRMContext());
// set master address to local to test that local mode applied if framework
// equals local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf
.setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5);
sleepConf.set(MRJobConfig.JOB_TAGS,
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + "wf1");
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
// VERY_HIGH priority should get overwritten by workflow priority mapping
job.setPriority(JobPriority.VERY_HIGH);
job.submit();
waitForPriorityToUpdate(job, JobPriority.VERY_LOW);
// Verify the priority from job itself
Assert.assertEquals(JobPriority.VERY_LOW, job.getPriority());
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
}
private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus)
throws IOException, InterruptedException {
// Max wait time to get the priority update can be kept as 20sec (100 *

View File

@ -3933,6 +3933,13 @@ public class YarnConfiguration extends Configuration {
public static final String NM_CONTAINERS_LAUNCHER_CLASS =
NM_PREFIX + "containers-launcher.class";
// Configuration for the prefix of the tag which contains workflow ID,
// followed by the prefix.
public static final String YARN_WORKFLOW_ID_TAG_PREFIX =
YARN_PREFIX + "workflow-id.tag-prefix";
public static final String DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX =
"workflowid:";
public YarnConfiguration() {
super();
}

View File

@ -4112,4 +4112,15 @@
<name>yarn.node-labels.exclusive-enforced-partitions</name>
<value></value>
</property>
<property>
<description>
Prefix used to identify the YARN tag which contains workflow ID. If a tag coming in application
submission context has this prefix, whatever follows the prefix will be considered as workflow ID
associated with the application. This configuration is used by features such as workflow priority
for identifying the workflow associated with an application.
</description>
<name>yarn.workflow-id.tag-prefix</name>
<value>workflowid:</value>
</property>
</configuration>

View File

@ -217,4 +217,28 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.workflow-priority-mappings</name>
<value></value>
<description>
A list of mappings that will be used to override application priority.
The syntax for this list is
[workflowId]:[full_queue_name]:[priority][,next mapping]*
where an application submitted (or mapped to) queue "full_queue_name"
and workflowId "workflowId" (as specified in application submission
context) will be given priority "priority".
</description>
</property>
<property>
<name>yarn.scheduler.capacity.workflow-priority-mappings-override.enable</name>
<value>false</value>
<description>
If a priority mapping is present, will it override the value specified
by the user? This can be used by administrators to give applications a
priority that is different than the one specified by the user.
The default is false.
</description>
</property>
</configuration>

View File

@ -180,6 +180,8 @@ public class CapacityScheduler extends
private CapacitySchedulerQueueManager queueManager;
private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@ -361,6 +363,8 @@ public class CapacityScheduler extends
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
@ -768,6 +772,8 @@ public class CapacityScheduler extends
updatePlacementRules();
this.workflowPriorityMappingsMgr.initialize(this);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, this.getRootQueue());
}
@ -778,6 +784,8 @@ public class CapacityScheduler extends
this.queueManager.reinitializeQueues(newConf);
updatePlacementRules();
this.workflowPriorityMappingsMgr.initialize(this);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, this.getRootQueue());
}
@ -987,6 +995,17 @@ public class CapacityScheduler extends
}
}
try {
priority = workflowPriorityMappingsMgr.mapWorkflowPriorityForApp(
applicationId, queue, user, priority);
} catch (YarnException e) {
String message = "Failed to submit application " + applicationId +
" submitted by user " + user + " reason: " + e.getMessage();
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
applicationId, RMAppEventType.APP_REJECTED, message));
return;
}
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);
@ -3069,6 +3088,10 @@ public class CapacityScheduler extends
return this.queueManager;
}
public WorkflowPriorityMappingsManager getWorkflowPriorityMappingsManager() {
return this.workflowPriorityMappingsMgr;
}
/**
* Try to move a reserved container to a targetNode.
* If the targetNode is reserved by another application (other than this one).

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingP
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
@ -70,7 +71,6 @@ import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.StringTokenizer;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@ -277,6 +277,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
@Private
public static final String WORKFLOW_PRIORITY_MAPPINGS =
PREFIX + "workflow-priority-mappings";
@Private
public static final String ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE =
WORKFLOW_PRIORITY_MAPPINGS + "-override.enable";
@Private
public static final boolean DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE = false;
@Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
@ -1031,7 +1042,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
getTrimmedStringCollection(queueMappingName);
for (String mappingValue : mappingsString) {
String[] mapping =
getTrimmedStringCollection(mappingValue, ":")
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 2 || mapping[1].length() == 0) {
throw new IllegalArgumentException(
@ -1069,30 +1080,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs));
}
/**
* Returns a collection of strings, trimming leading and trailing whitespeace
* on each value
*
* @param str
* String to parse
* @param delim
* delimiter to separate the values
* @return Collection of parsed elements.
*/
private static Collection<String> getTrimmedStringCollection(String str,
String delim) {
List<String> values = new ArrayList<String>();
if (str == null)
return values;
StringTokenizer tokenizer = new StringTokenizer(str, delim);
while (tokenizer.hasMoreTokens()) {
String next = tokenizer.nextToken();
if (next == null || next.trim().isEmpty()) {
continue;
}
values.add(next.trim());
}
return values;
public boolean getOverrideWithWorkflowPriorityMappings() {
return getBoolean(ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE,
DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE);
}
public Collection<String> getWorkflowPriorityMappings() {
return getTrimmedStringCollection(WORKFLOW_PRIORITY_MAPPINGS);
}
/**
@ -1107,7 +1101,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
String[] mapping =
getTrimmedStringCollection(mappingValue, ":")
StringUtils.getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 3 || mapping[1].length() == 0
|| mapping[2].length() == 0) {
@ -1168,6 +1162,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
}
@Private
@VisibleForTesting
void setWorkflowPriorityMappings(
List<WorkflowPriorityMapping> workflowPriorityMappings) {
setStrings(WORKFLOW_PRIORITY_MAPPINGS, WorkflowPriorityMappingsManager
.getWorkflowPriorityMappingStr(workflowPriorityMappings));
}
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@Private
@VisibleForTesting
public class WorkflowPriorityMappingsManager {
private static final Logger LOG =
LoggerFactory.getLogger(WorkflowPriorityMappingsManager.class);
private static final String WORKFLOW_PART_SEPARATOR = ":";
private static final String WORKFLOW_SEPARATOR = ",";
private CapacityScheduler scheduler;
private CapacitySchedulerConfiguration conf;
private boolean overrideWithPriorityMappings = false;
// Map of queue to a map of workflow ID to priority
private Map<String, Map<String, WorkflowPriorityMapping>> priorityMappings =
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
public static class WorkflowPriorityMapping {
String workflowID;
String queue;
Priority priority;
public WorkflowPriorityMapping(String workflowID, String queue,
Priority priority) {
this.workflowID = workflowID;
this.queue = queue;
this.priority = priority;
}
public Priority getPriority() {
return this.priority;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof WorkflowPriorityMapping) {
WorkflowPriorityMapping other = (WorkflowPriorityMapping) obj;
return (other.workflowID.equals(workflowID) &&
other.queue.equals(queue) &&
other.priority.equals(priority));
} else {
return false;
}
}
public String toString() {
return workflowID + WORKFLOW_PART_SEPARATOR + queue
+ WORKFLOW_PART_SEPARATOR + priority.getPriority();
}
}
@VisibleForTesting
public void initialize(CapacityScheduler scheduler) throws IOException {
this.scheduler = scheduler;
this.conf = scheduler.getConfiguration();
boolean overrideWithWorkflowPriorityMappings =
conf.getOverrideWithWorkflowPriorityMappings();
LOG.info("Initialized workflow priority mappings, override: "
+ overrideWithWorkflowPriorityMappings);
this.overrideWithPriorityMappings = overrideWithWorkflowPriorityMappings;
this.priorityMappings = getWorkflowPriorityMappings();
}
/**
* Get workflow ID to priority mappings for a queue.
*
* @return workflowID to priority mappings for a queue
*/
public Map<String, Map<String, WorkflowPriorityMapping>>
getWorkflowPriorityMappings() {
Map<String, Map<String, WorkflowPriorityMapping>> mappings =
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
Collection<String> workflowMappings = conf.getWorkflowPriorityMappings();
for (String workflowMapping : workflowMappings) {
WorkflowPriorityMapping mapping =
getWorkflowMappingFromString(workflowMapping);
if (mapping != null) {
if (!mappings.containsKey(mapping.queue)) {
mappings.put(mapping.queue,
new HashMap<String, WorkflowPriorityMapping>());
}
mappings.get(mapping.queue).put(mapping.workflowID, mapping);
}
}
return mappings;
}
private WorkflowPriorityMapping getWorkflowMappingFromString(
String mappingString) {
if (mappingString == null) {
return null;
}
String[] mappingArray = StringUtils
.getTrimmedStringCollection(mappingString, WORKFLOW_PART_SEPARATOR)
.toArray(new String[] {});
if (mappingArray.length != 3 || mappingArray[0].length() == 0
|| mappingArray[1].length() == 0 || mappingArray[2].length() == 0) {
throw new IllegalArgumentException(
"Illegal workflow priority mapping " + mappingString);
}
WorkflowPriorityMapping mapping;
try {
mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1],
Priority.newInstance(Integer.parseInt(mappingArray[2])));
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Illegal workflow priority for mapping " + mappingString);
}
return mapping;
}
public Priority getMappedPriority(String workflowID, CSQueue queue) {
// Recursively fetch the priority mapping for the given workflow tracing
// up the queue hierarchy until the first match.
if (queue.equals(scheduler.getRootQueue())) {
return null;
}
String queuePath = queue.getQueuePath();
if (priorityMappings.containsKey(queuePath)
&& priorityMappings.get(queuePath).containsKey(workflowID)) {
return priorityMappings.get(queuePath).get(workflowID).priority;
} else {
queue = queue.getParent();
return getMappedPriority(workflowID, queue);
}
}
public Priority mapWorkflowPriorityForApp(ApplicationId applicationId,
CSQueue queue, String user, Priority priority) throws YarnException {
if (overrideWithPriorityMappings) {
// Set the correct workflow priority
RMApp rmApp = scheduler.getRMContext().getRMApps().get(applicationId);
if (rmApp != null && rmApp.getApplicationTags() != null
&& rmApp.getApplicationSubmissionContext() != null) {
String workflowTagPrefix = scheduler.getConf().get(
YarnConfiguration.YARN_WORKFLOW_ID_TAG_PREFIX,
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX);
String workflowID = null;
for(String tag : rmApp.getApplicationTags()) {
if (tag.trim().startsWith(workflowTagPrefix)) {
workflowID = tag.trim().substring(workflowTagPrefix.length());
}
}
if (workflowID != null && !workflowID.isEmpty()
&& priorityMappings != null && priorityMappings.size() > 0) {
Priority mappedPriority = getMappedPriority(workflowID, queue);
if (mappedPriority != null) {
LOG.info("Application " + applicationId + " user " + user
+ " workflow " + workflowID + " queue " + queue.getQueueName()
+ " mapping [" + priority + "] to [" + mappedPriority
+ "] override " + overrideWithPriorityMappings);
// If workflow ID exists in workflow mapping, change this
// application's priority to mapped value. Else, use queue
// default priority.
priority = mappedPriority;
priority = scheduler.checkAndGetApplicationPriority(
priority, UserGroupInformation.createRemoteUser(user),
queue.getQueueName(), applicationId);
rmApp.getApplicationSubmissionContext().setPriority(priority);
((RMAppImpl)rmApp).setApplicationPriority(priority);
}
}
}
}
return priority;
}
public static String getWorkflowPriorityMappingStr(
List<WorkflowPriorityMapping> workflowPriorityMappings) {
if (workflowPriorityMappings == null) {
return "";
}
List<String> workflowPriorityMappingStrs = new ArrayList<>();
for (WorkflowPriorityMapping mapping : workflowPriorityMappings) {
workflowPriorityMappingStrs.add(mapping.toString());
}
return StringUtils.join(WORKFLOW_SEPARATOR, workflowPriorityMappingStrs);
}
}

View File

@ -523,14 +523,21 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, Set<String> appTags)
throws Exception {
return submitApp(masterMemory, null, false, null, Priority.newInstance(0),
appTags);
}
public RMApp submitApp(int masterMemory, String queue,
boolean isAppIdProvided, ApplicationId appId, Priority priority,
Set<String> appTags) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
ResourceRequest amResourceRequest = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
return submitApp(Collections.singletonList(amResourceRequest), "",
UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true, Priority.newInstance(0), null,
false, isAppIdProvided, appId, 0, null, true, priority, null,
null, null, appTags);
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class TestCapacitySchedulerWorkflowPriorityMapping
extends CapacitySchedulerTestBase {
private MockRM mockRM = null;
private static void setWorkFlowPriorityMappings(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(
CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setCapacity(A2, A2_CAPACITY);
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, B3_CAPACITY);
List<WorkflowPriorityMapping> mappings = Arrays.asList(
new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)),
new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)),
new WorkflowPriorityMapping("workflow3", A, Priority.newInstance(4)));
conf.setWorkflowPriorityMappings(mappings);
}
@Test
public void testWorkflowPriorityMappings() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(CapacitySchedulerConfiguration
.ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
// Initialize workflow priority mappings.
setWorkFlowPriorityMappings(conf);
mockRM = new MockRM(conf);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
mockRM.start();
cs.start();
Map<String, Map<String, Object>> expected = ImmutableMap.of(
A, ImmutableMap.of("workflow3",
new WorkflowPriorityMapping(
"workflow3", A, Priority.newInstance(4))),
B, ImmutableMap.of("workflow1",
new WorkflowPriorityMapping(
"workflow1", B, Priority.newInstance(2))),
A1, ImmutableMap.of("workflow2",
new WorkflowPriorityMapping(
"workflow2", A1, Priority.newInstance(3))));
assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
.getWorkflowPriorityMappings());
// Maps to rule corresponding to parent queue "a" for workflow3.
mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,1),
Priority.newInstance(0), ImmutableSet.of(
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+ "workflow3"));
RMApp app =
mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,1));
assertEquals(4, app.getApplicationSubmissionContext().getPriority()
.getPriority());
// Does not match any rule as rule for queue + workflow does not exist.
// Priority passed in the app is taken up.
mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,2),
Priority.newInstance(6), ImmutableSet.of(
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+ "workflow1"));
app =
mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,2));
assertEquals(6, app.getApplicationSubmissionContext().getPriority()
.getPriority());
// Maps to rule corresponding to parent queue "a1" for workflow2.
mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,3),
Priority.newInstance(0), ImmutableSet.of(
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+ "workflow2"));
app =
mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,3));
assertEquals(3, app.getApplicationSubmissionContext().getPriority()
.getPriority());
// Maps to rule corresponding to parent queue "b" for workflow1.
mockRM.submitApp(1, "b3", true, ApplicationId.newInstance(0,4),
Priority.newInstance(0), ImmutableSet.of(
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+ "workflow1"));
app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,4));
assertEquals(2, app.getApplicationSubmissionContext().getPriority()
.getPriority());
// Disable workflow priority mappings override and reinitialize scheduler.
conf.setBoolean(CapacitySchedulerConfiguration
.ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, false);
cs.reinitialize(conf, mockRM.getRMContext());
mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,5),
Priority.newInstance(0), ImmutableSet.of(
YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
+ "workflow3"));
app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,5));
assertEquals(0, app.getApplicationSubmissionContext().getPriority()
.getPriority());
}
}