YARN-2389. Added functionality for schedulers to kill all applications in a queue. Contributed by Subramaniam Venkatraman Krishnan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618294 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-08-15 23:53:57 +00:00
parent ddf108449d
commit c3084d6c16
4 changed files with 127 additions and 3 deletions

View File

@ -140,6 +140,9 @@ Release 2.6.0 - UNRELEASED
YARN-1370. Fair scheduler to re-populate container allocation state. YARN-1370. Fair scheduler to re-populate container allocation state.
(Anubhav Dhoot via kasha) (Anubhav Dhoot via kasha)
YARN-2389. Added functionality for schedulers to kill all applications in a
queue. (Subramaniam Venkatraman Krishnan via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -348,4 +350,23 @@ public abstract class AbstractYarnScheduler
.handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
} }
} }
@Override
public synchronized void killAllAppsInQueue(String queueName)
throws YarnException {
// check if queue is a valid
List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
if (apps == null) {
String errMsg = "The specified Queue: " + queueName + " doesn't exist";
LOG.warn(errMsg);
throw new YarnException(errMsg);
}
// generate kill events for each pending/running app
for (ApplicationAttemptId app : apps) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
}
}
} }

View File

@ -212,4 +212,12 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @throws YarnException * @throws YarnException
*/ */
void moveAllApps(String sourceQueue, String destQueue) throws YarnException; void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
/**
* Terminate all applications in the specified queue.
*
* @param queueName the name of queue to be drained
* @throws YarnException
*/
void killAllAppsInQueue(String queueName) throws YarnException;
} }

View File

@ -1074,11 +1074,10 @@ public class TestCapacityScheduler {
queue = queue =
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
.getQueueName(); .getQueueName();
System.out.println(queue);
Assert.assertTrue(queue.equals("b1")); Assert.assertTrue(queue.equals("b1"));
appsInB = scheduler.getAppsInQueue("b"); appsInB = scheduler.getAppsInQueue("b");
assertTrue(appsInA.contains(appAttemptId)); assertTrue(appsInB.contains(appAttemptId));
assertEquals(1, appsInB.size()); assertEquals(1, appsInB.size());
appsInRoot = scheduler.getAppsInQueue("root"); appsInRoot = scheduler.getAppsInQueue("root");
@ -1140,6 +1139,7 @@ public class TestCapacityScheduler {
assertTrue(appsInA1.isEmpty()); assertTrue(appsInA1.isEmpty());
appsInA = scheduler.getAppsInQueue("a"); appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(appAttemptId));
assertEquals(1, appsInA.size()); assertEquals(1, appsInA.size());
appsInRoot = scheduler.getAppsInQueue("root"); appsInRoot = scheduler.getAppsInQueue("root");
@ -1664,7 +1664,7 @@ public class TestCapacityScheduler {
Assert.assertTrue(queue.equals("b1")); Assert.assertTrue(queue.equals("b1"));
appsInB = scheduler.getAppsInQueue("b"); appsInB = scheduler.getAppsInQueue("b");
assertTrue(appsInA.contains(appAttemptId)); assertTrue(appsInB.contains(appAttemptId));
assertEquals(1, appsInB.size()); assertEquals(1, appsInB.size());
appsInRoot = scheduler.getAppsInQueue("root"); appsInRoot = scheduler.getAppsInQueue("root");
@ -1798,4 +1798,96 @@ public class TestCapacityScheduler {
rm.stop(); rm.stop();
} }
@Test
public void testKillAllAppsInQueue() throws Exception {
MockRM rm = setUpMove();
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
// check preconditions
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(appAttemptId));
assertEquals(1, appsInA.size());
String queue =
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
.getQueueName();
Assert.assertTrue(queue.equals("a1"));
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(appAttemptId));
assertEquals(1, appsInRoot.size());
// now kill the app
scheduler.killAllAppsInQueue("a1");
// check postconditions
rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.isEmpty());
appsInA1 = scheduler.getAppsInQueue("a1");
assertTrue(appsInA1.isEmpty());
appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.isEmpty());
rm.stop();
}
@Test
public void testKillAllAppsInvalidSource() throws Exception {
MockRM rm = setUpMove();
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
// check preconditions
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(appAttemptId));
assertEquals(1, appsInA.size());
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(appAttemptId));
assertEquals(1, appsInRoot.size());
// now kill the app
try {
scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
Assert.fail();
} catch (YarnException e) {
// expected
}
// check postconditions, app should still be in a1
appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(appAttemptId));
assertEquals(1, appsInA.size());
appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(appAttemptId));
assertEquals(1, appsInRoot.size());
rm.stop();
}
} }