YARN-1498. Common scheduler changes for moving apps between queues (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563021 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
067d52b98c
commit
981679e6d7
|
@ -9,6 +9,9 @@ Trunk - Unreleased
|
||||||
YARN-1496. Protocol additions to allow moving apps between queues (Sandy
|
YARN-1496. Protocol additions to allow moving apps between queues (Sandy
|
||||||
Ryza)
|
Ryza)
|
||||||
|
|
||||||
|
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
|
||||||
|
Ryza)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class AppSchedulingInfo {
|
||||||
private Set<String> blacklist = new HashSet<String>();
|
private Set<String> blacklist = new HashSet<String>();
|
||||||
|
|
||||||
//private final ApplicationStore store;
|
//private final ApplicationStore store;
|
||||||
private final ActiveUsersManager activeUsersManager;
|
private ActiveUsersManager activeUsersManager;
|
||||||
|
|
||||||
/* Allocated by scheduler */
|
/* Allocated by scheduler */
|
||||||
boolean pending = true; // for app metrics
|
boolean pending = true; // for app metrics
|
||||||
|
@ -171,11 +171,10 @@ public class AppSchedulingInfo {
|
||||||
.getNumContainers() : 0;
|
.getNumContainers() : 0;
|
||||||
Resource lastRequestCapability = lastRequest != null ? lastRequest
|
Resource lastRequestCapability = lastRequest != null ? lastRequest
|
||||||
.getCapability() : Resources.none();
|
.getCapability() : Resources.none();
|
||||||
metrics.incrPendingResources(user, request.getNumContainers()
|
metrics.incrPendingResources(user, request.getNumContainers(),
|
||||||
- lastRequestContainers, Resources.subtractFrom( // save a clone
|
request.getCapability());
|
||||||
Resources.multiply(request.getCapability(), request
|
metrics.decrPendingResources(user, lastRequestContainers,
|
||||||
.getNumContainers()), Resources.multiply(lastRequestCapability,
|
lastRequestCapability);
|
||||||
lastRequestContainers)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -262,6 +261,7 @@ public class AppSchedulingInfo {
|
||||||
pending = false;
|
pending = false;
|
||||||
metrics.runAppAttempt(applicationId, user);
|
metrics.runAppAttempt(applicationId, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("allocate: applicationId=" + applicationId
|
LOG.debug("allocate: applicationId=" + applicationId
|
||||||
+ " container=" + container.getId()
|
+ " container=" + container.getId()
|
||||||
|
@ -269,7 +269,7 @@ public class AppSchedulingInfo {
|
||||||
+ " user=" + user
|
+ " user=" + user
|
||||||
+ " resource=" + request.getCapability());
|
+ " resource=" + request.getCapability());
|
||||||
}
|
}
|
||||||
metrics.allocateResources(user, 1, request.getCapability());
|
metrics.allocateResources(user, 1, request.getCapability(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -359,6 +359,26 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized public void move(Queue newQueue) {
|
||||||
|
QueueMetrics oldMetrics = queue.getMetrics();
|
||||||
|
QueueMetrics newMetrics = newQueue.getMetrics();
|
||||||
|
for (Map<String, ResourceRequest> asks : requests.values()) {
|
||||||
|
ResourceRequest request = asks.get(ResourceRequest.ANY);
|
||||||
|
if (request != null) {
|
||||||
|
oldMetrics.decrPendingResources(user, request.getNumContainers(),
|
||||||
|
request.getCapability());
|
||||||
|
newMetrics.incrPendingResources(user, request.getNumContainers(),
|
||||||
|
request.getCapability());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
oldMetrics.moveAppFrom(this);
|
||||||
|
newMetrics.moveAppTo(this);
|
||||||
|
activeUsersManager.deactivateApplication(user, applicationId);
|
||||||
|
activeUsersManager = newQueue.getActiveUsersManager();
|
||||||
|
activeUsersManager.activateApplication(user, applicationId);
|
||||||
|
this.queue = newQueue;
|
||||||
|
}
|
||||||
|
|
||||||
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
||||||
// clear pending resources metrics for the application
|
// clear pending resources metrics for the application
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
|
@ -366,8 +386,7 @@ public class AppSchedulingInfo {
|
||||||
ResourceRequest request = asks.get(ResourceRequest.ANY);
|
ResourceRequest request = asks.get(ResourceRequest.ANY);
|
||||||
if (request != null) {
|
if (request != null) {
|
||||||
metrics.decrPendingResources(user, request.getNumContainers(),
|
metrics.decrPendingResources(user, request.getNumContainers(),
|
||||||
Resources.multiply(request.getCapability(), request
|
request.getCapability());
|
||||||
.getNumContainers()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
metrics.finishAppAttempt(applicationId, pending, user);
|
metrics.finishAppAttempt(applicationId, pending, user);
|
||||||
|
|
|
@ -58,4 +58,6 @@ public interface Queue {
|
||||||
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
|
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
|
||||||
|
|
||||||
boolean hasAccess(QueueACL acl, UserGroupInformation user);
|
boolean hasAccess(QueueACL acl, UserGroupInformation user);
|
||||||
|
|
||||||
|
public ActiveUsersManager getActiveUsersManager();
|
||||||
}
|
}
|
||||||
|
|
|
@ -281,6 +281,36 @@ public class QueueMetrics implements MetricsSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void moveAppFrom(AppSchedulingInfo app) {
|
||||||
|
if (app.isPending()) {
|
||||||
|
appsPending.decr();
|
||||||
|
} else {
|
||||||
|
appsRunning.decr();
|
||||||
|
}
|
||||||
|
QueueMetrics userMetrics = getUserMetrics(app.getUser());
|
||||||
|
if (userMetrics != null) {
|
||||||
|
userMetrics.moveAppFrom(app);
|
||||||
|
}
|
||||||
|
if (parent != null) {
|
||||||
|
parent.moveAppFrom(app);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void moveAppTo(AppSchedulingInfo app) {
|
||||||
|
if (app.isPending()) {
|
||||||
|
appsPending.incr();
|
||||||
|
} else {
|
||||||
|
appsRunning.incr();
|
||||||
|
}
|
||||||
|
QueueMetrics userMetrics = getUserMetrics(app.getUser());
|
||||||
|
if (userMetrics != null) {
|
||||||
|
userMetrics.moveAppTo(app);
|
||||||
|
}
|
||||||
|
if (parent != null) {
|
||||||
|
parent.moveAppTo(app);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set available resources. To be called by scheduler periodically as
|
* Set available resources. To be called by scheduler periodically as
|
||||||
* resources become available.
|
* resources become available.
|
||||||
|
@ -324,8 +354,8 @@ public class QueueMetrics implements MetricsSource {
|
||||||
|
|
||||||
private void _incrPendingResources(int containers, Resource res) {
|
private void _incrPendingResources(int containers, Resource res) {
|
||||||
pendingContainers.incr(containers);
|
pendingContainers.incr(containers);
|
||||||
pendingMB.incr(res.getMemory());
|
pendingMB.incr(res.getMemory() * containers);
|
||||||
pendingVCores.incr(res.getVirtualCores());
|
pendingVCores.incr(res.getVirtualCores() * containers);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void decrPendingResources(String user, int containers, Resource res) {
|
public void decrPendingResources(String user, int containers, Resource res) {
|
||||||
|
@ -341,22 +371,25 @@ public class QueueMetrics implements MetricsSource {
|
||||||
|
|
||||||
private void _decrPendingResources(int containers, Resource res) {
|
private void _decrPendingResources(int containers, Resource res) {
|
||||||
pendingContainers.decr(containers);
|
pendingContainers.decr(containers);
|
||||||
pendingMB.decr(res.getMemory());
|
pendingMB.decr(res.getMemory() * containers);
|
||||||
pendingVCores.decr(res.getVirtualCores());
|
pendingVCores.decr(res.getVirtualCores() * containers);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void allocateResources(String user, int containers, Resource res) {
|
public void allocateResources(String user, int containers, Resource res,
|
||||||
|
boolean decrPending) {
|
||||||
allocatedContainers.incr(containers);
|
allocatedContainers.incr(containers);
|
||||||
aggregateContainersAllocated.incr(containers);
|
aggregateContainersAllocated.incr(containers);
|
||||||
allocatedMB.incr(res.getMemory() * containers);
|
allocatedMB.incr(res.getMemory() * containers);
|
||||||
allocatedVCores.incr(res.getVirtualCores() * containers);
|
allocatedVCores.incr(res.getVirtualCores() * containers);
|
||||||
_decrPendingResources(containers, Resources.multiply(res, containers));
|
if (decrPending) {
|
||||||
|
_decrPendingResources(containers, res);
|
||||||
|
}
|
||||||
QueueMetrics userMetrics = getUserMetrics(user);
|
QueueMetrics userMetrics = getUserMetrics(user);
|
||||||
if (userMetrics != null) {
|
if (userMetrics != null) {
|
||||||
userMetrics.allocateResources(user, containers, res);
|
userMetrics.allocateResources(user, containers, res, decrPending);
|
||||||
}
|
}
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
parent.allocateResources(user, containers, res);
|
parent.allocateResources(user, containers, res, decrPending);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ import com.google.common.collect.Multiset;
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class SchedulerApplicationAttempt {
|
public class SchedulerApplicationAttempt {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(SchedulerApplicationAttempt.class);
|
.getLog(SchedulerApplicationAttempt.class);
|
||||||
|
@ -91,7 +91,7 @@ public abstract class SchedulerApplicationAttempt {
|
||||||
protected Map<Priority, Long> lastScheduledContainer =
|
protected Map<Priority, Long> lastScheduledContainer =
|
||||||
new HashMap<Priority, Long>();
|
new HashMap<Priority, Long>();
|
||||||
|
|
||||||
protected final Queue queue;
|
protected Queue queue;
|
||||||
protected boolean isStopped = false;
|
protected boolean isStopped = false;
|
||||||
|
|
||||||
protected final RMContext rmContext;
|
protected final RMContext rmContext;
|
||||||
|
@ -431,4 +431,25 @@ public abstract class SchedulerApplicationAttempt {
|
||||||
this.appSchedulingInfo
|
this.appSchedulingInfo
|
||||||
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
|
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void move(Queue newQueue) {
|
||||||
|
QueueMetrics oldMetrics = queue.getMetrics();
|
||||||
|
QueueMetrics newMetrics = newQueue.getMetrics();
|
||||||
|
String user = getUser();
|
||||||
|
for (RMContainer liveContainer : liveContainers.values()) {
|
||||||
|
Resource resource = liveContainer.getContainer().getResource();
|
||||||
|
oldMetrics.releaseResources(user, 1, resource);
|
||||||
|
newMetrics.allocateResources(user, 1, resource, false);
|
||||||
|
}
|
||||||
|
for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
|
||||||
|
for (RMContainer reservedContainer : map.values()) {
|
||||||
|
Resource resource = reservedContainer.getReservedResource();
|
||||||
|
oldMetrics.unreserveResource(user, resource);
|
||||||
|
newMetrics.reserveResource(user, resource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
appSchedulingInfo.move(newQueue);
|
||||||
|
this.queue = newQueue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
|
@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue {
|
||||||
private long lastTimeAtMinShare;
|
private long lastTimeAtMinShare;
|
||||||
private long lastTimeAtHalfFairShare;
|
private long lastTimeAtHalfFairShare;
|
||||||
|
|
||||||
|
private final ActiveUsersManager activeUsersManager;
|
||||||
|
|
||||||
public FSLeafQueue(String name, FairScheduler scheduler,
|
public FSLeafQueue(String name, FairScheduler scheduler,
|
||||||
FSParentQueue parent) {
|
FSParentQueue parent) {
|
||||||
super(name, scheduler, parent);
|
super(name, scheduler, parent);
|
||||||
this.lastTimeAtMinShare = scheduler.getClock().getTime();
|
this.lastTimeAtMinShare = scheduler.getClock().getTime();
|
||||||
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
|
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
|
||||||
|
activeUsersManager = new ActiveUsersManager(getMetrics());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addApp(FSSchedulerApp app, boolean runnable) {
|
public void addApp(FSSchedulerApp app, boolean runnable) {
|
||||||
|
@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue {
|
||||||
public int getNumRunnableApps() {
|
public int getNumRunnableApps() {
|
||||||
return runnableAppScheds.size();
|
return runnableAppScheds.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActiveUsersManager getActiveUsersManager() {
|
||||||
|
return activeUsersManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -194,4 +194,10 @@ public class FSParentQueue extends FSQueue {
|
||||||
childQueue.collectSchedulerApplications(apps);
|
childQueue.collectSchedulerApplications(apps);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActiveUsersManager getActiveUsersManager() {
|
||||||
|
// Should never be called since all applications are submitted to LeafQueues
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,6 +184,11 @@ public class FifoScheduler extends AbstractYarnScheduler implements
|
||||||
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||||
return getQueueAcls().get(acl).isUserAllowed(user);
|
return getQueueAcls().get(acl).isUserAllowed(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActiveUsersManager getActiveUsersManager() {
|
||||||
|
return activeUsersManager;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class TestQueueMetrics {
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
||||||
|
|
||||||
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
||||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
|
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
||||||
|
@ -81,7 +81,7 @@ public class TestQueueMetrics {
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
||||||
|
|
||||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
|
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
||||||
|
|
||||||
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
|
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
|
||||||
|
@ -171,7 +171,7 @@ public class TestQueueMetrics {
|
||||||
|
|
||||||
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
||||||
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
||||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
|
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
||||||
|
@ -181,7 +181,7 @@ public class TestQueueMetrics {
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
||||||
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
||||||
|
|
||||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
|
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
||||||
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ public class TestQueueMetrics {
|
||||||
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
|
||||||
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
||||||
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
|
||||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
|
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
||||||
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
||||||
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
|
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
|
||||||
|
@ -242,7 +242,7 @@ public class TestQueueMetrics {
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
||||||
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
||||||
|
|
||||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
|
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
|
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestSchedulerApplicationAttempt {
|
||||||
|
|
||||||
|
private static final NodeId nodeId = NodeId.newInstance("somehost", 5);
|
||||||
|
|
||||||
|
private Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
QueueMetrics.clearQueueMetrics();
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMove() {
|
||||||
|
final String user = "user1";
|
||||||
|
Queue parentQueue = createQueue("parent", null);
|
||||||
|
Queue oldQueue = createQueue("old", parentQueue);
|
||||||
|
Queue newQueue = createQueue("new", parentQueue);
|
||||||
|
QueueMetrics parentMetrics = parentQueue.getMetrics();
|
||||||
|
QueueMetrics oldMetrics = oldQueue.getMetrics();
|
||||||
|
QueueMetrics newMetrics = newQueue.getMetrics();
|
||||||
|
|
||||||
|
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
|
||||||
|
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
|
||||||
|
user, oldQueue, oldQueue.getActiveUsersManager(), null);
|
||||||
|
oldMetrics.submitApp(user);
|
||||||
|
|
||||||
|
// Resource request
|
||||||
|
Resource requestedResource = Resource.newInstance(1536, 2);
|
||||||
|
Priority requestedPriority = Priority.newInstance(2);
|
||||||
|
ResourceRequest request = ResourceRequest.newInstance(requestedPriority,
|
||||||
|
ResourceRequest.ANY, requestedResource, 3);
|
||||||
|
app.updateResourceRequests(Arrays.asList(request));
|
||||||
|
|
||||||
|
// Allocated container
|
||||||
|
RMContainer container1 = createRMContainer(appAttId, 1, requestedResource);
|
||||||
|
app.liveContainers.put(container1.getContainerId(), container1);
|
||||||
|
SchedulerNode node = createNode();
|
||||||
|
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, requestedPriority,
|
||||||
|
request, container1.getContainer());
|
||||||
|
|
||||||
|
// Reserved container
|
||||||
|
Priority prio1 = Priority.newInstance(1);
|
||||||
|
Resource reservedResource = Resource.newInstance(2048, 3);
|
||||||
|
RMContainer container2 = createReservedRMContainer(appAttId, 1, reservedResource,
|
||||||
|
node.getNodeID(), prio1);
|
||||||
|
Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
|
||||||
|
reservations.put(node.getNodeID(), container2);
|
||||||
|
app.reservedContainers.put(prio1, reservations);
|
||||||
|
oldMetrics.reserveResource(user, reservedResource);
|
||||||
|
|
||||||
|
checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
|
||||||
|
checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
|
||||||
|
|
||||||
|
app.move(newQueue);
|
||||||
|
|
||||||
|
checkQueueMetrics(oldMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
checkQueueMetrics(newMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
|
||||||
|
checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkQueueMetrics(QueueMetrics metrics, int activeApps,
|
||||||
|
int runningApps, int allocMb, int allocVcores, int reservedMb,
|
||||||
|
int reservedVcores, int pendingMb, int pendingVcores) {
|
||||||
|
assertEquals(activeApps, metrics.getActiveApps());
|
||||||
|
assertEquals(runningApps, metrics.getAppsRunning());
|
||||||
|
assertEquals(allocMb, metrics.getAllocatedMB());
|
||||||
|
assertEquals(allocVcores, metrics.getAllocatedVirtualCores());
|
||||||
|
assertEquals(reservedMb, metrics.getReservedMB());
|
||||||
|
assertEquals(reservedVcores, metrics.getReservedVirtualCores());
|
||||||
|
assertEquals(pendingMb, metrics.getPendingMB());
|
||||||
|
assertEquals(pendingVcores, metrics.getPendingVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
private SchedulerNode createNode() {
|
||||||
|
SchedulerNode node = mock(SchedulerNode.class);
|
||||||
|
when(node.getNodeName()).thenReturn("somehost");
|
||||||
|
when(node.getRackName()).thenReturn("somerack");
|
||||||
|
when(node.getNodeID()).thenReturn(nodeId);
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RMContainer createReservedRMContainer(ApplicationAttemptId appAttId,
|
||||||
|
int id, Resource resource, NodeId nodeId, Priority reservedPriority) {
|
||||||
|
RMContainer container = createRMContainer(appAttId, id, resource);
|
||||||
|
when(container.getReservedResource()).thenReturn(resource);
|
||||||
|
when(container.getReservedPriority()).thenReturn(reservedPriority);
|
||||||
|
when(container.getReservedNode()).thenReturn(nodeId);
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
|
||||||
|
Resource resource) {
|
||||||
|
ContainerId containerId = ContainerId.newInstance(appAttId, id);
|
||||||
|
RMContainer rmContainer = mock(RMContainer.class);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(container.getResource()).thenReturn(resource);
|
||||||
|
when(container.getNodeId()).thenReturn(nodeId);
|
||||||
|
when(rmContainer.getContainer()).thenReturn(container);
|
||||||
|
when(rmContainer.getContainerId()).thenReturn(containerId);
|
||||||
|
return rmContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Queue createQueue(String name, Queue parent) {
|
||||||
|
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
|
||||||
|
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||||
|
Queue queue = mock(Queue.class);
|
||||||
|
when(queue.getMetrics()).thenReturn(metrics);
|
||||||
|
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
|
||||||
|
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
|
||||||
|
ApplicationAttemptId attId =
|
||||||
|
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
|
||||||
|
return attId;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue