From 24102a4e6cf771b214dd22d42d862cbc11045675 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 15 Aug 2014 06:02:16 +0000 Subject: [PATCH] Merge r1618106 from trunk. YARN-2378. Added support for moving applications across queues in CapacityScheduler. Contributed by Subramaniam Venkatraman Krishnan git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1618107 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 19 +- .../scheduler/AbstractYarnScheduler.java | 31 + .../scheduler/AppSchedulingInfo.java | 3 +- .../scheduler/YarnScheduler.java | 10 + .../scheduler/capacity/CSQueue.java | 18 + .../scheduler/capacity/CapacityScheduler.java | 58 ++ .../scheduler/capacity/LeafQueue.java | 41 +- .../scheduler/capacity/ParentQueue.java | 33 + .../capacity/TestCapacityScheduler.java | 784 ++++++++++++++++++ 10 files changed, 989 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7e771dfe32f..da380b24a30 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -26,6 +26,9 @@ Release 2.6.0 - UNRELEASED YARN-2277. Added cross-origin support for the timeline server web services. (Jonathan Eagles via zjshen) + YARN-2378. Added support for moving applications across queues in + CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 45c89592af8..48cf4602991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.MOVE, new RMAppMoveTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, @@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp, Recoverable { // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED)) + RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, @@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp, Recoverable { // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, - // ignore Kill as we have already saved the final Finished state in - // state store. - RMAppEventType.KILL)) + // ignore Kill/Move as we have already saved the final Finished state + // in state store. + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from KILLING state .addTransition(RMAppState.KILLING, RMAppState.KILLING, @@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FINISHED state // ignorable transitions @@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FAILED state // ignorable transitions @@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, + RMAppEventType.MOVE)) // Transitions from KILLED state // ignorable transitions @@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp, Recoverable { EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE)) + RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) .installTopology(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5764c8c7a65..e5f22b8fc41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; @@ -48,6 +50,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.util.concurrent.SettableFuture; + @SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler @@ -317,4 +321,31 @@ public abstract class AbstractYarnScheduler public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } + + @Override + public synchronized void moveAllApps(String sourceQueue, String destQueue) + throws YarnException { + // check if destination queue is a valid leaf queue + try { + getQueueInfo(destQueue, false, false); + } catch (IOException e) { + LOG.warn(e); + throw new YarnException(e); + } + // check if source queue is a valid + List apps = getAppsInQueue(sourceQueue); + if (apps == null) { + String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate move events for each pending/running app + for (ApplicationAttemptId app : apps) { + SettableFuture future = SettableFuture.create(); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 3c6f210b4b6..e871f429e16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,7 +54,7 @@ public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private final ApplicationAttemptId applicationAttemptId; final ApplicationId applicationId; - private final String queueName; + private String queueName; Queue queue; final String user; // TODO making containerIdCounter long @@ -410,6 +410,7 @@ public class AppSchedulingInfo { activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager.activateApplication(user, applicationId); this.queue = newQueue; + this.queueName = newQueue.getQueueName(); } synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 21eba396985..79dc773b410 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -202,4 +202,14 @@ public interface YarnScheduler extends EventHandler { @Evolving public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; + + /** + * Completely drain sourceQueue of applications, by moving all of them to + * destQueue. + * + * @param sourceQueue + * @param destQueue + * @throws YarnException + */ + void moveAllApps(String sourceQueue, String destQueue) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index ccb71e21236..04c2fd51bf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @param apps the collection to add the applications to */ public void collectSchedulerApplications(Collection apps); + + /** + * Detach a container from this queue + * @param clusterResource the current cluster resource + * @param application application to which the container was assigned + * @param container the container to detach + */ + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); + + /** + * Attach a container to this queue + * @param clusterResource the current cluster resource + * @param application application to which the container was assigned + * @param container the container to attach + */ + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 65ff81c0216..1b31a1a6bbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -547,6 +548,8 @@ public class CapacityScheduler extends .handle(new RMAppRejectedEvent(applicationId, ace.toString())); return; } + // update the metrics + queue.getMetrics().submitApp(user); SchedulerApplication application = new SchedulerApplication(queue, user); applications.put(applicationId, application); @@ -1131,4 +1134,59 @@ public class CapacityScheduler extends throw new IOException(e); } } + + @Override + public synchronized String moveApplication(ApplicationId appId, + String targetQueueName) throws YarnException { + FiCaSchedulerApp app = + getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + LeafQueue source = getAndCheckLeafQueue(sourceQueueName); + LeafQueue dest = getAndCheckLeafQueue(targetQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + try { + dest.submitApplication(appId, user, targetQueueName); + } catch (AccessControlException e) { + throw new YarnException(e); + } + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(clusterResource, app, rmContainer); + // attach the Container to another queue + dest.attachContainer(clusterResource, app, rmContainer); + } + // Detach the application.. + source.finishApplicationAttempt(app, sourceQueueName); + source.getParent().finishApplication(appId, app.getUser()); + // Finish app & update metrics + app.move(dest); + // Submit to a new queue + dest.submitApplicationAttempt(app, user); + applications.get(appId).setQueue(dest); + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueueName + " to: " + targetQueueName); + return targetQueueName; + } + + /** + * Check that the String provided in input is the name of an existing, + * LeafQueue, if successful returns the queue. + * + * @param queue + * @return the LeafQueue + * @throws YarnException + */ + private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { + CSQueue ret = this.getQueue(queue); + if (ret == null) { + throw new YarnException("The specified Queue: " + queue + + " doesn't exist"); + } + if (!(ret instanceof LeafQueue)) { + throw new YarnException("The specified Queue: " + queue + + " is not a Leaf Queue. Move is supported only for Leaf Queues."); + } + return (LeafQueue) ret; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 4fd8b490577..5c93c5fc026 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -643,7 +643,10 @@ public class LeafQueue implements CSQueue { addApplicationAttempt(application, user); } - metrics.submitAppAttempt(userName); + // We don't want to update metrics for move app + if (application.isPending()) { + metrics.submitAppAttempt(userName); + } getParent().submitApplicationAttempt(application, userName); } @@ -701,7 +704,6 @@ public class LeafQueue implements CSQueue { throw ace; } - metrics.submitApp(userName); } private synchronized void activateApplications() { @@ -1620,8 +1622,43 @@ public class LeafQueue implements CSQueue { @Override public void collectSchedulerApplications( Collection apps) { + for (FiCaSchedulerApp pendingApp : pendingApplications) { + apps.add(pendingApp.getApplicationAttemptId()); + } for (FiCaSchedulerApp app : activeApplications) { apps.add(app.getApplicationAttemptId()); } } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().attachContainer(clusterResource, application, rmContainer); + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, application, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + + " resource=" + rmContainer.getContainer().getResource() + + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + + usedResources + " cluster=" + clusterResource); + // Inform the parent queue + getParent().detachContainer(clusterResource, application, rmContainer); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d83eed34cb3..8c654b7ded1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -791,4 +791,37 @@ public class ParentQueue implements CSQueue { queue.collectSchedulerApplications(apps); } } + + @Override + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + allocateResource(clusterResource, rmContainer.getContainer() + .getResource()); + LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.attachContainer(clusterResource, application, rmContainer); + } + } + } + + @Override + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer rmContainer) { + if (application != null) { + releaseResource(clusterResource, rmContainer.getContainer().getResource()); + LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + clusterResource); + // Inform the parent + if (parent != null) { + parent.detachContainer(clusterResource, application, rmContainer); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 0efd48fa28d..45c4950202d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -1014,4 +1020,782 @@ public class TestCapacityScheduler { // Now with updated ResourceRequest, a container is allocated for AM. Assert.assertTrue(containers.size() == 1); } + + private MockRM setUpMove() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + return rm; + } + + @Test + public void testMoveAppBasic() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "b1"); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + System.out.println(queue); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAppSameParent() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List appsInA2 = scheduler.getAppsInQueue("a2"); + assertTrue(appsInA2.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "a2"); + + // check postconditions + appsInA2 = scheduler.getAppsInQueue("a2"); + assertEquals(1, appsInA2.size()); + queue = + scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a2")); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + + @Test + public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=2G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + // available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% + // total cap) + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + // 2GB 1C + Task task_1_1 = + new Task(application_1, priority_0, + new String[] { ResourceRequest.ANY }); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0, host_1 }); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + // prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + + } + + @Test + public void testMoveAppSuccess() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + scheduler.moveApplication(application_0.getApplicationId(), "b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + StringBuilder qState = new StringBuilder(); + qState.append(CapacitySchedulerConfiguration.PREFIX).append(B) + .append(CapacitySchedulerConfiguration.DOT) + .append(CapacitySchedulerConfiguration.STATE); + csConf.set(qState.toString(), QueueState.STOPPED.name()); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + } + + @Test + public void testMoveAppQueueMetricsCheck() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + CSQueue origRootQ = cs.getRootQueue(); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); + int origNumAppsRoot = origRootQ.getNumApplications(); + + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + + CSQueue newRootQ = cs.getRootQueue(); + int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); + int newNumAppsRoot = newRootQ.getNumApplications(); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerLeafQueueInfo origOldA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo origNewA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetOldA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetNewA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues()); + // originally submitted here + assertEquals(1, origOldA1.getNumApplications()); + assertEquals(1, origNumAppsA); + assertEquals(2, origNumAppsRoot); + // after the move + assertEquals(0, origNewA1.getNumApplications()); + assertEquals(1, newNumAppsA); + assertEquals(2, newNumAppsRoot); + // original consumption on a1 + assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory()); + assertEquals(1, origOldA1.getResourcesUsed().getvCores()); + assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move + assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move + // app moved here with live containers + assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory()); + assertEquals(1, targetNewA2.getResourcesUsed().getvCores()); + // it was empty before the move + assertEquals(0, targetOldA2.getNumApplications()); + assertEquals(0, targetOldA2.getResourcesUsed().getMemory()); + assertEquals(0, targetOldA2.getResourcesUsed().getvCores()); + // after the app moved here + assertEquals(1, targetNewA2.getNumApplications()); + // 1 container on original queue before move + assertEquals(1, origOldA1.getNumContainers()); + // after the move the resource released + assertEquals(0, origNewA1.getNumContainers()); + // and moved to the new queue + assertEquals(1, targetNewA2.getNumContainers()); + // which originally didn't have any + assertEquals(0, targetOldA2.getNumContainers()); + // 1 user with 3GB + assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + // user ha no more running app in the orig queue + assertEquals(0, origNewA1.getUsers().getUsersList().size()); + // 1 user with 3GB + assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + } + + private int getNumAppsInQueue(String name, List queues) { + for (CSQueue queue : queues) { + if (queue.getQueueName().equals(name)) { + return queue.getNumApplications(); + } + } + return -1; + } + + private CapacitySchedulerQueueInfo getQueueInfo(String name, + CapacitySchedulerQueueInfoList info) { + if (info != null) { + for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) { + if (queueInfo.getQueueName().equals(name)) { + return queueInfo; + } else { + CapacitySchedulerQueueInfo result = + getQueueInfo(name, queueInfo.getQueues()); + if (result == null) { + continue; + } + return result; + } + } + } + return null; + } + + @Test + public void testMoveAllApps() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + // check postconditions + Thread.sleep(1000); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("a1", "DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("DOES_NOT_EXIST", "b1"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + }