YARN-1504. RM changes for moving apps between queues (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1567482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
41b2227119
commit
673ba4c00e
|
@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -866,5 +867,11 @@ public class ResourceSchedulerWrapper implements ResourceScheduler,
|
|||
public RMContainer getRMContainer(ContainerId containerId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String moveApplication(ApplicationId appId, String newQueue)
|
||||
throws YarnException {
|
||||
return scheduler.moveApplication(appId, newQueue);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,6 +99,8 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
|
||||
Ryza)
|
||||
|
||||
YARN-1504. RM changes for moving apps between queues (Sandy Ryza)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
||||
|
|
|
@ -96,6 +96,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
|
@ -722,10 +724,74 @@ public class ClientRMService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
||||
throw new UnsupportedOperationException("Move not yet supported");
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
|
||||
"UNKNOWN", "ClientRMService" , "Error getting UGI",
|
||||
applicationId);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application == null) {
|
||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
||||
"Trying to move an absent application", applicationId);
|
||||
throw new ApplicationNotFoundException("Trying to move an absent"
|
||||
+ " application " + applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.MODIFY_APP, application)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.MOVE_APP_REQUEST,
|
||||
"User doesn't have permissions to "
|
||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
|
||||
// Moves only allowed when app is in a state that means it is tracked by
|
||||
// the scheduler
|
||||
if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
|
||||
RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
|
||||
RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
|
||||
.contains(application.getState())) {
|
||||
String msg = "App in " + application.getState() + " state cannot be moved.";
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
|
||||
SettableFuture<Object> future = SettableFuture.create();
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
|
||||
|
||||
try {
|
||||
Futures.get(future, YarnException.class);
|
||||
} catch (YarnException ex) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
||||
ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
|
||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||
AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
|
||||
MoveApplicationAcrossQueuesResponse response = recordFactory
|
||||
.newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
|
||||
return response;
|
||||
}
|
||||
|
||||
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
|
||||
|
|
|
@ -45,6 +45,7 @@ public class RMAuditLogger {
|
|||
|
||||
public static final String KILL_APP_REQUEST = "Kill Application Request";
|
||||
public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
|
||||
public static final String MOVE_APP_REQUEST = "Move Application Request";
|
||||
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
|
||||
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
||||
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
||||
|
|
|
@ -23,6 +23,7 @@ public enum RMAppEventType {
|
|||
START,
|
||||
RECOVER,
|
||||
KILL,
|
||||
MOVE, // Move app to a new queue
|
||||
|
||||
// Source: Scheduler and RMAppManager
|
||||
APP_REJECTED,
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
|
@ -167,6 +168,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// Transitions from SUBMITTED state
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
||||
RMAppEventType.APP_REJECTED,
|
||||
new FinalSavingTransition(
|
||||
|
@ -182,6 +185,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// Transitions from ACCEPTED state
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
||||
RMAppEventType.ATTEMPT_REGISTERED)
|
||||
.addTransition(RMAppState.ACCEPTED,
|
||||
|
@ -203,6 +208,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// Transitions from RUNNING state
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||
new FinalSavingTransition(
|
||||
|
@ -692,6 +699,31 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Move an app to a new queue.
|
||||
* This transition must set the result on the Future in the RMAppMoveEvent,
|
||||
* either as an exception for failure or null for success, or the client will
|
||||
* be left waiting forever.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final class RMAppMoveTransition extends RMAppTransition {
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
|
||||
try {
|
||||
app.queue = app.scheduler.moveApplication(app.applicationId,
|
||||
moveEvent.getTargetQueue());
|
||||
} catch (YarnException ex) {
|
||||
moveEvent.getResult().setException(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: Write out change to state store (YARN-1558)
|
||||
|
||||
moveEvent.getResult().set(null);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final class RMAppRecoveredTransition implements
|
||||
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
||||
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
public class RMAppMoveEvent extends RMAppEvent {
|
||||
private String targetQueue;
|
||||
private SettableFuture<Object> result;
|
||||
|
||||
public RMAppMoveEvent(ApplicationId id, String newQueue,
|
||||
SettableFuture<Object> resultFuture) {
|
||||
super(id, RMAppEventType.MOVE);
|
||||
this.targetQueue = newQueue;
|
||||
this.result = resultFuture;
|
||||
}
|
||||
|
||||
public String getTargetQueue() {
|
||||
return targetQueue;
|
||||
}
|
||||
|
||||
public SettableFuture<Object> getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
|
@ -27,11 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
public class AbstractYarnScheduler {
|
||||
public abstract class AbstractYarnScheduler implements ResourceScheduler {
|
||||
|
||||
protected RMContext rmContext;
|
||||
protected Map<ApplicationId, SchedulerApplication> applications;
|
||||
|
@ -61,4 +62,11 @@ public class AbstractYarnScheduler {
|
|||
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
|
||||
return applications;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String moveApplication(ApplicationId appId, String newQueue)
|
||||
throws YarnException {
|
||||
throw new YarnException(getClass().getSimpleName()
|
||||
+ " does not support moving apps between queues");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
|
||||
/**
|
||||
|
@ -180,4 +182,16 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
@LimitedPrivate("yarn")
|
||||
@Unstable
|
||||
public RMContainer getRMContainer(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* Moves the given application to the given queue
|
||||
* @param appId
|
||||
* @param newQueue
|
||||
* @return the name of the queue the application was placed into
|
||||
* @throws YarnException if the move cannot be carried out
|
||||
*/
|
||||
@LimitedPrivate("yarn")
|
||||
@Evolving
|
||||
public String moveApplication(ApplicationId appId, String newQueue)
|
||||
throws YarnException;
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
|
@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
|
@ -121,8 +121,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@LimitedPrivate("yarn")
|
||||
@Unstable
|
||||
@SuppressWarnings("unchecked")
|
||||
public class FairScheduler extends AbstractYarnScheduler implements
|
||||
ResourceScheduler {
|
||||
public class FairScheduler extends AbstractYarnScheduler {
|
||||
private boolean initialized;
|
||||
private FairSchedulerConfiguration conf;
|
||||
private Resource minimumAllocation;
|
||||
|
|
|
@ -77,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
|
@ -106,7 +105,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@Evolving
|
||||
@SuppressWarnings("unchecked")
|
||||
public class FifoScheduler extends AbstractYarnScheduler implements
|
||||
ResourceScheduler, Configurable {
|
||||
Configurable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
@ -236,6 +237,20 @@ public class TestClientRMService {
|
|||
"application " + request.getApplicationId());
|
||||
}
|
||||
}
|
||||
|
||||
@Test (expected = ApplicationNotFoundException.class)
|
||||
public void testMoveAbsentApplication() throws YarnException {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
when(rmContext.getRMApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||
ClientRMService rmService = new ClientRMService(rmContext, null, null,
|
||||
null, null, null);
|
||||
ApplicationId applicationId =
|
||||
BuilderUtils.newApplicationId(System.currentTimeMillis(), 0);
|
||||
MoveApplicationAcrossQueuesRequest request =
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(applicationId, "newqueue");
|
||||
rmService.moveApplicationAcrossQueues(request);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetQueueInfo() throws Exception {
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.security.AccessControlException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestMoveApplication {
|
||||
private ResourceManager resourceManager = null;
|
||||
private static boolean failMove;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
|
||||
FifoSchedulerWithMove.class);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
resourceManager = new ResourceManager();
|
||||
resourceManager.init(conf);
|
||||
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
|
||||
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
|
||||
resourceManager.start();
|
||||
failMove = false;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
resourceManager.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveRejectedByScheduler() throws Exception {
|
||||
failMove = true;
|
||||
|
||||
// Submit application
|
||||
Application application = new Application("user1", resourceManager);
|
||||
application.submit();
|
||||
|
||||
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||
try {
|
||||
// FIFO scheduler does not support moves
|
||||
clientRMService.moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(
|
||||
application.getApplicationId(), "newqueue"));
|
||||
fail("Should have hit exception");
|
||||
} catch (YarnException ex) {
|
||||
assertEquals("Move not supported", ex.getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testMoveTooLate() throws Exception {
|
||||
// Submit application
|
||||
Application application = new Application("user1", resourceManager);
|
||||
ApplicationId appId = application.getApplicationId();
|
||||
application.submit();
|
||||
|
||||
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||
// Kill the application
|
||||
clientRMService.forceKillApplication(
|
||||
KillApplicationRequest.newInstance(appId));
|
||||
RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
|
||||
// wait until it's dead
|
||||
while (rmApp.getState() != RMAppState.KILLED) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
try {
|
||||
clientRMService.moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
|
||||
fail("Should have hit exception");
|
||||
} catch (YarnException ex) {
|
||||
assertEquals(YarnException.class,
|
||||
ex.getClass());
|
||||
assertEquals("App in KILLED state cannot be moved.", ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
public void testMoveSuccessful() throws Exception {
|
||||
// Submit application
|
||||
Application application = new Application("user1", resourceManager);
|
||||
ApplicationId appId = application.getApplicationId();
|
||||
application.submit();
|
||||
|
||||
// Wait for app to be accepted
|
||||
RMApp app = resourceManager.rmContext.getRMApps().get(appId);
|
||||
while (app.getState() != RMAppState.ACCEPTED) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||
// FIFO scheduler does not support moves
|
||||
clientRMService.moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
|
||||
|
||||
RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
|
||||
assertEquals("newqueue", rmApp.getQueue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveRejectedByPermissions() throws Exception {
|
||||
failMove = true;
|
||||
|
||||
// Submit application
|
||||
final Application application = new Application("user1", resourceManager);
|
||||
application.submit();
|
||||
|
||||
final ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||
try {
|
||||
UserGroupInformation.createRemoteUser("otheruser").doAs(
|
||||
new PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse>() {
|
||||
@Override
|
||||
public MoveApplicationAcrossQueuesResponse run() throws Exception {
|
||||
return clientRMService.moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest.newInstance(
|
||||
application.getApplicationId(), "newqueue"));
|
||||
}
|
||||
|
||||
});
|
||||
fail("Should have hit exception");
|
||||
} catch (Exception ex) {
|
||||
assertEquals(AccessControlException.class, ex.getCause().getCause().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public static class FifoSchedulerWithMove extends FifoScheduler {
|
||||
@Override
|
||||
public String moveApplication(ApplicationId appId, String newQueue)
|
||||
throws YarnException {
|
||||
if (failMove) {
|
||||
throw new YarnException("Move not supported");
|
||||
}
|
||||
return newQueue;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
|
||||
QueueACL acl, String queueName) {
|
||||
return acl != QueueACL.ADMINISTER_QUEUE;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue