YARN-2003. Support for Application priority : Changes in RM and Capacity Scheduler. (Sunil G via wangda)

(cherry picked from commit c39ca541f4)
This commit is contained in:
Wangda Tan 2015-07-21 09:56:59 -07:00
parent cd3692fe4b
commit 3dd113fa7c
30 changed files with 664 additions and 55 deletions

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -950,5 +951,14 @@ public class ResourceSchedulerWrapper
ContainerStatus containerStatus, RMContainerEventType event) { ContainerStatus containerStatus, RMContainerEventType event) {
// do nothing // do nothing
} }
@Override
public Priority checkAndGetApplicationPriority(Priority priority,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
// TODO Dummy implementation.
return Priority.newInstance(0);
}
} }

View File

@ -86,6 +86,9 @@ Release 2.8.0 - UNRELEASED
YARN-3116. RM notifies NM whether a container is an AM container or normal YARN-3116. RM notifies NM whether a container is an AM container or normal
task container. (Giovanni Matteo Fumarola via zjshen) task container. (Giovanni Matteo Fumarola via zjshen)
YARN-2003. Support for Application priority : Changes in RM and Capacity
Scheduler. (Sunil G via wangda)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1928,6 +1928,11 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTALIZED_NODELABEL_CONFIGURATION_TYPE; CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";
public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0;
@Private @Private
public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -329,13 +330,18 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ResourceRequest amReq = ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery); validateAndCreateResourceRequest(submissionContext, isRecovery);
// Verify and get the update application priority and set back to
// submissionContext
Priority appPriority = rmContext.getScheduler()
.checkAndGetApplicationPriority(submissionContext.getPriority(), user,
submissionContext.getQueue(), applicationId);
submissionContext.setPriority(appPriority);
// Create RMApp // Create RMApp
RMAppImpl application = RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext.getQueue(), submissionContext, this.scheduler,
submissionContext, this.scheduler, this.masterService, this.masterService, submitTime, submissionContext.getApplicationType(),
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags(), amReq); submissionContext.getApplicationTags(), amReq);
// Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with same applicationId will fail here

View File

@ -924,17 +924,15 @@ public class RMAppImpl implements RMApp, Recoverable {
// No existent attempts means the attempt associated with this app was not // No existent attempts means the attempt associated with this app was not
// started or started but not yet saved. // started or started but not yet saved.
if (app.attempts.isEmpty()) { if (app.attempts.isEmpty()) {
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext.getQueue(), app.user, app.submissionContext, false));
app.submissionContext.getReservationID()));
return RMAppState.SUBMITTED; return RMAppState.SUBMITTED;
} }
// Add application to scheduler synchronously to guarantee scheduler // Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers. // knows applications before AM or NM re-registers.
app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext.getQueue(), app.user, true, app.submissionContext, true));
app.submissionContext.getReservationID()));
// recover attempts // recover attempts
app.recoverAppAttempts(); app.recoverAppAttempts();
@ -960,9 +958,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppTransition { RMAppTransition {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.handler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext.getQueue(), app.user, app.submissionContext, false));
app.submissionContext.getReservationID()));
} }
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -691,4 +692,13 @@ public abstract class AbstractYarnScheduler
} }
return null; return null;
} }
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
// Dummy Implementation till Application Priority changes are done in
// specific scheduler.
return Priority.newInstance(0);
}
} }

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -110,4 +111,11 @@ public interface Queue {
* new resource asked * new resource asked
*/ */
public void decPendingResource(String nodeLabel, Resource resourceToDec); public void decPendingResource(String nodeLabel, Resource resourceToDec);
/**
* Get the Default Application Priority for this queue
*
* @return default application priority
*/
public Priority getDefaultApplicationPriority();
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Private @Private
@ -28,10 +29,18 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue; private Queue queue;
private final String user; private final String user;
private T currentAttempt; private T currentAttempt;
private volatile Priority priority;
public SchedulerApplication(Queue queue, String user) { public SchedulerApplication(Queue queue, String user) {
this.queue = queue; this.queue = queue;
this.user = user; this.user = user;
this.priority = null;
}
public SchedulerApplication(Queue queue, String user, Priority priority) {
this.queue = queue;
this.user = user;
this.priority = priority;
} }
public Queue getQueue() { public Queue getQueue() {
@ -58,4 +67,17 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
queue.getMetrics().finishApp(user, rmAppFinalState); queue.getMetrics().finishApp(user, rmAppFinalState);
} }
public Priority getPriority() {
return priority;
}
public void setPriority(Priority priority) {
this.priority = priority;
// Also set priority in current running attempt
if (null != currentAttempt) {
currentAttempt.setPriority(priority);
}
}
} }

View File

@ -98,6 +98,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean amRunning = false; private boolean amRunning = false;
private LogAggregationContext logAggregationContext; private LogAggregationContext logAggregationContext;
private Priority appPriority = null;
protected ResourceUsage attemptResourceUsage = new ResourceUsage(); protected ResourceUsage attemptResourceUsage = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@ -727,6 +729,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return this.attemptResourceUsage; return this.attemptResourceUsage;
} }
@Override
public Priority getPriority() {
return appPriority;
}
public void setPriority(Priority appPriority) {
this.appPriority = appPriority;
}
@Override @Override
public String getId() { public String getId() {
return getApplicationId().toString(); return getApplicationId().toString();

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -286,4 +287,23 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @return an EnumSet containing the resource types * @return an EnumSet containing the resource types
*/ */
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes(); public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
/**
*
* Verify whether a submitted application priority is valid as per configured
* Queue
*
* @param priorityFromContext
* Submitted Application priority.
* @param user
* User who submitted the Application
* @param queueName
* Name of the Queue
* @param applicationId
* Application ID
* @return Updated Priority from scheduler
*/
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException;
} }

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
@ -574,4 +575,10 @@ public abstract class AbstractCSQueue implements CSQueue {
// sorry, you cannot access // sorry, you cannot access
return false; return false;
} }
@Override
public Priority getDefaultApplicationPriority() {
// TODO add dummy implementation
return null;
}
} }

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -159,6 +160,9 @@ public class CapacityScheduler extends
new Comparator<FiCaSchedulerApp>() { new Comparator<FiCaSchedulerApp>() {
@Override @Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
if (!a1.getPriority().equals(a2.getPriority())) {
return a1.getPriority().compareTo(a2.getPriority());
}
return a1.getApplicationId().compareTo(a2.getApplicationId()); return a1.getApplicationId().compareTo(a2.getApplicationId());
} }
}; };
@ -226,6 +230,7 @@ public class CapacityScheduler extends
private RMNodeLabelsManager labelManager; private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth(); private SchedulerHealth schedulerHealth = new SchedulerHealth();
long lastNodeUpdateTime; long lastNodeUpdateTime;
private Priority maxClusterLevelAppPriority;
/** /**
* EXPERT * EXPERT
*/ */
@ -326,6 +331,9 @@ public class CapacityScheduler extends
if (scheduleAsynchronously) { if (scheduleAsynchronously) {
asyncSchedulerThread = new AsyncScheduleThread(this); asyncSchedulerThread = new AsyncScheduleThread(this);
} }
maxClusterLevelAppPriority = Priority.newInstance(yarnConf.getInt(
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
LOG.info("Initialized CapacityScheduler with " + LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " + "calculator=" + getResourceCalculator().getClass() + ", " +
@ -692,7 +700,7 @@ public class CapacityScheduler extends
} }
private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) { String queueName, String user, boolean isAppRecovering, Priority priority) {
if (mappings != null && mappings.size() > 0) { if (mappings != null && mappings.size() > 0) {
try { try {
@ -761,7 +769,7 @@ public class CapacityScheduler extends
// update the metrics // update the metrics
queue.getMetrics().submitApp(user); queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application = SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user); new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
applications.put(applicationId, application); applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName); + ", in queue: " + queueName);
@ -783,9 +791,9 @@ public class CapacityScheduler extends
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue(); CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp attempt = FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
new FiCaSchedulerApp(applicationAttemptId, application.getUser(), application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
queue, queue.getActiveUsersManager(), rmContext); application.getPriority());
if (transferStateFromPreviousAttempt) { if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt()); .getCurrentAppAttempt());
@ -1307,7 +1315,8 @@ public class CapacityScheduler extends
addApplication(appAddedEvent.getApplicationId(), addApplication(appAddedEvent.getApplicationId(),
queueName, queueName,
appAddedEvent.getUser(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering()); appAddedEvent.getIsAppRecovering(),
appAddedEvent.getApplicatonPriority());
} }
} }
break; break;
@ -1833,4 +1842,56 @@ public class CapacityScheduler extends
private synchronized void setLastNodeUpdateTime(long time) { private synchronized void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time; this.lastNodeUpdateTime = time;
} }
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
Priority appPriority = null;
// ToDo: Verify against priority ACLs
// Verify the scenario where priority is null from submissionContext.
if (null == priorityFromContext) {
// Get the default priority for the Queue. If Queue is non-existent, then
// use default priority
priorityFromContext = getDefaultPriorityForQueue(queueName);
LOG.info("Application '" + applicationId
+ "' is submitted without priority "
+ "hence considering default queue/cluster priority:"
+ priorityFromContext.getPriority());
}
// Verify whether submitted priority is lesser than max priority
// in the cluster. If it is out of found, defining a max cap.
if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) {
priorityFromContext = Priority
.newInstance(getMaxClusterLevelAppPriority().getPriority());
}
appPriority = priorityFromContext;
LOG.info("Priority '" + appPriority.getPriority()
+ "' is acceptable in queue :" + queueName + "for application:"
+ applicationId + "for the user: " + user);
return appPriority;
}
private Priority getDefaultPriorityForQueue(String queueName) {
Queue queue = getQueue(queueName);
if (null == queue) {
// Return with default application priority
return Priority.newInstance(CapacitySchedulerConfiguration
.DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
}
return Priority.newInstance(queue.getDefaultApplicationPriority()
.getPriority());
}
public Priority getMaxClusterLevelAppPriority() {
return maxClusterLevelAppPriority;
}
} }

View File

@ -206,6 +206,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private @Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
@Private
public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority";
@Private
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
@Private @Private
public static class QueueMapping { public static class QueueMapping {
@ -947,4 +953,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return configuredNodeLabels; return configuredNodeLabels;
} }
public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
Integer defaultPriority = getInt(getQueuePrefix(queue)
+ DEFAULT_APPLICATION_PRIORITY,
DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
return defaultPriority;
}
} }

View File

@ -98,6 +98,8 @@ public class LeafQueue extends AbstractCSQueue {
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>(); new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private Priority defaultAppPriorityPerQueue;
Set<FiCaSchedulerApp> pendingApplications; Set<FiCaSchedulerApp> pendingApplications;
private float minimumAllocationFactor; private float minimumAllocationFactor;
@ -220,6 +222,9 @@ public class LeafQueue extends AbstractCSQueue {
} }
} }
defaultAppPriorityPerQueue = Priority.newInstance(conf
.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
LOG.info("Initializing " + queueName + "\n" + LOG.info("Initializing " + queueName + "\n" +
"capacity = " + queueCapacities.getCapacity() + "capacity = " + queueCapacities.getCapacity() +
" [= (float) configuredCapacity / 100 ]" + "\n" + " [= (float) configuredCapacity / 100 ]" + "\n" +
@ -265,7 +270,8 @@ public class LeafQueue extends AbstractCSQueue {
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " + "reservationsContinueLooking = " +
reservationsContinueLooking + "\n" + reservationsContinueLooking + "\n" +
"preemptionDisabled = " + getPreemptionDisabled() + "\n"); "preemptionDisabled = " + getPreemptionDisabled() + "\n" +
"defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
} }
@Override @Override
@ -2061,6 +2067,11 @@ public class LeafQueue extends AbstractCSQueue {
this.orderingPolicy = orderingPolicy; this.orderingPolicy = orderingPolicy;
} }
@Override
public Priority getDefaultApplicationPriority() {
return defaultAppPriorityPerQueue;
}
/* /*
* Holds shared values used by all applications in * Holds shared values used by all applications in
* the queue to calculate headroom on demand * the queue to calculate headroom on demand

View File

@ -72,6 +72,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) { RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
Priority.newInstance(0));
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext); super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@ -87,6 +94,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} }
setAMResource(amResource); setAMResource(amResource);
setPriority(appPriority);
} }
synchronized public boolean containerCompleted(RMContainer rmContainer, synchronized public boolean containerCompleted(RMContainer rmContainer,

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
public class AppAddedSchedulerEvent extends SchedulerEvent { public class AppAddedSchedulerEvent extends SchedulerEvent {
@ -28,25 +30,35 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final String user; private final String user;
private final ReservationId reservationID; private final ReservationId reservationID;
private final boolean isAppRecovering; private final boolean isAppRecovering;
private final Priority appPriority;
public AppAddedSchedulerEvent( public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
ApplicationId applicationId, String queue, String user) { String user) {
this(applicationId, queue, user, false, null); this(applicationId, queue, user, false, null, Priority.newInstance(0));
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, ReservationId reservationID) { String user, ReservationId reservationID, Priority appPriority) {
this(applicationId, queue, user, false, reservationID); this(applicationId, queue, user, false, reservationID, appPriority);
}
public AppAddedSchedulerEvent(String user,
ApplicationSubmissionContext submissionContext, boolean isAppRecovering) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
submissionContext.getPriority());
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering, ReservationId reservationID) { String user, boolean isAppRecovering, ReservationId reservationID,
Priority appPriority) {
super(SchedulerEventType.APP_ADDED); super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId; this.applicationId = applicationId;
this.queue = queue; this.queue = queue;
this.user = user; this.user = user;
this.reservationID = reservationID; this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering; this.isAppRecovering = isAppRecovering;
this.appPriority = appPriority;
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
@ -68,4 +80,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
public ReservationId getReservationID() { public ReservationId getReservationID() {
return reservationID; return reservationID;
} }
public Priority getApplicatonPriority() {
return appPriority;
}
} }

View File

@ -331,6 +331,12 @@ public abstract class FSQueue implements Queue, Schedulable {
public void decPendingResource(String nodeLabel, Resource resourceToDec) { public void decPendingResource(String nodeLabel, Resource resourceToDec) {
} }
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue
return null;
}
public boolean fitsInMaxShare(Resource additionalResource) { public boolean fitsInMaxShare(Resource additionalResource) {
Resource usagePlusAddition = Resource usagePlusAddition =
Resources.add(getResourceUsage(), additionalResource); Resources.add(getResourceUsage(), additionalResource);

View File

@ -210,6 +210,12 @@ public class FifoScheduler extends
@Override @Override
public void decPendingResource(String nodeLabel, Resource resourceToDec) { public void decPendingResource(String nodeLabel, Resource resourceToDec) {
} }
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FIFO scheduler
return null;
}
}; };
public FifoScheduler() { public FifoScheduler() {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*; import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
@ -30,6 +31,10 @@ public class FifoComparator
@Override @Override
public int compare(SchedulableEntity r1, SchedulableEntity r2) { public int compare(SchedulableEntity r1, SchedulableEntity r2) {
if (r1.getPriority() != null
&& !r1.getPriority().equals(r2.getPriority())) {
return r1.getPriority().compareTo(r2.getPriority());
}
int res = r1.compareInputOrderTo(r2); int res = r1.compareInputOrderTo(r2);
return res; return res;
} }

View File

@ -48,4 +48,9 @@ public interface SchedulableEntity {
*/ */
public ResourceUsage getSchedulingResourceUsage(); public ResourceUsage getSchedulingResourceUsage();
/**
* Get the priority of the application
*/
public Priority getPriority();
} }

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -289,6 +290,15 @@ public class MockRM extends ResourceManager {
return submitApp(masterMemory, false); return submitApp(masterMemory, false);
} }
public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true, priority);
}
public RMApp submitApp(int masterMemory, boolean unmanaged) public RMApp submitApp(int masterMemory, boolean unmanaged)
throws Exception { throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
@ -327,7 +337,7 @@ public class MockRM extends ResourceManager {
return submitApp(resource, name, user, acls, false, queue, return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
true, false, false, null, 0, null, true); true, false, false, null, 0, null, true, null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
@ -370,18 +380,19 @@ public class MockRM extends ResourceManager {
resource.setMemory(masterMemory); resource.setMemory(masterMemory);
return submitApp(resource, name, user, acls, unmanaged, queue, return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0, null, true); false, null, 0, null, true, Priority.newInstance(0));
} }
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception { throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemory(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser() return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, .getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval, null, true); false, null, attemptFailuresValidityInterval, null, true, priority);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
@ -391,20 +402,22 @@ public class MockRM extends ResourceManager {
ApplicationId applicationId) throws Exception { ApplicationId applicationId) throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemory(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, unmanaged, queue, return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0, null, true); isAppIdProvided, applicationId, 0, null, true, priority);
} }
public RMApp submitApp(int masterMemory, public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception { LogAggregationContext logAggregationContext) throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemory(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser() return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, .getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, logAggregationContext, true); false, null, 0, logAggregationContext, true, priority);
} }
public RMApp submitApp(Resource capability, String name, String user, public RMApp submitApp(Resource capability, String name, String user,
@ -412,7 +425,8 @@ public class MockRM extends ResourceManager {
int maxAppAttempts, Credentials ts, String appType, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval, ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority)
throws Exception { throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
@ -435,6 +449,9 @@ public class MockRM extends ResourceManager {
if (queue != null) { if (queue != null) {
sub.setQueue(queue); sub.setQueue(queue);
} }
if (priority != null) {
sub.setPriority(priority);
}
sub.setApplicationType(appType); sub.setApplicationType(appType);
ContainerLaunchContext clc = Records ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class); .newRecord(ContainerLaunchContext.class);

View File

@ -219,6 +219,7 @@ public class TestAppManager{
rmContext = mockRMContext(1, now - 10); rmContext = mockRMContext(1, now - 10);
ResourceScheduler scheduler = mockResourceScheduler(); ResourceScheduler scheduler = mockResourceScheduler();
((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
ApplicationMasterService masterService = ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler); new ApplicationMasterService(rmContext, scheduler);

View File

@ -1056,7 +1056,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null, .getCurrentUser().getShortUserName(), null, false, null,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true,
false, null, 0, null, true); false, null, 0, null, true, null);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>()); am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());

View File

@ -1025,6 +1025,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(app.getReservedContainers()).thenReturn(reservedContainers); when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId); when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId); when(app.getApplicationId()).thenReturn(appId);
when(app.getPriority()).thenReturn(Priority.newInstance(0));
// add to LeafQueue // add to LeafQueue
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);

