YARN-5932. Retrospect moveApplicationToQueue in align with YARN-5611. Contributed by Sunil G.

This commit is contained in:
Rohith Sharma K S 2016-12-07 10:39:14 +05:30
parent a7288da595
commit 563480dccd
17 changed files with 274 additions and 133 deletions

View File

@ -152,7 +152,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -174,8 +173,6 @@
import org.apache.hadoop.yarn.util.UTCClock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -1191,23 +1188,18 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
}
// Moves only allowed when app is in a state that means it is tracked by
// the scheduler
if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
.contains(application.getState())) {
// the scheduler. Introducing SUBMITTED state also to this list as there
// could be a corner scenario that app may not be in Scheduler in SUBMITTED
// state.
if (!ACTIVE_APP_STATES.contains(application.getState())) {
String msg = "App in " + application.getState() + " state cannot be moved.";
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
throw new YarnException(msg);
}
SettableFuture<Object> future = SettableFuture.create();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
try {
Futures.get(future, YarnException.class);
this.rmAppManager.moveApplicationAcrossQueue(applicationId, request.getTargetQueue());
} catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",

View File

@ -498,17 +498,26 @@ public void handle(RMAppManagerEvent event) {
ApplicationId applicationId = event.getApplicationId();
LOG.debug("RMAppManager processing event for "
+ applicationId + " of type " + event.getType());
switch(event.getType()) {
case APP_COMPLETED:
{
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
}
switch (event.getType()) {
case APP_COMPLETED :
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
case APP_MOVE :
// moveAllApps from scheduler will fire this event for each of
// those applications which needed to be moved to a new queue.
// Use the standard move application api to do the same.
try {
moveApplicationAcrossQueue(applicationId,
event.getTargetQueueForMove());
} catch (YarnException e) {
LOG.warn("Move Application has failed: " + e.getMessage());
}
break;
default :
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
// transaction method.
@ -587,4 +596,87 @@ public void updateApplicationPriority(ApplicationId applicationId,
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
/**
* moveToQueue will invoke scheduler api to perform move queue operation.
*
* @param applicationId
* Application Id.
* @param targetQueue
* Target queue to which this app has to be moved.
* @throws YarnException
* Handle exceptions.
*/
public void moveApplicationAcrossQueue(ApplicationId applicationId, String targetQueue)
throws YarnException {
RMApp app = this.rmContext.getRMApps().get(applicationId);
// Capacity scheduler will directly follow below approach.
// 1. Do a pre-validate check to ensure that changes are fine.
// 2. Update this information to state-store
// 3. Perform real move operation and update in-memory data structures.
synchronized (applicationId) {
if (app.isAppInCompletedStates()) {
return;
}
String sourceQueue = app.getQueue();
// 1. pre-validate move application request to check for any access
// violations or other errors. If there are any violations, YarnException
// will be thrown.
rmContext.getScheduler().preValidateMoveApplication(applicationId,
targetQueue);
// 2. Update to state store with new queue and throw exception is failed.
updateAppDataToStateStore(targetQueue, app, false);
// 3. Perform the real move application
String queue = "";
try {
queue = rmContext.getScheduler().moveApplication(applicationId,
targetQueue);
} catch (YarnException e) {
// Revert to source queue since in-memory move has failed. Chances
// of this is very rare as we have already done the pre-validation.
updateAppDataToStateStore(sourceQueue, app, true);
throw e;
}
// update in-memory
if (queue != null && !queue.isEmpty()) {
app.setQueue(queue);
}
}
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
private void updateAppDataToStateStore(String queue, RMApp app,
boolean toSuppressException) throws YarnException {
// Create a future object to capture exceptions from StateStore.
SettableFuture<Object> future = SettableFuture.create();
// Update new queue in Submission Context to update to StateStore.
app.getApplicationSubmissionContext().setQueue(queue);
ApplicationStateData appState = ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser(),
app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts());
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false, future);
try {
Futures.get(future, YarnException.class);
} catch (YarnException ex) {
if (!toSuppressException) {
throw ex;
}
LOG.error("Statestore update failed for move application '"
+ app.getApplicationId() + "' to queue '" + queue
+ "' with below exception:" + ex.getMessage());
}
}
}

View File

@ -24,13 +24,24 @@
public class RMAppManagerEvent extends AbstractEvent<RMAppManagerEventType> {
private final ApplicationId appId;
private final String targetQueueForMove;
public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) {
this(appId, "", type);
}
public RMAppManagerEvent(ApplicationId appId, String targetQueueForMove,
RMAppManagerEventType type) {
super(type);
this.appId = appId;
this.targetQueueForMove = targetQueueForMove;
}
public ApplicationId getApplicationId() {
return this.appId;
}
public String getTargetQueueForMove() {
return this.targetQueueForMove;
}
}

View File

@ -19,5 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
APP_COMPLETED
APP_COMPLETED,
APP_MOVE
}

View File

@ -23,7 +23,6 @@ public enum RMAppEventType {
START,
RECOVER,
KILL,
MOVE, // Move app to a new queue
// Source: Scheduler and RMAppManager
APP_REJECTED,

View File

@ -71,7 +71,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@ -247,14 +246,10 @@ RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.MOVE, new RMAppMoveTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
@ -271,8 +266,6 @@ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
@ -300,8 +293,6 @@ RMAppEventType.KILL, new KillAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
@ -338,7 +329,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
RMAppEventType.APP_NEW_SAVED))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@ -353,7 +344,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
EnumSet.of(RMAppEventType.NODE_UPDATE,
// ignore Kill/Move as we have already saved the final Finished state
// in state store.
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from KILLING state
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
@ -383,7 +374,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_REGISTERED,
RMAppEventType.APP_UPDATE_SAVED,
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from FINISHED state
// ignorable transitions
@ -395,7 +386,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from FAILED state
// ignorable transitions
@ -403,8 +394,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
RMAppEventType.MOVE))
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
@ -417,7 +407,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
RMAppEventType.NODE_UPDATE))
.installTopology();
@ -1077,32 +1067,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
/**
* Move an app to a new queue.
* This transition must set the result on the Future in the RMAppMoveEvent,
* either as an exception for failure or null for success, or the client will
* be left waiting forever.
*/
private static final class RMAppMoveTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
try {
app.queue = app.scheduler.moveApplication(app.applicationId,
moveEvent.getTargetQueue());
} catch (YarnException ex) {
moveEvent.getResult().setException(ex);
return;
}
app.rmContext.getSystemMetricsPublisher().appUpdated(app,
app.systemClock.getTime());
// TODO: Write out change to state store (YARN-1558)
// Also take care of RM failover
moveEvent.getResult().set(null);
}
}
// synchronously recover attempt to ensure any incoming external events
// to be processed after the attempt processes the recover event.
private void recoverAppAttempts() {

View File

@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.util.concurrent.SettableFuture;
public class RMAppMoveEvent extends RMAppEvent {
private String targetQueue;
private SettableFuture<Object> result;
public RMAppMoveEvent(ApplicationId id, String newQueue,
SettableFuture<Object> resultFuture) {
super(id, RMAppEventType.MOVE);
this.targetQueue = newQueue;
this.result = resultFuture;
}
public String getTargetQueue() {
return targetQueue;
}
public SettableFuture<Object> getResult() {
return result;
}
}

View File

@ -57,6 +57,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -64,7 +66,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -360,6 +361,13 @@ public String moveApplication(ApplicationId appId, String newQueue)
+ " does not support moving apps between queues");
}
@Override
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support pre-validation of moving apps between queues");
}
public void removeQueue(String queueName) throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support removing queues");
@ -675,10 +683,10 @@ public void moveAllApps(String sourceQueue, String destQueue)
throw new YarnException(errMsg);
}
// generate move events for each pending/running app
for (ApplicationAttemptId app : apps) {
SettableFuture<Object> future = SettableFuture.create();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
for (ApplicationAttemptId appAttemptId : apps) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppManagerEvent(appAttemptId.getApplicationId(),
destQueue, RMAppManagerEventType.APP_MOVE));
}
} finally {
writeLock.unlock();

View File

@ -229,6 +229,17 @@ boolean checkAccess(UserGroupInformation callerUGI,
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;
/**
*
* @param appId Application ID
* @param newQueue Target QueueName
* @throws YarnException if the pre-validation for move cannot be carried out
*/
@LimitedPrivate("yarn")
@Evolving
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException;
/**
* Completely drain sourceQueue of applications, by moving all of them to
* destQueue.

View File

@ -32,8 +32,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -67,6 +69,7 @@
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
volatile CSQueue parent;
final String queueName;
@ -837,4 +840,10 @@ public boolean accept(Resource cluster,
return true;
}
@Override
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
// Dummy implementation
}
}

View File

@ -362,4 +362,14 @@ void apply(Resource cluster,
* @return readLock of corresponding queue.
*/
public ReentrantReadWriteLock.ReadLock getReadLock();
/**
* Validate submitApplication api so that moveApplication do a pre-check.
* @param applicationId Application ID
* @param userName User Name
* @param queue Queue Name
* @throws AccessControlException if any acl violation is there.
*/
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException;
}

View File

@ -2049,9 +2049,8 @@ public String moveApplication(ApplicationId appId,
sourceQueueName);
String destQueueName = handleMoveToPlanQueue(targetQueueName);
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = app.getUser();
checkQueuePartition(app, dest);
try {
dest.submitApplication(appId, user, destQueueName);
} catch (AccessControlException e) {
@ -2079,6 +2078,30 @@ public String moveApplication(ApplicationId appId,
}
}
@Override
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException {
try {
writeLock.lock();
FiCaSchedulerApp app = getApplicationAttempt(
ApplicationAttemptId.newInstance(appId, 0));
String sourceQueueName = app.getQueue().getQueueName();
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
String destQueueName = handleMoveToPlanQueue(newQueue);
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = app.getUser();
checkQueuePartition(app, dest);
try {
dest.validateSubmitApplication(appId, user, destQueueName);
} catch (AccessControlException e) {
throw new YarnException(e);
}
} finally {
writeLock.unlock();
}
}
/**
* Check application can be moved to queue with labels enabled. All labels in
* application life time will be checked

View File

@ -564,6 +564,21 @@ public void submitApplicationAttempt(FiCaSchedulerApp application,
public void submitApplication(ApplicationId applicationId, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
validateSubmitApplication(applicationId, userName, queue);
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
try {
writeLock.lock();
// Check if the queue is accepting jobs
@ -598,15 +613,13 @@ public void submitApplication(ApplicationId applicationId, String userName,
writeLock.unlock();
}
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
getParent().validateSubmitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public Resource getAMResourceLimit() {

View File

@ -340,16 +340,7 @@ public void submitApplication(ApplicationId applicationId, String user,
try {
writeLock.lock();
// Sanity check
if (queue.equals(queueName)) {
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
}
validateSubmitApplication(applicationId, user, queue);
addApplication(applicationId, user);
} finally {
@ -369,6 +360,24 @@ public void submitApplication(ApplicationId applicationId, String user,
}
}
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
try {
writeLock.lock();
if (queue.equals(queueName)) {
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
}
} finally {
writeLock.unlock();
}
}
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,

View File

@ -1544,7 +1544,41 @@ public String moveApplication(ApplicationId appId,
writeLock.unlock();
}
}
@Override
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
throws YarnException {
try {
writeLock.lock();
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
FSAppAttempt attempt = app.getCurrentAppAttempt();
// To serialize with FairScheduler#allocate, synchronize on app attempt
try {
attempt.getWriteLock().lock();
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
String destQueueName = handleMoveToPlanQueue(newQueue);
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
if (targetQueue == null) {
throw new YarnException("Target queue " + newQueue
+ " not found or is not a leaf queue.");
}
if (oldQueue.isRunnableApp(attempt)) {
verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
}
} finally {
attempt.getWriteLock().unlock();
}
} finally {
writeLock.unlock();
}
}
private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
String queueName = targetQueue.getQueueName();

View File

@ -87,10 +87,10 @@ public void testMoveRejectedByScheduler() throws Exception {
application.getApplicationId(), "newqueue"));
fail("Should have hit exception");
} catch (YarnException ex) {
assertEquals("Move not supported", ex.getCause().getMessage());
assertEquals("Move not supported", ex.getMessage());
}
}
@Test (timeout = 10000)
public void testMoveTooLate() throws Exception {
// Submit application
@ -178,5 +178,13 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
return acl != QueueACL.ADMINISTER_QUEUE;
}
@Override
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
throws YarnException {
if (failMove) {
throw new YarnException("Move not supported");
}
}
}
}

View File

@ -457,6 +457,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
CapacityScheduler scheduler =
((CapacityScheduler) rm.getResourceScheduler());
try {
scheduler.preValidateMoveApplication(app1.getApplicationId(), "a2");
scheduler.moveApplication(app1.getApplicationId(), "a2");
fail("Should throw exception since target queue doesnt have "
+ "required labels");