YARN-2378. Added support for moving applications across queues in CapacityScheduler. Contributed by Subramaniam Venkatraman Krishnan
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
16b7bd4e6c
commit
7360cec692
|
@ -50,6 +50,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2277. Added cross-origin support for the timeline server web services.
|
||||
(Jonathan Eagles via zjshen)
|
||||
|
||||
YARN-2378. Added support for moving applications across queues in
|
||||
CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||
|
|
|
@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.APP_REJECTED,
|
||||
new FinalSavingTransition(new AppRejectedTransition(),
|
||||
RMAppState.FAILED))
|
||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
||||
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||
|
||||
// Transitions from SUBMITTED state
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||
|
@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// ignorable transitions
|
||||
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
||||
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
||||
RMAppEventType.APP_NEW_SAVED))
|
||||
RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
|
||||
|
||||
// Transitions from FINISHING state
|
||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||
|
@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// ignorable transitions
|
||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
||||
// ignore Kill as we have already saved the final Finished state in
|
||||
// state store.
|
||||
RMAppEventType.KILL))
|
||||
// ignore Kill/Move as we have already saved the final Finished state
|
||||
// in state store.
|
||||
RMAppEventType.KILL, RMAppEventType.MOVE))
|
||||
|
||||
// Transitions from KILLING state
|
||||
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
||||
|
@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.ATTEMPT_FINISHED,
|
||||
RMAppEventType.ATTEMPT_FAILED,
|
||||
RMAppEventType.APP_UPDATE_SAVED,
|
||||
RMAppEventType.KILL))
|
||||
RMAppEventType.KILL, RMAppEventType.MOVE))
|
||||
|
||||
// Transitions from FINISHED state
|
||||
// ignorable transitions
|
||||
|
@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.NODE_UPDATE,
|
||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||
RMAppEventType.ATTEMPT_FINISHED,
|
||||
RMAppEventType.KILL))
|
||||
RMAppEventType.KILL, RMAppEventType.MOVE))
|
||||
|
||||
// Transitions from FAILED state
|
||||
// ignorable transitions
|
||||
|
@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.APP_RUNNING_ON_NODE,
|
||||
new AppRunningOnNodeTransition())
|
||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
|
||||
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
|
||||
RMAppEventType.MOVE))
|
||||
|
||||
// Transitions from KILLED state
|
||||
// ignorable transitions
|
||||
|
@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
||||
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
||||
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
||||
RMAppEventType.NODE_UPDATE))
|
||||
RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
|
||||
|
||||
.installTopology();
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
|
@ -48,6 +50,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public abstract class AbstractYarnScheduler
|
||||
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
|
||||
|
@ -317,4 +321,31 @@ public abstract class AbstractYarnScheduler
|
|||
public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void moveAllApps(String sourceQueue, String destQueue)
|
||||
throws YarnException {
|
||||
// check if destination queue is a valid leaf queue
|
||||
try {
|
||||
getQueueInfo(destQueue, false, false);
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e);
|
||||
throw new YarnException(e);
|
||||
}
|
||||
// check if source queue is a valid
|
||||
List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
|
||||
if (apps == null) {
|
||||
String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
|
||||
LOG.warn(errMsg);
|
||||
throw new YarnException(errMsg);
|
||||
}
|
||||
// generate move events for each pending/running app
|
||||
for (ApplicationAttemptId app : apps) {
|
||||
SettableFuture<Object> future = SettableFuture.create();
|
||||
this.rmContext
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ public class AppSchedulingInfo {
|
|||
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
final ApplicationId applicationId;
|
||||
private final String queueName;
|
||||
private String queueName;
|
||||
Queue queue;
|
||||
final String user;
|
||||
// TODO making containerIdCounter long
|
||||
|
@ -410,6 +410,7 @@ public class AppSchedulingInfo {
|
|||
activeUsersManager = newQueue.getActiveUsersManager();
|
||||
activeUsersManager.activateApplication(user, applicationId);
|
||||
this.queue = newQueue;
|
||||
this.queueName = newQueue.getQueueName();
|
||||
}
|
||||
|
||||
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
||||
|
|
|
@ -202,4 +202,14 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
@Evolving
|
||||
public String moveApplication(ApplicationId appId, String newQueue)
|
||||
throws YarnException;
|
||||
|
||||
/**
 * Completely drain sourceQueue of applications, by moving all of them to
 * destQueue.
 *
 * @param sourceQueue name of the queue whose applications are to be moved
 * @param destQueue name of the queue the applications are to be moved to
 * @throws YarnException if the move cannot be initiated
 */
|
||||
void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
|
||||
}
|
||||
|
|
|
@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
* @param apps the collection to add the applications to
|
||||
*/
|
||||
public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
|
||||
|
||||
/**
|
||||
* Detach a container from this queue
|
||||
* @param clusterResource the current cluster resource
|
||||
* @param application application to which the container was assigned
|
||||
* @param container the container to detach
|
||||
*/
|
||||
public void detachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer container);
|
||||
|
||||
/**
|
||||
* Attach a container to this queue
|
||||
* @param clusterResource the current cluster resource
|
||||
* @param application application to which the container was assigned
|
||||
* @param container the container to attach
|
||||
*/
|
||||
public void attachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer container);
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
|
@ -547,6 +548,8 @@ public class CapacityScheduler extends
|
|||
.handle(new RMAppRejectedEvent(applicationId, ace.toString()));
|
||||
return;
|
||||
}
|
||||
// update the metrics
|
||||
queue.getMetrics().submitApp(user);
|
||||
SchedulerApplication<FiCaSchedulerApp> application =
|
||||
new SchedulerApplication<FiCaSchedulerApp>(queue, user);
|
||||
applications.put(applicationId, application);
|
||||
|
@ -1131,4 +1134,59 @@ public class CapacityScheduler extends
|
|||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String moveApplication(ApplicationId appId,
|
||||
String targetQueueName) throws YarnException {
|
||||
FiCaSchedulerApp app =
|
||||
getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
|
||||
String sourceQueueName = app.getQueue().getQueueName();
|
||||
LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
|
||||
LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
|
||||
// Validation check - ACLs, submission limits for user & queue
|
||||
String user = app.getUser();
|
||||
try {
|
||||
dest.submitApplication(appId, user, targetQueueName);
|
||||
} catch (AccessControlException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
// Move all live containers
|
||||
for (RMContainer rmContainer : app.getLiveContainers()) {
|
||||
source.detachContainer(clusterResource, app, rmContainer);
|
||||
// attach the Container to another queue
|
||||
dest.attachContainer(clusterResource, app, rmContainer);
|
||||
}
|
||||
// Detach the application..
|
||||
source.finishApplicationAttempt(app, sourceQueueName);
|
||||
source.getParent().finishApplication(appId, app.getUser());
|
||||
// Finish app & update metrics
|
||||
app.move(dest);
|
||||
// Submit to a new queue
|
||||
dest.submitApplicationAttempt(app, user);
|
||||
applications.get(appId).setQueue(dest);
|
||||
LOG.info("App: " + app.getApplicationId() + " successfully moved from "
|
||||
+ sourceQueueName + " to: " + targetQueueName);
|
||||
return targetQueueName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the String provided in input is the name of an existing,
|
||||
* LeafQueue, if successful returns the queue.
|
||||
*
|
||||
* @param queue
|
||||
* @return the LeafQueue
|
||||
* @throws YarnException
|
||||
*/
|
||||
private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
|
||||
CSQueue ret = this.getQueue(queue);
|
||||
if (ret == null) {
|
||||
throw new YarnException("The specified Queue: " + queue
|
||||
+ " doesn't exist");
|
||||
}
|
||||
if (!(ret instanceof LeafQueue)) {
|
||||
throw new YarnException("The specified Queue: " + queue
|
||||
+ " is not a Leaf Queue. Move is supported only for Leaf Queues.");
|
||||
}
|
||||
return (LeafQueue) ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -643,7 +643,10 @@ public class LeafQueue implements CSQueue {
|
|||
addApplicationAttempt(application, user);
|
||||
}
|
||||
|
||||
metrics.submitAppAttempt(userName);
|
||||
// We don't want to update metrics for move app
|
||||
if (application.isPending()) {
|
||||
metrics.submitAppAttempt(userName);
|
||||
}
|
||||
getParent().submitApplicationAttempt(application, userName);
|
||||
}
|
||||
|
||||
|
@ -701,7 +704,6 @@ public class LeafQueue implements CSQueue {
|
|||
throw ace;
|
||||
}
|
||||
|
||||
metrics.submitApp(userName);
|
||||
}
|
||||
|
||||
private synchronized void activateApplications() {
|
||||
|
@ -1620,8 +1622,43 @@ public class LeafQueue implements CSQueue {
|
|||
@Override
|
||||
public void collectSchedulerApplications(
|
||||
Collection<ApplicationAttemptId> apps) {
|
||||
for (FiCaSchedulerApp pendingApp : pendingApplications) {
|
||||
apps.add(pendingApp.getApplicationAttemptId());
|
||||
}
|
||||
for (FiCaSchedulerApp app : activeApplications) {
|
||||
apps.add(app.getApplicationAttemptId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void attachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer rmContainer) {
|
||||
if (application != null) {
|
||||
allocateResource(clusterResource, application, rmContainer.getContainer()
|
||||
.getResource());
|
||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||
+ " resource=" + rmContainer.getContainer().getResource()
|
||||
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
|
||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||
+ usedResources + " cluster=" + clusterResource);
|
||||
// Inform the parent queue
|
||||
getParent().attachContainer(clusterResource, application, rmContainer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void detachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer rmContainer) {
|
||||
if (application != null) {
|
||||
releaseResource(clusterResource, application, rmContainer.getContainer()
|
||||
.getResource());
|
||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||
+ " resource=" + rmContainer.getContainer().getResource()
|
||||
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
|
||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||
+ usedResources + " cluster=" + clusterResource);
|
||||
// Inform the parent queue
|
||||
getParent().detachContainer(clusterResource, application, rmContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -791,4 +791,37 @@ public class ParentQueue implements CSQueue {
|
|||
queue.collectSchedulerApplications(apps);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void attachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer rmContainer) {
|
||||
if (application != null) {
|
||||
allocateResource(clusterResource, rmContainer.getContainer()
|
||||
.getResource());
|
||||
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
|
||||
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
|
||||
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
|
||||
+ clusterResource);
|
||||
// Inform the parent
|
||||
if (parent != null) {
|
||||
parent.attachContainer(clusterResource, application, rmContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void detachContainer(Resource clusterResource,
|
||||
FiCaSchedulerApp application, RMContainer rmContainer) {
|
||||
if (application != null) {
|
||||
releaseResource(clusterResource, rmContainer.getContainer().getResource());
|
||||
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
|
||||
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
|
||||
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
|
||||
+ clusterResource);
|
||||
// Inform the parent
|
||||
if (parent != null) {
|
||||
parent.detachContainer(clusterResource, application, rmContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
|
@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
|
@ -1014,4 +1020,782 @@ public class TestCapacityScheduler {
|
|||
// Now with updated ResourceRequest, a container is allocated for AM.
|
||||
Assert.assertTrue(containers.size() == 1);
|
||||
}
|
||||
|
||||
private MockRM setUpMove() {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
return rm;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppBasic() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
||||
// submit an app
|
||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||
ApplicationAttemptId appAttemptId =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getCurrentApplicationAttemptId();
|
||||
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
String queue =
|
||||
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertTrue(queue.equals("a1"));
|
||||
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
// now move the app
|
||||
scheduler.moveApplication(app.getApplicationId(), "b1");
|
||||
|
||||
// check postconditions
|
||||
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertEquals(1, appsInB1.size());
|
||||
queue =
|
||||
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
System.out.println(queue);
|
||||
Assert.assertTrue(queue.equals("b1"));
|
||||
|
||||
appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInB.size());
|
||||
|
||||
appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertTrue(appsInA1.isEmpty());
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.isEmpty());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppSameParent() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
||||
// submit an app
|
||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||
ApplicationAttemptId appAttemptId =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getCurrentApplicationAttemptId();
|
||||
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
String queue =
|
||||
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertTrue(queue.equals("a1"));
|
||||
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2");
|
||||
assertTrue(appsInA2.isEmpty());
|
||||
|
||||
// now move the app
|
||||
scheduler.moveApplication(app.getApplicationId(), "a2");
|
||||
|
||||
// check postconditions
|
||||
appsInA2 = scheduler.getAppsInQueue("a2");
|
||||
assertEquals(1, appsInA2.size());
|
||||
queue =
|
||||
scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertTrue(queue.equals("a2"));
|
||||
|
||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertTrue(appsInA1.isEmpty());
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("a");
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
|
||||
|
||||
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
|
||||
// Register node1
|
||||
String host_0 = "host_0";
|
||||
NodeManager nm_0 =
|
||||
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(4 * GB, 1));
|
||||
|
||||
// Register node2
|
||||
String host_1 = "host_1";
|
||||
NodeManager nm_1 =
|
||||
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(2 * GB, 1));
|
||||
|
||||
// ResourceRequest priorities
|
||||
Priority priority_0 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(0);
|
||||
Priority priority_1 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(1);
|
||||
|
||||
// Submit application_0
|
||||
Application application_0 =
|
||||
new Application("user_0", "a1", resourceManager);
|
||||
application_0.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||
application_0.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_0_0 = Resources.createResource(1 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||
|
||||
Resource capability_0_1 = Resources.createResource(2 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||
|
||||
Task task_0_0 =
|
||||
new Task(application_0, priority_1, new String[] { host_0, host_1 });
|
||||
application_0.addTask(task_0_0);
|
||||
|
||||
// Submit application_1
|
||||
Application application_1 =
|
||||
new Application("user_1", "b2", resourceManager);
|
||||
application_1.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_1.addNodeManager(host_0, 1234, nm_0);
|
||||
application_1.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_1_0 = Resources.createResource(1 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_1, capability_1_0);
|
||||
|
||||
Resource capability_1_1 = Resources.createResource(2 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_0, capability_1_1);
|
||||
|
||||
Task task_1_0 =
|
||||
new Task(application_1, priority_1, new String[] { host_0, host_1 });
|
||||
application_1.addTask(task_1_0);
|
||||
|
||||
// Send resource requests to the scheduler
|
||||
application_0.schedule(); // allocate
|
||||
application_1.schedule(); // allocate
|
||||
|
||||
// task_0_0 task_1_0 allocated, used=2G
|
||||
nodeUpdate(nm_0);
|
||||
|
||||
// nothing allocated
|
||||
nodeUpdate(nm_1);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule(); // task_0_0
|
||||
checkApplicationResourceUsage(1 * GB, application_0);
|
||||
|
||||
application_1.schedule(); // task_1_0
|
||||
checkApplicationResourceUsage(1 * GB, application_1);
|
||||
|
||||
checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G
|
||||
// available
|
||||
checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
|
||||
|
||||
// move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
|
||||
// total cap)
|
||||
scheduler.moveApplication(application_0.getApplicationId(), "b1");
|
||||
|
||||
// 2GB 1C
|
||||
Task task_1_1 =
|
||||
new Task(application_1, priority_0,
|
||||
new String[] { ResourceRequest.ANY });
|
||||
application_1.addTask(task_1_1);
|
||||
|
||||
application_1.schedule();
|
||||
|
||||
// 2GB 1C
|
||||
Task task_0_1 =
|
||||
new Task(application_0, priority_0, new String[] { host_0, host_1 });
|
||||
application_0.addTask(task_0_1);
|
||||
|
||||
application_0.schedule();
|
||||
|
||||
// prev 2G used free 2G
|
||||
nodeUpdate(nm_0);
|
||||
|
||||
// prev 0G used free 2G
|
||||
nodeUpdate(nm_1);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_1.schedule();
|
||||
checkApplicationResourceUsage(3 * GB, application_1);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule();
|
||||
checkApplicationResourceUsage(3 * GB, application_0);
|
||||
|
||||
checkNodeResourceUsage(4 * GB, nm_0);
|
||||
checkNodeResourceUsage(2 * GB, nm_1);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppSuccess() throws Exception {
|
||||
|
||||
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
|
||||
|
||||
// Register node1
|
||||
String host_0 = "host_0";
|
||||
NodeManager nm_0 =
|
||||
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(5 * GB, 1));
|
||||
|
||||
// Register node2
|
||||
String host_1 = "host_1";
|
||||
NodeManager nm_1 =
|
||||
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(5 * GB, 1));
|
||||
|
||||
// ResourceRequest priorities
|
||||
Priority priority_0 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(0);
|
||||
Priority priority_1 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(1);
|
||||
|
||||
// Submit application_0
|
||||
Application application_0 =
|
||||
new Application("user_0", "a1", resourceManager);
|
||||
application_0.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||
application_0.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_0_0 = Resources.createResource(3 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||
|
||||
Resource capability_0_1 = Resources.createResource(2 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||
|
||||
Task task_0_0 =
|
||||
new Task(application_0, priority_1, new String[] { host_0, host_1 });
|
||||
application_0.addTask(task_0_0);
|
||||
|
||||
// Submit application_1
|
||||
Application application_1 =
|
||||
new Application("user_1", "b2", resourceManager);
|
||||
application_1.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_1.addNodeManager(host_0, 1234, nm_0);
|
||||
application_1.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_1_0 = Resources.createResource(1 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_1, capability_1_0);
|
||||
|
||||
Resource capability_1_1 = Resources.createResource(2 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_0, capability_1_1);
|
||||
|
||||
Task task_1_0 =
|
||||
new Task(application_1, priority_1, new String[] { host_0, host_1 });
|
||||
application_1.addTask(task_1_0);
|
||||
|
||||
// Send resource requests to the scheduler
|
||||
application_0.schedule(); // allocate
|
||||
application_1.schedule(); // allocate
|
||||
|
||||
// b2 can only run 1 app at a time
|
||||
scheduler.moveApplication(application_0.getApplicationId(), "b2");
|
||||
|
||||
nodeUpdate(nm_0);
|
||||
|
||||
nodeUpdate(nm_1);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule(); // task_0_0
|
||||
checkApplicationResourceUsage(0 * GB, application_0);
|
||||
|
||||
application_1.schedule(); // task_1_0
|
||||
checkApplicationResourceUsage(1 * GB, application_1);
|
||||
|
||||
// task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
|
||||
// not scheduled
|
||||
checkNodeResourceUsage(1 * GB, nm_0);
|
||||
checkNodeResourceUsage(0 * GB, nm_1);
|
||||
|
||||
// lets move application_0 to a queue where it can run
|
||||
scheduler.moveApplication(application_0.getApplicationId(), "a2");
|
||||
application_0.schedule();
|
||||
|
||||
nodeUpdate(nm_1);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule(); // task_0_0
|
||||
checkApplicationResourceUsage(3 * GB, application_0);
|
||||
|
||||
checkNodeResourceUsage(1 * GB, nm_0);
|
||||
checkNodeResourceUsage(3 * GB, nm_1);
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = YarnException.class)
|
||||
public void testMoveAppViolateQueueState() throws Exception {
|
||||
|
||||
resourceManager = new ResourceManager();
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
StringBuilder qState = new StringBuilder();
|
||||
qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
|
||||
.append(CapacitySchedulerConfiguration.DOT)
|
||||
.append(CapacitySchedulerConfiguration.STATE);
|
||||
csConf.set(qState.toString(), QueueState.STOPPED.name());
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
resourceManager.init(conf);
|
||||
resourceManager.getRMContext().getContainerTokenSecretManager()
|
||||
.rollMasterKey();
|
||||
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
|
||||
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
|
||||
mockContext = mock(RMContext.class);
|
||||
when(mockContext.getConfigurationProvider()).thenReturn(
|
||||
new LocalConfigurationProvider());
|
||||
|
||||
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
|
||||
|
||||
// Register node1
|
||||
String host_0 = "host_0";
|
||||
NodeManager nm_0 =
|
||||
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(6 * GB, 1));
|
||||
|
||||
// ResourceRequest priorities
|
||||
Priority priority_0 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(0);
|
||||
Priority priority_1 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(1);
|
||||
|
||||
// Submit application_0
|
||||
Application application_0 =
|
||||
new Application("user_0", "a1", resourceManager);
|
||||
application_0.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||
|
||||
Resource capability_0_0 = Resources.createResource(3 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||
|
||||
Resource capability_0_1 = Resources.createResource(2 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||
|
||||
Task task_0_0 =
|
||||
new Task(application_0, priority_1, new String[] { host_0 });
|
||||
application_0.addTask(task_0_0);
|
||||
|
||||
// Send resource requests to the scheduler
|
||||
application_0.schedule(); // allocate
|
||||
|
||||
// task_0_0 allocated
|
||||
nodeUpdate(nm_0);
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule(); // task_0_0
|
||||
checkApplicationResourceUsage(3 * GB, application_0);
|
||||
|
||||
checkNodeResourceUsage(3 * GB, nm_0);
|
||||
// b2 queue contains 3GB consumption app,
|
||||
// add another 3GB will hit max capacity limit on queue b
|
||||
scheduler.moveApplication(application_0.getApplicationId(), "b1");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppQueueMetricsCheck() throws Exception {
|
||||
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
|
||||
|
||||
// Register node1
|
||||
String host_0 = "host_0";
|
||||
NodeManager nm_0 =
|
||||
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(5 * GB, 1));
|
||||
|
||||
// Register node2
|
||||
String host_1 = "host_1";
|
||||
NodeManager nm_1 =
|
||||
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||
Resources.createResource(5 * GB, 1));
|
||||
|
||||
// ResourceRequest priorities
|
||||
Priority priority_0 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(0);
|
||||
Priority priority_1 =
|
||||
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||
.create(1);
|
||||
|
||||
// Submit application_0
|
||||
Application application_0 =
|
||||
new Application("user_0", "a1", resourceManager);
|
||||
application_0.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||
application_0.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_0_0 = Resources.createResource(3 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||
|
||||
Resource capability_0_1 = Resources.createResource(2 * GB, 1);
|
||||
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||
|
||||
Task task_0_0 =
|
||||
new Task(application_0, priority_1, new String[] { host_0, host_1 });
|
||||
application_0.addTask(task_0_0);
|
||||
|
||||
// Submit application_1
|
||||
Application application_1 =
|
||||
new Application("user_1", "b2", resourceManager);
|
||||
application_1.submit(); // app + app attempt event sent to scheduler
|
||||
|
||||
application_1.addNodeManager(host_0, 1234, nm_0);
|
||||
application_1.addNodeManager(host_1, 1234, nm_1);
|
||||
|
||||
Resource capability_1_0 = Resources.createResource(1 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_1, capability_1_0);
|
||||
|
||||
Resource capability_1_1 = Resources.createResource(2 * GB, 1);
|
||||
application_1.addResourceRequestSpec(priority_0, capability_1_1);
|
||||
|
||||
Task task_1_0 =
|
||||
new Task(application_1, priority_1, new String[] { host_0, host_1 });
|
||||
application_1.addTask(task_1_0);
|
||||
|
||||
// Send resource requests to the scheduler
|
||||
application_0.schedule(); // allocate
|
||||
application_1.schedule(); // allocate
|
||||
|
||||
nodeUpdate(nm_0);
|
||||
|
||||
nodeUpdate(nm_1);
|
||||
|
||||
CapacityScheduler cs =
|
||||
(CapacityScheduler) resourceManager.getResourceScheduler();
|
||||
CSQueue origRootQ = cs.getRootQueue();
|
||||
CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ);
|
||||
int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
|
||||
int origNumAppsRoot = origRootQ.getNumApplications();
|
||||
|
||||
scheduler.moveApplication(application_0.getApplicationId(), "a2");
|
||||
|
||||
CSQueue newRootQ = cs.getRootQueue();
|
||||
int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
|
||||
int newNumAppsRoot = newRootQ.getNumApplications();
|
||||
CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ);
|
||||
CapacitySchedulerLeafQueueInfo origOldA1 =
|
||||
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
|
||||
CapacitySchedulerLeafQueueInfo origNewA1 =
|
||||
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
|
||||
CapacitySchedulerLeafQueueInfo targetOldA2 =
|
||||
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
|
||||
CapacitySchedulerLeafQueueInfo targetNewA2 =
|
||||
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
|
||||
// originally submitted here
|
||||
assertEquals(1, origOldA1.getNumApplications());
|
||||
assertEquals(1, origNumAppsA);
|
||||
assertEquals(2, origNumAppsRoot);
|
||||
// after the move
|
||||
assertEquals(0, origNewA1.getNumApplications());
|
||||
assertEquals(1, newNumAppsA);
|
||||
assertEquals(2, newNumAppsRoot);
|
||||
// original consumption on a1
|
||||
assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory());
|
||||
assertEquals(1, origOldA1.getResourcesUsed().getvCores());
|
||||
assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move
|
||||
assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
|
||||
// app moved here with live containers
|
||||
assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory());
|
||||
assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
|
||||
// it was empty before the move
|
||||
assertEquals(0, targetOldA2.getNumApplications());
|
||||
assertEquals(0, targetOldA2.getResourcesUsed().getMemory());
|
||||
assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
|
||||
// after the app moved here
|
||||
assertEquals(1, targetNewA2.getNumApplications());
|
||||
// 1 container on original queue before move
|
||||
assertEquals(1, origOldA1.getNumContainers());
|
||||
// after the move the resource released
|
||||
assertEquals(0, origNewA1.getNumContainers());
|
||||
// and moved to the new queue
|
||||
assertEquals(1, targetNewA2.getNumContainers());
|
||||
// which originally didn't have any
|
||||
assertEquals(0, targetOldA2.getNumContainers());
|
||||
// 1 user with 3GB
|
||||
assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
|
||||
.getResourcesUsed().getMemory());
|
||||
// 1 user with 1 core
|
||||
assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
|
||||
.getResourcesUsed().getvCores());
|
||||
// user ha no more running app in the orig queue
|
||||
assertEquals(0, origNewA1.getUsers().getUsersList().size());
|
||||
// 1 user with 3GB
|
||||
assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
|
||||
.getResourcesUsed().getMemory());
|
||||
// 1 user with 1 core
|
||||
assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
|
||||
.getResourcesUsed().getvCores());
|
||||
|
||||
// Get allocations from the scheduler
|
||||
application_0.schedule(); // task_0_0
|
||||
checkApplicationResourceUsage(3 * GB, application_0);
|
||||
|
||||
application_1.schedule(); // task_1_0
|
||||
checkApplicationResourceUsage(1 * GB, application_1);
|
||||
|
||||
// task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
|
||||
// not scheduled
|
||||
checkNodeResourceUsage(4 * GB, nm_0);
|
||||
checkNodeResourceUsage(0 * GB, nm_1);
|
||||
|
||||
}
|
||||
|
||||
private int getNumAppsInQueue(String name, List<CSQueue> queues) {
|
||||
for (CSQueue queue : queues) {
|
||||
if (queue.getQueueName().equals(name)) {
|
||||
return queue.getNumApplications();
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
private CapacitySchedulerQueueInfo getQueueInfo(String name,
|
||||
CapacitySchedulerQueueInfoList info) {
|
||||
if (info != null) {
|
||||
for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
|
||||
if (queueInfo.getQueueName().equals(name)) {
|
||||
return queueInfo;
|
||||
} else {
|
||||
CapacitySchedulerQueueInfo result =
|
||||
getQueueInfo(name, queueInfo.getQueues());
|
||||
if (result == null) {
|
||||
continue;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAllApps() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
||||
// submit an app
|
||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||
ApplicationAttemptId appAttemptId =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getCurrentApplicationAttemptId();
|
||||
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
String queue =
|
||||
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertTrue(queue.equals("a1"));
|
||||
|
||||
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
// now move the app
|
||||
scheduler.moveAllApps("a1", "b1");
|
||||
|
||||
// check postconditions
|
||||
Thread.sleep(1000);
|
||||
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertEquals(1, appsInB1.size());
|
||||
queue =
|
||||
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertTrue(queue.equals("b1"));
|
||||
|
||||
appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInB.size());
|
||||
|
||||
appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertTrue(appsInA1.isEmpty());
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.isEmpty());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAllAppsInvalidDestination() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
||||
// submit an app
|
||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||
ApplicationAttemptId appAttemptId =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getCurrentApplicationAttemptId();
|
||||
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
// now move the app
|
||||
try {
|
||||
scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// check postconditions, app should still be in a1
|
||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAllAppsInvalidSource() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
||||
// submit an app
|
||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||
ApplicationAttemptId appAttemptId =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getCurrentApplicationAttemptId();
|
||||
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
// now move the app
|
||||
try {
|
||||
scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// check postconditions, app should still be in a1
|
||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("a");
|
||||
assertTrue(appsInA.contains(appAttemptId));
|
||||
assertEquals(1, appsInA.size());
|
||||
|
||||
appsInRoot = scheduler.getAppsInQueue("root");
|
||||
assertTrue(appsInRoot.contains(appAttemptId));
|
||||
assertEquals(1, appsInRoot.size());
|
||||
|
||||
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||
assertTrue(appsInB1.isEmpty());
|
||||
|
||||
appsInB = scheduler.getAppsInQueue("b");
|
||||
assertTrue(appsInB.isEmpty());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue