YARN-5932. Retrospect moveApplicationToQueue in align with YARN-5611. Contributed by Sunil G.

This commit is contained in:
Rohith Sharma K S 2016-12-07 22:43:48 +05:30
parent d2656dc5a6
commit 602c998443
17 changed files with 276 additions and 134 deletions

View File

@ -152,7 +152,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -174,8 +173,6 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
/**
* The client interface to the Resource Manager. This module handles all the rpc
@ -1164,25 +1161,21 @@ public class ClientRMService extends AbstractService implements
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
// Moves only allowed when app is in a state that means it is tracked by
// the scheduler
if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
.contains(application.getState())) {
// the scheduler. Introducing SUBMITTED state also to this list as there
// could be a corner scenario that app may not be in Scheduler in SUBMITTED
// state.
if (!ACTIVE_APP_STATES.contains(application.getState())) {
String msg = "App in " + application.getState() + " state cannot be moved.";
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
throw new YarnException(msg);
}
SettableFuture<Object> future = SettableFuture.create();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
try {
Futures.get(future, YarnException.class);
this.rmAppManager.moveApplicationAcrossQueue(applicationId,
request.getTargetQueue());
} catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",

View File

@ -490,17 +490,26 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ApplicationId applicationId = event.getApplicationId();
LOG.debug("RMAppManager processing event for "
+ applicationId + " of type " + event.getType());
switch(event.getType()) {
case APP_COMPLETED:
{
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
}
switch (event.getType()) {
case APP_COMPLETED :
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
case APP_MOVE :
// moveAllApps from scheduler will fire this event for each of
// those applications which needed to be moved to a new queue.
// Use the standard move application api to do the same.
try {
moveApplicationAcrossQueue(applicationId,
event.getTargetQueueForMove());
} catch (YarnException e) {
LOG.warn("Move Application has failed: " + e.getMessage());
}
break;
default :
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
// transaction method.
@ -579,4 +588,87 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
/**
 * Moves an application to another queue. The move is pre-validated with the
 * scheduler, the new queue is written to the state store, and only then is
 * the move carried out in the scheduler and reflected on the in-memory app.
 * Applications that are already in a completed state are left untouched and
 * the call returns silently.
 *
 * @param applicationId
 *          Application Id.
 * @param targetQueue
 *          Target queue to which this app has to be moved.
 * @throws YarnException
 *           if the application is not known to the RM, if pre-validation
 *           fails, if the state store update fails, or if the scheduler
 *           rejects the move.
 */
public void moveApplicationAcrossQueue(ApplicationId applicationId, String targetQueue)
    throws YarnException {
  RMApp app = this.rmContext.getRMApps().get(applicationId);
  if (app == null) {
    // Fail with a checked, descriptive error instead of a
    // NullPointerException when the id does not map to a tracked app.
    throw new YarnException("Application " + applicationId
        + " not found; cannot move it to queue '" + targetQueue + "'.");
  }

  // Capacity scheduler will directly follow below approach.
  // 1. Do a pre-validate check to ensure that changes are fine.
  // 2. Update this information to state-store
  // 3. Perform real move operation and update in-memory data structures.
  // NOTE(review): the monitor is the ApplicationId instance handed in by the
  // caller; confirm that concurrent callers share the same instance.
  synchronized (applicationId) {
    if (app.isAppInCompletedStates()) {
      return;
    }

    String sourceQueue = app.getQueue();

    // 1. pre-validate move application request to check for any access
    // violations or other errors. If there are any violations, YarnException
    // will be thrown.
    rmContext.getScheduler().preValidateMoveApplication(applicationId,
        targetQueue);

    // 2. Update to state store with new queue and throw exception if failed.
    updateAppDataToStateStore(targetQueue, app, false);

    // 3. Perform the real move application
    String queue;
    try {
      queue = rmContext.getScheduler().moveApplication(applicationId,
          targetQueue);
    } catch (YarnException e) {
      // Revert to source queue since in-memory move has failed. Chances
      // of this is very rare as we have already done the pre-validation.
      updateAppDataToStateStore(sourceQueue, app, true);
      throw e;
    }

    // update in-memory
    if (queue != null && !queue.isEmpty()) {
      app.setQueue(queue);
    }
  }

  rmContext.getSystemMetricsPublisher().appUpdated(app,
      System.currentTimeMillis());
}
/**
 * Sets the given queue on the app's submission context and writes the
 * resulting application state to the state store, waiting for the outcome.
 *
 * @param queue
 *          queue name to record for the application.
 * @param app
 *          application whose state is persisted.
 * @param toSuppressException
 *          when true, a state store failure is only logged; when false it is
 *          rethrown to the caller.
 * @throws YarnException
 *           if the state store update fails and toSuppressException is
 *           false.
 */
private void updateAppDataToStateStore(String queue, RMApp app,
    boolean toSuppressException) throws YarnException {
  // Create a future object to capture exceptions from StateStore.
  SettableFuture<Object> future = SettableFuture.create();

  // Remember the current value so it can be restored if persisting fails.
  String previousQueue = app.getApplicationSubmissionContext().getQueue();

  // Update new queue in Submission Context to update to StateStore.
  app.getApplicationSubmissionContext().setQueue(queue);

  ApplicationStateData appState = ApplicationStateData.newInstance(
      app.getSubmitTime(), app.getStartTime(),
      app.getApplicationSubmissionContext(), app.getUser(),
      app.getCallerContext());
  appState.setApplicationTimeouts(app.getApplicationTimeouts());
  rmContext.getStateStore().updateApplicationStateSynchronously(appState,
      false, future);

  try {
    Futures.get(future, YarnException.class);
  } catch (YarnException ex) {
    if (!toSuppressException) {
      // The caller aborts on this failure, so do not leave the submission
      // context pointing at a queue that was never persisted.
      app.getApplicationSubmissionContext().setQueue(previousQueue);
      throw ex;
    }
    // Pass the exception itself so the stack trace is not lost.
    LOG.error("Statestore update failed for move application '"
        + app.getApplicationId() + "' to queue '" + queue
        + "' with below exception:" + ex.getMessage(), ex);
  }
}
}

View File

@ -24,13 +24,24 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
/**
 * Event delivered to the RMAppManager. It always carries an application id
 * and, for move requests, the name of the queue the application should be
 * moved to; events built without a target queue report an empty string.
 */
public class RMAppManagerEvent extends AbstractEvent<RMAppManagerEventType> {

  private final ApplicationId appId;
  private final String targetQueueForMove;

  /**
   * Creates an event that has no move target (empty target queue).
   */
  public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) {
    this(appId, "", type);
  }

  /**
   * Creates an event carrying the queue the application is to be moved to.
   */
  public RMAppManagerEvent(ApplicationId appId, String targetQueueForMove,
      RMAppManagerEventType type) {
    super(type);
    this.targetQueueForMove = targetQueueForMove;
    this.appId = appId;
  }

  /** @return id of the application this event refers to. */
  public ApplicationId getApplicationId() {
    return appId;
  }

  /** @return target queue for a move, or the empty string if none was given. */
  public String getTargetQueueForMove() {
    return targetQueueForMove;
  }
}

View File

@ -19,5 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
APP_COMPLETED
APP_COMPLETED,
APP_MOVE
}

View File

@ -23,7 +23,6 @@ public enum RMAppEventType {
START,
RECOVER,
KILL,
MOVE, // Move app to a new queue
// Source: Scheduler and RMAppManager
APP_REJECTED,

View File

@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@ -241,14 +240,10 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.MOVE, new RMAppMoveTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@ -263,8 +258,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
YarnApplicationState.RUNNING))
@ -290,8 +283,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@ -324,7 +315,7 @@ public class RMAppImpl implements RMApp, Recoverable {
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
RMAppEventType.APP_NEW_SAVED))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@ -337,7 +328,7 @@ public class RMAppImpl implements RMApp, Recoverable {
EnumSet.of(RMAppEventType.NODE_UPDATE,
// ignore Kill/Move as we have already saved the final Finished state
// in state store.
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from KILLING state
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
@ -365,7 +356,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_REGISTERED,
RMAppEventType.APP_UPDATE_SAVED,
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from FINISHED state
// ignorable transitions
@ -377,7 +368,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.KILL, RMAppEventType.MOVE))
RMAppEventType.KILL))
// Transitions from FAILED state
// ignorable transitions
@ -385,8 +376,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
RMAppEventType.MOVE))
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
@ -399,7 +389,7 @@ public class RMAppImpl implements RMApp, Recoverable {
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
RMAppEventType.NODE_UPDATE))
.installTopology();
@ -992,32 +982,6 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
/**
 * Transition that relocates an app to a different queue.
 * The Future carried by the RMAppMoveEvent has to be completed here, with
 * the exception when the move fails or with null when it succeeds; a client
 * waiting on it would otherwise never be released.
 */
private static final class RMAppMoveTransition extends RMAppTransition {
  public void transition(RMAppImpl app, RMAppEvent event) {
    RMAppMoveEvent request = (RMAppMoveEvent) event;
    String destination = request.getTargetQueue();
    try {
      app.queue = app.scheduler.moveApplication(app.applicationId,
          destination);
    } catch (YarnException moveFailure) {
      // Hand the failure to the waiting caller and stop.
      request.getResult().setException(moveFailure);
      return;
    }

    long now = app.systemClock.getTime();
    app.rmContext.getSystemMetricsPublisher().appUpdated(app, now);

    // TODO: Write out change to state store (YARN-1558)
    // Also take care of RM failover
    request.getResult().set(null);
  }
}
// synchronously recover attempt to ensure any incoming external events
// to be processed after the attempt processes the recover event.
private void recoverAppAttempts() {

View File

@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.util.concurrent.SettableFuture;
/**
 * MOVE event for an application. It carries the name of the queue to move
 * to and the future on which the outcome of the move is reported.
 */
public class RMAppMoveEvent extends RMAppEvent {
  // Both fields are assigned once in the constructor and never changed, so
  // they are declared final.
  private final String targetQueue;
  private final SettableFuture<Object> result;

  /**
   * @param id application to move.
   * @param newQueue name of the queue the application should be moved to.
   * @param resultFuture future that receives the result of the move.
   */
  public RMAppMoveEvent(ApplicationId id, String newQueue,
      SettableFuture<Object> resultFuture) {
    super(id, RMAppEventType.MOVE);
    this.targetQueue = newQueue;
    this.result = resultFuture;
  }

  /** @return name of the queue the application should be moved to. */
  public String getTargetQueue() {
    return targetQueue;
  }

  /** @return future on which the result of the move is reported. */
  public SettableFuture<Object> getResult() {
    return result;
  }
}

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -63,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -359,6 +360,13 @@ public abstract class AbstractYarnScheduler
+ " does not support moving apps between queues");
}
/**
 * Default behaviour: this scheduler offers no pre-validation for moves, so
 * the request is always rejected with a YarnException naming the concrete
 * scheduler class.
 */
@Override
public void preValidateMoveApplication(ApplicationId appId,
    String newQueue) throws YarnException {
  String schedulerName = getClass().getSimpleName();
  throw new YarnException(schedulerName
      + " does not support pre-validation of moving apps between queues");
}
public void removeQueue(String queueName) throws YarnException {
throw new YarnException(getClass().getSimpleName()
+ " does not support removing queues");
@ -672,10 +680,10 @@ public abstract class AbstractYarnScheduler
throw new YarnException(errMsg);
}
// generate move events for each pending/running app
for (ApplicationAttemptId app : apps) {
SettableFuture<Object> future = SettableFuture.create();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
for (ApplicationAttemptId appAttemptId : apps) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppManagerEvent(appAttemptId.getApplicationId(),
destQueue, RMAppManagerEventType.APP_MOVE));
}
} finally {
writeLock.unlock();

View File

@ -229,6 +229,17 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;
/**
 * Checks up front whether the given application could be moved to the given
 * queue, so that a move request can be rejected before it is carried out.
 *
 * @param appId Application ID
 * @param newQueue Target QueueName
 * @throws YarnException if the pre-validation for move cannot be carried out
 */
@LimitedPrivate("yarn")
@Evolving
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException;
/**
* Completely drain sourceQueue of applications, by moving all of them to
* destQueue.

View File

@ -32,8 +32,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
volatile CSQueue parent;
final String queueName;
@ -837,4 +840,10 @@ public abstract class AbstractCSQueue implements CSQueue {
return true;
}
/**
 * Base implementation that performs no validation and always succeeds.
 */
@Override
public void validateSubmitApplication(ApplicationId applicationId,
    String userName, String queue) throws AccessControlException {
  // Intentionally empty: nothing is checked at this level.
}
}

View File

@ -362,4 +362,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @return readLock of corresponding queue.
*/
public ReentrantReadWriteLock.ReadLock getReadLock();
/**
 * Validates an application submission to this queue without performing it,
 * so that moveApplication can run the same checks as a pre-check.
 * @param applicationId Application ID
 * @param userName User Name
 * @param queue Queue Name
 * @throws AccessControlException if there is any ACL violation.
 */
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException;
}

View File

@ -2050,9 +2050,8 @@ public class CapacityScheduler extends
sourceQueueName);
String destQueueName = handleMoveToPlanQueue(targetQueueName);
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = app.getUser();
checkQueuePartition(app, dest);
try {
dest.submitApplication(appId, user, destQueueName);
} catch (AccessControlException e) {
@ -2080,6 +2079,30 @@ public class CapacityScheduler extends
}
}
/**
 * Checks whether the application could be moved to the given queue without
 * actually moving it: both source and destination must resolve to leaf
 * queues, the destination must pass the queue-partition check, and the
 * destination queue must accept the submission.
 *
 * @param appId application to be moved.
 * @param newQueue name of the requested destination queue.
 * @throws YarnException if the application is not found or any of the
 *           checks fails.
 */
@Override
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
    throws YarnException {
  // Acquire the lock before entering try so that the finally block never
  // unlocks a lock this thread does not hold.
  writeLock.lock();
  try {
    FiCaSchedulerApp app = getApplicationAttempt(
        ApplicationAttemptId.newInstance(appId, 0));
    if (app == null) {
      // Report an unknown app as a YarnException rather than failing with
      // a NullPointerException on the dereference below.
      throw new YarnException("App to be moved " + appId + " not found.");
    }
    String sourceQueueName = app.getQueue().getQueueName();
    this.queueManager.getAndCheckLeafQueue(sourceQueueName);
    String destQueueName = handleMoveToPlanQueue(newQueue);
    LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
    // Validation check - ACLs, submission limits for user & queue
    String user = app.getUser();
    checkQueuePartition(app, dest);
    try {
      dest.validateSubmitApplication(appId, user, destQueueName);
    } catch (AccessControlException e) {
      throw new YarnException(e);
    }
  } finally {
    writeLock.unlock();
  }
}
/*
* Check application can be moved to queue with labels enabled. All labels in
* application life time will be checked

View File

@ -573,6 +573,21 @@ public class LeafQueue extends AbstractCSQueue {
/**
 * Submits an application to this queue: the submission is validated first
 * and then forwarded to the parent queue.
 *
 * @param applicationId id of the application being submitted.
 * @param userName user submitting the application.
 * @param queue name of the queue the application is submitted to.
 * @throws AccessControlException if validation fails or the parent queue
 *           rejects the submission.
 */