View File

@ -155,6 +155,7 @@ public class TestApplicationLimits {
doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser(); doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource(); doReturn(amResource).when(application).getAMResource();
doReturn(Priority.newInstance(0)).when(application).getPriority();
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
return application; return application;
} }

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestApplicationPriority {
private static final Log LOG = LogFactory
.getLog(TestApplicationPriority.class);
private final int GB = 1024;
private YarnConfiguration conf;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
}
@Test
public void testApplicationOrderingWithPriority() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue q = (LeafQueue) cs.getQueue("default");
Assert.assertNotNull(q);
String host = "127.0.0.1";
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(16 * GB), 1,
host);
cs.handle(new NodeAddedSchedulerEvent(node));
// add app 1 start
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
RMAppAttemptMetrics attemptMetric1 = new RMAppAttemptMetrics(appAttemptId1,
rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
rm.getRMContext().getRMApps().put(appId1, app1);
SchedulerEvent addAppEvent1 = new AppAddedSchedulerEvent(appId1, "default",
"user", null, Priority.newInstance(5));
cs.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 = new AppAttemptAddedSchedulerEvent(
appAttemptId1, false);
cs.handle(addAttemptEvent1);
// add app1 end
// add app2 begin
ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
RMAppAttemptMetrics attemptMetric2 = new RMAppAttemptMetrics(appAttemptId2,
rm.getRMContext());
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
rm.getRMContext().getRMApps().put(appId2, app2);
SchedulerEvent addAppEvent2 = new AppAddedSchedulerEvent(appId2, "default",
"user", null, Priority.newInstance(8));
cs.handle(addAppEvent2);
SchedulerEvent addAttemptEvent2 = new AppAttemptAddedSchedulerEvent(
appAttemptId2, false);
cs.handle(addAttemptEvent2);
// add app end
// Now, the first assignment will be for app2 since app2 is of highest
// priority
assertEquals(q.getApplications().size(), 2);
assertEquals(q.getApplications().iterator().next()
.getApplicationAttemptId(), appAttemptId2);
rm.stop();
}
@Test
public void testApplicationPriorityAllocation() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(5);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
// add request for containers
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler, 7 containers will be allocated for App1
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(7, allocated1.size());
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
// check node report, 15 GB used (1 AM and 7 containers) and 1 GB available
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory());
// Submit the second app App2 with priority 8 (Higher than App1)
Priority appPriority2 = Priority.newInstance(8);
RMApp app2 = rm.submitApp(1 * GB, appPriority2);
// kick the scheduler, 1 GB which was free is given to AM of App2
nm1.nodeHeartbeat(true);
MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt()
.getAppAttemptId());
am2.registerAppAttempt();
// check node report, 16 GB used and 0 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app1.getApplicationId()).getCurrentAppAttempt();
// kill 2 containers to free up some space
int counter = 0;
for (Container c : allocated1) {
if (++counter > 2) {
break;
}
cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check node report, 12 GB used and 4 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
// add request for containers App1
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10);
am1.schedule(); // send the request for App1
// add request for containers App2
am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3);
AllocateResponse alloc1Response4 = am2.schedule(); // send the request
// kick the scheduler, since App2 priority is more than App1, it will get
// remaining cluster space.
nm1.nodeHeartbeat(true);
while (alloc1Response4.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(100);
alloc1Response4 = am2.schedule();
}
// check node report, 16 GB used and 0 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
rm.stop();
}
@Test
public void testPriorityWithPendingApplications() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(5);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 7GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
// add request for containers
am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler, 7 containers will be allocated for App1
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(7, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
// check node report, 8 GB used (1 AM and 7 containers) and 0 GB available
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
Assert.assertEquals(8 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
// Submit the second app App2 with priority 7
Priority appPriority2 = Priority.newInstance(7);
RMApp app2 = rm.submitApp(1 * GB, appPriority2);
// Submit the third app App3 with priority 8
Priority appPriority3 = Priority.newInstance(8);
RMApp app3 = rm.submitApp(1 * GB, appPriority3);
// Submit the second app App4 with priority 6
Priority appPriority4 = Priority.newInstance(6);
RMApp app4 = rm.submitApp(1 * GB, appPriority4);
// Only one app can run as AM resource limit restricts it. Kill app1,
// If app3 (highest priority among rest) gets active, it indicates that
// priority is working with pendingApplications.
rm.killApp(app1.getApplicationId());
// kick the scheduler, app3 (high among pending) gets free space
nm1.nodeHeartbeat(true);
MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt()
.getAppAttemptId());
am3.registerAppAttempt();
// check node report, 1 GB used and 7 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(7 * GB, report_nm1.getAvailableResource().getMemory());
rm.stop();
}
@Test
public void testMaxPriorityValidation() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
Priority maxPriority = Priority.newInstance(10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(15);
rm.registerNode("127.0.0.1:1234", 8 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// Application submission should be successful and verify priority
Assert.assertEquals(app1.getApplicationSubmissionContext().getPriority(),
maxPriority);
rm.stop();
}
}

View File

@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -898,13 +899,17 @@ public class TestCapacityScheduler {
ApplicationId id1 = ApplicationId.newInstance(1, 1); ApplicationId id1 = ApplicationId.newInstance(1, 1);
ApplicationId id2 = ApplicationId.newInstance(1, 2); ApplicationId id2 = ApplicationId.newInstance(1, 2);
ApplicationId id3 = ApplicationId.newInstance(2, 1); ApplicationId id3 = ApplicationId.newInstance(2, 1);
Priority priority = Priority.newInstance(0);
//same clusterId //same clusterId
FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class); FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class);
when(app1.getApplicationId()).thenReturn(id1); when(app1.getApplicationId()).thenReturn(id1);
when(app1.getPriority()).thenReturn(priority);
FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class); FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class);
when(app2.getApplicationId()).thenReturn(id2); when(app2.getApplicationId()).thenReturn(id2);
when(app2.getPriority()).thenReturn(priority);
FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class); FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class);
when(app3.getApplicationId()).thenReturn(id3); when(app3.getApplicationId()).thenReturn(id3);
when(app3.getPriority()).thenReturn(priority);
assertTrue(appComparator.compare(app1, app2) < 0); assertTrue(appComparator.compare(app1, app2) < 0);
//different clusterId //different clusterId
assertTrue(appComparator.compare(app1, app3) < 0); assertTrue(appComparator.compare(app1, app3) < 0);

View File

@ -31,6 +31,7 @@ public class MockSchedulableEntity implements SchedulableEntity {
private String id; private String id;
private long serial = 0; private long serial = 0;
private Priority priority;
public MockSchedulableEntity() { } public MockSchedulableEntity() { }
@ -75,4 +76,12 @@ public class MockSchedulableEntity implements SchedulableEntity {
return 1;//let other types go before this, if any return 1;//let other types go before this, if any
} }
@Override
public Priority getPriority() {
return priority;
}
public void setApplicationPriority(Priority priority) {
this.priority = priority;
}
} }

View File

@ -1048,13 +1048,13 @@ public class TestDelegationTokenRenewer {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200); resource.setMemory(200);
RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2, RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, false); credentials, null, true, false, false, null, 0, null, false, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// submit app2 with the same token, set cancelTokenWhenComplete to true; // submit app2 with the same token, set cancelTokenWhenComplete to true;
RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2, RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, true); credentials, null, true, false, false, null, 0, null, true, null);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2); MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
@ -1114,7 +1114,7 @@ public class TestDelegationTokenRenewer {
resource.setMemory(200); resource.setMemory(200);
RMApp app1 = RMApp app1 =
rm.submitApp(resource, "name", "user", null, false, null, 2, credentials, rm.submitApp(resource, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true); null, true, false, false, null, 0, null, true, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
@ -1122,7 +1122,7 @@ public class TestDelegationTokenRenewer {
Assert.assertNotNull(dttr); Assert.assertNotNull(dttr);
Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2, RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, true); credentials, null, true, false, false, null, 0, null, true, null);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
@ -1139,7 +1139,7 @@ public class TestDelegationTokenRenewer {
Assert.assertFalse(Renewer.cancelled); Assert.assertFalse(Renewer.cancelled);
RMApp app3 = rm.submitApp(resource, "name", "user", null, false, null, 2, RMApp app3 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, true); credentials, null, true, false, false, null, 0, null, true, null);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1); MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING); rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); Assert.assertTrue(renewer.getAllTokens().containsKey(token1));

View File

@ -759,10 +759,10 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo(); ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationId(appId); appInfo.setApplicationId(appId);
appInfo.setApplicationName(appName); appInfo.setApplicationName(appName);
appInfo.setPriority(3);
appInfo.setMaxAppAttempts(2); appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName); appInfo.setQueue(queueName);
appInfo.setApplicationType(appType); appInfo.setApplicationType(appType);
appInfo.setPriority(0);
HashMap<String, LocalResourceInfo> lr = new HashMap<>(); HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo(); LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt")); y.setUrl(new URI("http://www.test.com/file.txt"));