YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager to separate out various validation checks depending on whether they rely on RM configuration or not. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1477478 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1477479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-30 05:44:45 +00:00
parent c962ae1a3a
commit e2a1f994a1
7 changed files with 313 additions and 330 deletions

View File

@ -122,6 +122,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-591. Moved RM recovery related records out of public API as they do not
belong there. (vinodkv)
YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager
to separate out various validation checks depending on whether they rely on
RM configuration or not. (Zhijie Shen via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -83,15 +81,11 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -266,48 +260,61 @@ public class ClientRMService extends AbstractService implements
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
String user = submissionContext.getAMContainerSpec().getUser();
// ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be
// checked here, those that are dependent on RM configuration are validated
// in RMAppManager.
String user = null;
try {
// Safety
user = UserGroupInformation.getCurrentUser().getShortUserName();
if (rmContext.getRMApps().get(applicationId) != null) {
throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!");
}
// Safety
submissionContext.getAMContainerSpec().setUser(user);
} catch (IOException ie) {
LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(ie);
}
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
try {
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + applicationId, e);
throw RPCUtil.getRemoteException(e);
}
}
// Though duplication will checked again when app is put into rmContext,
// but it is good to fail the invalid submission as early as possible.
if (rmContext.getRMApps().get(applicationId) != null) {
String message = "Application with id " + applicationId +
" is already present! Cannot add a duplicate!";
LOG.warn(message);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
message, "ClientRMService", "Exception in submitting application",
applicationId);
throw RPCUtil.getRemoteException(message);
}
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis()));
if (submissionContext.getQueue() == null) {
submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
if (submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName(
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), false);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
} catch (IOException ie) {
LOG.info("Exception in submitting application", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService",
} catch (YarnRemoteException e) {
LOG.info("Exception in submitting application with id " +
applicationId.getId(), e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(ie);
throw e;
}
SubmitApplicationResponse response = recordFactory

View File

@ -31,8 +31,10 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* This class manages the list of applications for the resource manager.
@ -233,64 +239,77 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
boolean isRecovered) {
boolean isRecovered) throws YarnRemoteException {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
// checked here as they have to be validated whether they are part of new
// submission or just being recovered.
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
try {
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + applicationId, e);
throw RPCUtil.getRemoteException(e);
}
}
// Create RMApp
RMApp application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(),
submissionContext.getAMContainerSpec().getUser(),
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
try {
// Sanity checks
if (submissionContext.getQueue() == null) {
submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
if (submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName(
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
// Create RMApp
application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(),
submissionContext.getAMContainerSpec().getUser(),
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime);
// Sanity check - duplicate?
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.info(message);
throw RPCUtil.getRemoteException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
// Setup tokens for renewal
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer().addApplication(
applicationId,parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete()
);
}
// All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
RMAppEventType.START));
}
} catch (IOException ie) {
LOG.info("RMAppManager submit application exception", ie);
if (application != null) {
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we havne't yet informed the
// Scheduler about the existence of the application
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(applicationId, ie.getMessage()));
}
LOG.warn(
"Unable to add the application to the delegation token renewer.",
ie);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we havne't yet informed the
// Scheduler about the existence of the application
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppRejectedEvent(applicationId, ie.getMessage()));
throw RPCUtil.getRemoteException(ie);
}
// All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
RMAppEventType.START));
}
private Credentials parseCredentials(ApplicationSubmissionContext application)
@ -377,14 +396,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
checkAppNumCompletedLimit();
}
break;
case APP_SUBMIT:
{
ApplicationSubmissionContext submissionContext =
((RMAppManagerSubmitEvent)event).getSubmissionContext();
long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
submitApplication(submissionContext, submitTime, false);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}

View File

@ -19,6 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager;
public enum RMAppManagerEventType {
APP_SUBMIT,
APP_COMPLETED
}

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
private final ApplicationSubmissionContext submissionContext;
private final long submitTime;
public RMAppManagerSubmitEvent(
ApplicationSubmissionContext submissionContext, long submitTime) {
super(submissionContext.getApplicationId(),
RMAppManagerEventType.APP_SUBMIT);
this.submissionContext = submissionContext;
this.submitTime = submitTime;
}
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
}
public long getSubmitTime() {
return this.submitTime;
}
}

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -31,12 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -46,11 +52,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
@ -163,9 +169,10 @@ public class TestAppManager{
super.setCompletedAppsMax(max);
}
public void submitApplication(
ApplicationSubmissionContext submissionContext) {
super.submitApplication(
submissionContext, System.currentTimeMillis(), false);
ApplicationSubmissionContext submissionContext)
throws YarnRemoteException {
super.submitApplication(submissionContext, System.currentTimeMillis(),
false);
}
}
@ -179,6 +186,40 @@ public class TestAppManager{
}
}
private RMContext rmContext;
private TestRMAppManager appMonitor;
private ApplicationSubmissionContext asContext;
private ApplicationId appId;
@Before
public void setUp() {
long now = System.currentTimeMillis();
rmContext = mockRMContext(1, now - 10);
ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
appId = MockApps.newAppID(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
asContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
asContext.setApplicationId(appId);
asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
asContext.setResource(mockResource());
setupDispatcher(rmContext, conf);
}
@After
public void tearDown() {
setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
}
@Test
public void testRMAppRetireNone() throws Exception {
long now = System.currentTimeMillis();
@ -334,38 +375,10 @@ public class TestAppManager{
@Test
public void testRMAppSubmit() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(0, now - 10);
ResourceScheduler scheduler = new CapacityScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
ApplicationId appID = MockApps.newAppID(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(appID);
ContainerLaunchContext amContainer = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
context.setAMContainerSpec(amContainer);
setupDispatcher(rmContext, conf);
appMonitor.submitApplication(context);
RMApp app = rmContext.getRMApps().get(appID);
appMonitor.submitApplication(asContext);
RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
Assert.assertEquals("app name doesn't match",
YarnConfiguration.DEFAULT_APPLICATION_NAME,
app.getName());
Assert.assertEquals("app queue doesn't match",
YarnConfiguration.DEFAULT_QUEUE_NAME,
app.getQueue());
Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
// wait for event to be processed
@ -374,9 +387,8 @@ public class TestAppManager{
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());
}
@Test (timeout = 30000)
@ -390,10 +402,7 @@ public class TestAppManager{
new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(0, now - 10);
ResourceScheduler scheduler = new CapacityScheduler();
ResourceScheduler scheduler = mockResourceScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
ApplicationMasterService masterService =
@ -402,21 +411,12 @@ public class TestAppManager{
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
ContainerLaunchContext amContainer = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
context.setAMContainerSpec(amContainer);
setupDispatcher(rmContext, conf);
ApplicationId appID = MockApps.newAppID(1);
context.setApplicationId(appID);
ApplicationId appID = MockApps.newAppID(i * 4 + j + 1);
asContext.setApplicationId(appID);
if (individualMaxAppAttempts[i][j] != 0) {
context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
asContext.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
}
appMonitor.submitApplication(context);
appMonitor.submitApplication(asContext);
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertEquals("max application attempts doesn't match",
expectedNums[i][j], app.getMaxAppAttempts());
@ -428,96 +428,73 @@ public class TestAppManager{
Thread.sleep(1000);
}
setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
}
}
}
@Test (timeout = 3000)
public void testRMAppSubmitWithQueueAndName() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(1, now - 10);
ResourceScheduler scheduler = new CapacityScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
ApplicationId appID = MockApps.newAppID(10);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(appID);
context.setApplicationName("testApp1");
context.setQueue("testQueue");
ContainerLaunchContext amContainer = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
amContainer
.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
context.setAMContainerSpec(amContainer);
setupDispatcher(rmContext, conf);
appMonitor.submitApplication(context);
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
// wait for event to be processed
int timeoutSecs = 0;
while ((getAppEventType() == RMAppEventType.KILL) &&
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
}
@Test
public void testRMAppSubmitError() throws Exception {
long now = System.currentTimeMillis();
// specify 1 here and use same appId below so it gets duplicate entry
RMContext rmContext = mockRMContext(1, now - 10);
ResourceScheduler scheduler = new CapacityScheduler();
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
ApplicationId appID = MockApps.newAppID(0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(appID);
context.setApplicationName("testApp1");
context.setQueue("testQueue");
setupDispatcher(rmContext, conf);
RMApp appOrig = rmContext.getRMApps().get(appID);
@Test (timeout = 30000)
public void testRMAppSubmitDuplicateApplicationId() throws Exception {
ApplicationId appId = MockApps.newAppID(0);
asContext.setApplicationId(appId);
RMApp appOrig = rmContext.getRMApps().get(appId);
Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
ContainerLaunchContext clc =
BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
null, null);
context.setAMContainerSpec(clc);
// our testApp1 should be rejected and original app with same id should be left in place
appMonitor.submitApplication(context);
try {
appMonitor.submitApplication(asContext);
Assert.fail("Exception is expected when applicationId is duplicate.");
} catch (YarnRemoteException e) {
Assert.assertTrue("The thrown exception is not the expectd one.",
e.getMessage().contains("Cannot add a duplicate!"));
}
// make sure original app didn't get removed
RMApp app = rmContext.getRMApps().get(appID);
RMApp app = rmContext.getRMApps().get(appId);
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
((Service)rmContext.getDispatcher()).stop();
Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.FINISHED, app.getState());
}
@Test (timeout = 30000)
public void testRMAppSubmitInvalidResourceRequest() throws Exception {
asContext.setResource(Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1));
// submit an app
try {
appMonitor.submitApplication(asContext);
Assert.fail("Application submission should fail because resource" +
" request is invalid.");
} catch (YarnRemoteException e) {
// Exception is expected
Assert.assertTrue("The thrown exception is not" +
" InvalidResourceRequestException",
e.getMessage().startsWith("Invalid resource request"));
}
}
private static ResourceScheduler mockResourceScheduler() {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
when(scheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(scheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
return scheduler;
}
private static ContainerLaunchContext mockContainerLaunchContext(
RecordFactory recordFactory) {
ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
ContainerLaunchContext.class);
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());;
return amContainer;
}
private static Resource mockResource() {
return Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
}
}

View File

@ -250,17 +250,70 @@ public class TestClientRMService {
rmContext, null, null, null, dtsm);
rmService.renewDelegationToken(request);
}
@Test (timeout = 30000)
@SuppressWarnings ("rawtypes")
public void testAppSubmit() throws Exception {
YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
// without name and queue
ApplicationId appId1 = getApplicationId(100);
SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
appId1, null, null);
try {
rmService.submitApplication(submitRequest1);
} catch (YarnRemoteException e) {
Assert.fail("Exception is not expected.");
}
RMApp app1 = rmContext.getRMApps().get(appId1);
Assert.assertNotNull("app doesn't exist", app1);
Assert.assertEquals("app name doesn't match",
YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
Assert.assertEquals("app queue doesn't match",
YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue());
// with name and queue
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
ApplicationId appId2 = getApplicationId(101);
SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, name, queue);
try {
rmService.submitApplication(submitRequest2);
} catch (YarnRemoteException e) {
Assert.fail("Exception is not expected.");
}
RMApp app2 = rmContext.getRMApps().get(appId2);
Assert.assertNotNull("app doesn't exist", app2);
Assert.assertEquals("app name doesn't match", name, app2.getName());
Assert.assertEquals("app queue doesn't match", queue, app2.getQueue());
// duplicate appId
try {
rmService.submitApplication(submitRequest2);
Assert.fail("Exception is expected.");
} catch (YarnRemoteException e) {
Assert.assertTrue("The thrown exception is not expected.",
e.getMessage().contains("Cannot add a duplicate!"));
}
}
@Test(timeout=4000)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
@ -270,8 +323,10 @@ public class TestClientRMService {
final ApplicationId appId1 = getApplicationId(100);
final ApplicationId appId2 = getApplicationId(101);
final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
appId1, null, null);
final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, null, null);
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
@ -319,61 +374,23 @@ public class TestClientRMService {
t.join();
}
@Test (timeout = 30000)
public void testInvalidResourceRequestWhenSubmittingApplication()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
final ApplicationId appId = getApplicationId(100);
final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
when(submitRequest.getApplicationSubmissionContext()
.getResource()).thenReturn(resource);
final ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
// submit an app
try {
rmService.submitApplication(submitRequest);
Assert.fail("Application submission should fail because resource" +
" request is invalid.");
} catch (YarnRemoteException e) {
// Exception is expected
Assert.assertTrue("The thrown exception is not" +
" InvalidResourceRequestException",
e.getMessage().startsWith("Invalid resource request"));
}
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue) {
String user = MockApps.newUserName();
String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user);
when(submissionContext.getQueue()).thenReturn(queue);
when(submissionContext.getApplicationId()).thenReturn(appId);
when(submissionContext.getResource()).thenReturn(resource);
ApplicationSubmissionContext submissionContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
submissionContext.setAMContainerSpec(amContainerSpec);
submissionContext.getAMContainerSpec().setUser(user);
submissionContext.setApplicationName(name);
submissionContext.setQueue(queue);
submissionContext.setApplicationId(appId);
submissionContext.setResource(resource);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@ -429,4 +446,15 @@ public class TestClientRMService {
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis());
}
private static YarnScheduler mockYarnScheduler() {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
return yarnScheduler;
}
}