public void submitApplication(ApplicationId applicationId, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
// Validation runs before the parent queue is called.
validateSubmitApplication(applicationId, userName, queue);
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
// Log the rejection together with the parent's queue path, then let the
// caller see the original exception.
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
try {
writeLock.lock();
// Check if the queue is accepting jobs
@ -607,15 +622,13 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.unlock();
}
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
getParent().validateSubmitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
throw ace;
}
}
public Resource getAMResourceLimit() {

View File

@ -340,16 +340,7 @@ public class ParentQueue extends AbstractCSQueue {
try {
writeLock.lock();
// Sanity check
if (queue.equals(queueName)) {
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
}
validateSubmitApplication(applicationId, user, queue);
addApplication(applicationId, user);
} finally {
@ -369,6 +360,24 @@ public class ParentQueue extends AbstractCSQueue {
}
}
/**
 * Checks that an application submission may pass through this parent
 * queue: the requested queue must not be this non-leaf queue itself, and
 * this queue must be in the RUNNING state.
 *
 * @param applicationId id of the application being submitted.
 * @param userName user submitting the application.
 * @param queue name of the queue the application is submitted to.
 * @throws AccessControlException if either check fails.
 */
@Override
public void validateSubmitApplication(ApplicationId applicationId,
    String userName, String queue) throws AccessControlException {
  // Acquire the lock before entering try so that the finally block never
  // unlocks a lock this thread does not hold.
  writeLock.lock();
  try {
    if (queue.equals(queueName)) {
      throw new AccessControlException(
          "Cannot submit application " + "to non-leaf queue: " + queueName);
    }

    if (state != QueueState.RUNNING) {
      throw new AccessControlException("Queue " + getQueuePath()
          + " is STOPPED. Cannot accept submission of application: "
          + applicationId);
    }
  } finally {
    writeLock.unlock();
  }
}
@Override
public void submitApplicationAttempt(FiCaSchedulerApp application,

View File

@ -1696,7 +1696,41 @@ public class FairScheduler extends
writeLock.unlock();
}
}
/**
 * Checks whether the application could be moved to the given queue without
 * actually moving it: the application must be known, the destination must
 * resolve to a leaf queue, and, if the app is runnable in its current
 * queue, the move must not violate the constraints checked by
 * verifyMoveDoesNotViolateConstraints.
 *
 * @param appId application to be moved.
 * @param newQueue name of the requested destination queue.
 * @throws YarnException if the application or its current attempt is not
 *           found, the target is not a leaf queue, or a constraint would
 *           be violated.
 */
@Override
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
    throws YarnException {
  // Locks are acquired before their try blocks so that a finally block
  // never unlocks a lock this thread does not hold.
  writeLock.lock();
  try {
    SchedulerApplication<FSAppAttempt> app = applications.get(appId);
    if (app == null) {
      throw new YarnException("App to be moved " + appId + " not found.");
    }

    FSAppAttempt attempt = app.getCurrentAppAttempt();
    if (attempt == null) {
      // Without this guard a missing attempt fails with a
      // NullPointerException, both in the body and again in finally.
      throw new YarnException("App to be moved " + appId
          + " has no current attempt.");
    }

    // To serialize with FairScheduler#allocate, synchronize on app attempt
    attempt.getWriteLock().lock();
    try {
      FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
      String destQueueName = handleMoveToPlanQueue(newQueue);
      FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
      if (targetQueue == null) {
        throw new YarnException("Target queue " + newQueue
            + " not found or is not a leaf queue.");
      }

      if (oldQueue.isRunnableApp(attempt)) {
        verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
      }
    } finally {
      attempt.getWriteLock().unlock();
    }
  } finally {
    writeLock.unlock();
  }
}
private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
String queueName = targetQueue.getQueueName();

View File

@ -87,10 +87,10 @@ public class TestMoveApplication {
application.getApplicationId(), "newqueue"));
fail("Should have hit exception");
} catch (YarnException ex) {
assertEquals("Move not supported", ex.getCause().getMessage());
assertEquals("Move not supported", ex.getMessage());
}
}
@Test (timeout = 10000)
public void testMoveTooLate() throws Exception {
// Submit application
@ -178,5 +178,13 @@ public class TestMoveApplication {
QueueACL acl, String queueName) {
return acl != QueueACL.ADMINISTER_QUEUE;
}
/**
 * Test hook: rejects the pre-validation with "Move not supported" when the
 * failMove flag is set, and accepts it otherwise.
 */
@Override
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
    throws YarnException {
  if (!failMove) {
    return;
  }
  throw new YarnException("Move not supported");
}
}
}

View File

@ -457,6 +457,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
CapacityScheduler scheduler =
((CapacityScheduler) rm.getResourceScheduler());
try {
scheduler.preValidateMoveApplication(app1.getApplicationId(), "a2");
scheduler.moveApplication(app1.getApplicationId(), "a2");
fail("Should throw exception since target queue doesnt have "
+ "required labels");