YARN-6207. Move application across queues should handle delayed event processing. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
afc2c438c1
commit
7e68257ffa
|
@ -1069,6 +1069,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
QueueMetrics newMetrics = newQueue.getMetrics();
|
||||
String newQueueName = newQueue.getQueueName();
|
||||
String user = getUser();
|
||||
|
||||
for (RMContainer liveContainer : liveContainers.values()) {
|
||||
Resource resource = liveContainer.getContainer().getResource();
|
||||
((RMContainerImpl) liveContainer).setQueueName(newQueueName);
|
||||
|
@ -1084,7 +1085,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
}
|
||||
}
|
||||
|
||||
appSchedulingInfo.move(newQueue);
|
||||
if (!isStopped) {
|
||||
appSchedulingInfo.move(newQueue);
|
||||
}
|
||||
this.queue = newQueue;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
|
|
@ -1939,36 +1939,47 @@ public class CapacityScheduler extends
|
|||
String targetQueueName) throws YarnException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
FiCaSchedulerApp app = getApplicationAttempt(
|
||||
ApplicationAttemptId.newInstance(appId, 0));
|
||||
String sourceQueueName = app.getQueue().getQueueName();
|
||||
LeafQueue source = this.queueManager.getAndCheckLeafQueue(
|
||||
sourceQueueName);
|
||||
SchedulerApplication<FiCaSchedulerApp> application =
|
||||
applications.get(appId);
|
||||
if (application == null) {
|
||||
throw new YarnException("App to be moved " + appId + " not found.");
|
||||
}
|
||||
String sourceQueueName = application.getQueue().getQueueName();
|
||||
LeafQueue source =
|
||||
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
|
||||
String destQueueName = handleMoveToPlanQueue(targetQueueName);
|
||||
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
|
||||
|
||||
String user = app.getUser();
|
||||
String user = application.getUser();
|
||||
try {
|
||||
dest.submitApplication(appId, user, destQueueName);
|
||||
} catch (AccessControlException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
// Move all live containers
|
||||
for (RMContainer rmContainer : app.getLiveContainers()) {
|
||||
source.detachContainer(getClusterResource(), app, rmContainer);
|
||||
// attach the Container to another queue
|
||||
dest.attachContainer(getClusterResource(), app, rmContainer);
|
||||
|
||||
FiCaSchedulerApp app = application.getCurrentAppAttempt();
|
||||
if (app != null) {
|
||||
// Move all live containers even when stopped.
|
||||
// For transferStateFromPreviousAttempt required
|
||||
for (RMContainer rmContainer : app.getLiveContainers()) {
|
||||
source.detachContainer(getClusterResource(), app, rmContainer);
|
||||
// attach the Container to another queue
|
||||
dest.attachContainer(getClusterResource(), app, rmContainer);
|
||||
}
|
||||
if (!app.isStopped()) {
|
||||
source.finishApplicationAttempt(app, sourceQueueName);
|
||||
// Submit to a new queue
|
||||
dest.submitApplicationAttempt(app, user);
|
||||
}
|
||||
// Finish app & update metrics
|
||||
app.move(dest);
|
||||
}
|
||||
source.appFinished();
|
||||
// Detach the application..
|
||||
source.finishApplicationAttempt(app, sourceQueueName);
|
||||
source.getParent().finishApplication(appId, app.getUser());
|
||||
// Finish app & update metrics
|
||||
app.move(dest);
|
||||
// Submit to a new queue
|
||||
dest.submitApplicationAttempt(app, user);
|
||||
applications.get(appId).setQueue(dest);
|
||||
LOG.info("App: " + app.getApplicationId() + " successfully moved from "
|
||||
+ sourceQueueName + " to: " + destQueueName);
|
||||
source.getParent().finishApplication(appId, user);
|
||||
application.setQueue(dest);
|
||||
LOG.info("App: " + appId + " successfully moved from " + sourceQueueName
|
||||
+ " to: " + destQueueName);
|
||||
return targetQueueName;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
@ -1980,15 +1991,23 @@ public class CapacityScheduler extends
|
|||
String newQueue) throws YarnException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
FiCaSchedulerApp app = getApplicationAttempt(
|
||||
ApplicationAttemptId.newInstance(appId, 0));
|
||||
String sourceQueueName = app.getQueue().getQueueName();
|
||||
SchedulerApplication<FiCaSchedulerApp> application =
|
||||
applications.get(appId);
|
||||
if (application == null) {
|
||||
throw new YarnException("App to be moved " + appId + " not found.");
|
||||
}
|
||||
String sourceQueueName = application.getQueue().getQueueName();
|
||||
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
|
||||
String destQueueName = handleMoveToPlanQueue(newQueue);
|
||||
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
|
||||
// Validation check - ACLs, submission limits for user & queue
|
||||
String user = app.getUser();
|
||||
checkQueuePartition(app, dest);
|
||||
String user = application.getUser();
|
||||
// Check active partition only when attempt is available
|
||||
FiCaSchedulerApp appAttempt =
|
||||
getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
|
||||
if (null != appAttempt) {
|
||||
checkQueuePartition(appAttempt, dest);
|
||||
}
|
||||
try {
|
||||
dest.validateSubmitApplication(appId, user, destQueueName);
|
||||
} catch (AccessControlException e) {
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
|
@ -110,12 +111,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
|
@ -2209,6 +2212,203 @@ public class TestCapacityScheduler {
|
|||
rm.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMoveAttemptNotAdded() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
MockRM rm = new MockRM(getCapacityConfiguration(conf));
|
||||
rm.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
|
||||
RMAppAttemptMetrics attemptMetric =
|
||||
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
|
||||
RMAppImpl app = mock(RMAppImpl.class);
|
||||
when(app.getApplicationId()).thenReturn(appId);
|
||||
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
|
||||
Container container = mock(Container.class);
|
||||
when(attempt.getMasterContainer()).thenReturn(container);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
|
||||
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||
|
||||
rm.getRMContext().getRMApps().put(appId, app);
|
||||
|
||||
SchedulerEvent addAppEvent =
|
||||
new AppAddedSchedulerEvent(appId, "a1", "user");
|
||||
try {
|
||||
cs.moveApplication(appId, "b1");
|
||||
fail("Move should throw exception app not available");
|
||||
} catch (YarnException e) {
|
||||
assertEquals("App to be moved application_100_0001 not found.",
|
||||
e.getMessage());
|
||||
}
|
||||
cs.handle(addAppEvent);
|
||||
cs.moveApplication(appId, "b1");
|
||||
SchedulerEvent addAttemptEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
|
||||
cs.handle(addAttemptEvent);
|
||||
CSQueue rootQ = cs.getRootQueue();
|
||||
CSQueue queueB = cs.getQueue("b");
|
||||
CSQueue queueA = cs.getQueue("a");
|
||||
CSQueue queueA1 = cs.getQueue("a1");
|
||||
CSQueue queueB1 = cs.getQueue("b1");
|
||||
Assert.assertEquals(1, rootQ.getNumApplications());
|
||||
Assert.assertEquals(0, queueA.getNumApplications());
|
||||
Assert.assertEquals(1, queueB.getNumApplications());
|
||||
Assert.assertEquals(0, queueA1.getNumApplications());
|
||||
Assert.assertEquals(1, queueB1.getNumApplications());
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAttemptMoveAdded() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
CapacityScheduler.class);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
// Create Mock RM
|
||||
MockRM rm = new MockRM(getCapacityConfiguration(conf));
|
||||
CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
|
||||
// add node
|
||||
Resource newResource = Resource.newInstance(4 * GB, 1);
|
||||
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
|
||||
SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
|
||||
sch.handle(addNode);
|
||||
// create appid
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
|
||||
RMAppAttemptMetrics attemptMetric =
|
||||
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
|
||||
RMAppImpl app = mock(RMAppImpl.class);
|
||||
when(app.getApplicationId()).thenReturn(appId);
|
||||
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
|
||||
Container container = mock(Container.class);
|
||||
when(attempt.getMasterContainer()).thenReturn(container);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
|
||||
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||
|
||||
rm.getRMContext().getRMApps().put(appId, app);
|
||||
// Add application
|
||||
SchedulerEvent addAppEvent =
|
||||
new AppAddedSchedulerEvent(appId, "a1", "user");
|
||||
sch.handle(addAppEvent);
|
||||
// Add application attempt
|
||||
SchedulerEvent addAttemptEvent =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
|
||||
sch.handle(addAttemptEvent);
|
||||
// get Queues
|
||||
CSQueue queueA1 = sch.getQueue("a1");
|
||||
CSQueue queueB = sch.getQueue("b");
|
||||
CSQueue queueB1 = sch.getQueue("b1");
|
||||
|
||||
// add Running rm container and simulate live containers to a1
|
||||
ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
|
||||
RMContainerImpl rmContainer = mock(RMContainerImpl.class);
|
||||
when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
|
||||
Container container2 = mock(Container.class);
|
||||
when(rmContainer.getContainer()).thenReturn(container2);
|
||||
Resource resource = Resource.newInstance(1024, 1);
|
||||
when(container2.getResource()).thenReturn(resource);
|
||||
when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
|
||||
when(container2.getNodeId()).thenReturn(node.getNodeID());
|
||||
when(container2.getId()).thenReturn(newContainerId);
|
||||
when(rmContainer.getNodeLabelExpression())
|
||||
.thenReturn(RMNodeLabelsManager.NO_LABEL);
|
||||
when(rmContainer.getContainerId()).thenReturn(newContainerId);
|
||||
sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
|
||||
.put(newContainerId, rmContainer);
|
||||
QueueMetrics queueA1M = queueA1.getMetrics();
|
||||
queueA1M.incrPendingResources("user1", 1, resource);
|
||||
queueA1M.allocateResources("user1", resource);
|
||||
// remove attempt
|
||||
sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
|
||||
RMAppAttemptState.KILLED, true));
|
||||
// Move application to queue b1
|
||||
sch.moveApplication(appId, "b1");
|
||||
// Check queue metrics after move
|
||||
Assert.assertEquals(0, queueA1.getNumApplications());
|
||||
Assert.assertEquals(1, queueB.getNumApplications());
|
||||
Assert.assertEquals(0, queueB1.getNumApplications());
|
||||
|
||||
// Release attempt add event
|
||||
ApplicationAttemptId appAttemptId2 =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 2);
|
||||
SchedulerEvent addAttemptEvent2 =
|
||||
new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
|
||||
sch.handle(addAttemptEvent2);
|
||||
|
||||
// Check metrics after attempt added
|
||||
Assert.assertEquals(0, queueA1.getNumApplications());
|
||||
Assert.assertEquals(1, queueB.getNumApplications());
|
||||
Assert.assertEquals(1, queueB1.getNumApplications());
|
||||
|
||||
|
||||
QueueMetrics queueB1M = queueB1.getMetrics();
|
||||
QueueMetrics queueBM = queueB.getMetrics();
|
||||
// Verify allocation MB of current state
|
||||
Assert.assertEquals(0, queueA1M.getAllocatedMB());
|
||||
Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
|
||||
Assert.assertEquals(1024, queueB1M.getAllocatedMB());
|
||||
Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
|
||||
|
||||
// remove attempt
|
||||
sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
|
||||
RMAppAttemptState.FINISHED, false));
|
||||
|
||||
Assert.assertEquals(0, queueA1M.getAllocatedMB());
|
||||
Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
|
||||
Assert.assertEquals(0, queueB1M.getAllocatedMB());
|
||||
Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
|
||||
|
||||
verifyQueueMetrics(queueB1M);
|
||||
verifyQueueMetrics(queueBM);
|
||||
// Verify queue A1 metrics
|
||||
verifyQueueMetrics(queueA1M);
|
||||
rm.close();
|
||||
}
|
||||
|
||||
private void verifyQueueMetrics(QueueMetrics queue) {
|
||||
Assert.assertEquals(0, queue.getPendingMB());
|
||||
Assert.assertEquals(0, queue.getActiveUsers());
|
||||
Assert.assertEquals(0, queue.getActiveApps());
|
||||
Assert.assertEquals(0, queue.getAppsPending());
|
||||
Assert.assertEquals(0, queue.getAppsRunning());
|
||||
Assert.assertEquals(0, queue.getAllocatedMB());
|
||||
Assert.assertEquals(0, queue.getAllocatedVirtualCores());
|
||||
}
|
||||
|
||||
private Configuration getCapacityConfiguration(Configuration config) {
|
||||
CapacitySchedulerConfiguration conf =
|
||||
new CapacitySchedulerConfiguration(config);
|
||||
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b"});
|
||||
conf.setCapacity(A, 50);
|
||||
conf.setCapacity(B, 50);
|
||||
conf.setQueues(A, new String[] {"a1", "a2"});
|
||||
conf.setCapacity(A1, 50);
|
||||
conf.setCapacity(A2, 50);
|
||||
conf.setQueues(B, new String[] {"b1"});
|
||||
conf.setCapacity(B1, 100);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillAllAppsInQueue() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
|
|
Loading…
Reference in New Issue