YARN-3493. RM fails to come up with error "Failed to load/recover state" when mem settings are changed. (Jian He via wangda)

(cherry picked from commit f65eeb412d)
(cherry picked from commit e7cbecddc3)
(cherry picked from commit 9d47d5aa5bffe427c4a77260f7ccc039d446e1fd)
This commit is contained in:
Wangda Tan 2015-04-17 17:11:22 -07:00 committed by Vinod Kumar Vavilapalli
parent b7e7896410
commit 61f2ddb125
8 changed files with 118 additions and 65 deletions

View File

@ -123,6 +123,9 @@ Release 2.6.1 - UNRELEASED
YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token
renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv)
YARN-3493. RM fails to come up with error "Failed to load/recover state"
when mem settings are changed. (Jian He via wangda)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -498,7 +498,7 @@ public class ApplicationMasterService extends AbstractService implements
// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
RMServerUtils.normalizeAndValidateRequests(ask,
rScheduler.getMaximumResourceCapability(), app.getQueue(),
rScheduler);
} catch (InvalidResourceRequestException e) {

View File

@ -271,7 +271,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user);
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
@ -308,16 +308,18 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
appState.getUser(), true);
application.handle(new RMAppRecoverEvent(appId, rmState));
}
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext,
long submitTime, String user)
throws YarnException {
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
@ -335,7 +337,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
throw new YarnException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
@ -348,7 +350,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
private ResourceRequest validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext)
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
@ -357,14 +359,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq;
if (submissionContext.getAMContainerResourceRequest() != null) {
amReq = submissionContext.getAMContainerResourceRequest();
} else {
amReq =
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
ResourceRequest amReq = submissionContext.getAMContainerResourceRequest();
if (amReq == null) {
amReq = BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1);
}
// set label expression for AM container
@ -374,15 +373,14 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
try {
SchedulerUtils.validateResourceRequest(amReq,
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler);
submissionContext.getQueue(), scheduler, isRecovery);
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
return amReq;
}

View File

@ -89,11 +89,11 @@ public class RMServerUtils {
* Utility method to validate a list resource requests, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequests(List<ResourceRequest> ask,
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) {
SchedulerUtils.validateResourceRequest(resReq, maximumResource,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
queueName, scheduler);
}
}

View File

@ -909,7 +909,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
// SchedulerUtils.validateResourceRequests is not necessary because
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,

View File

@ -188,6 +188,50 @@ public class SchedulerUtils {
ask.setCapability(normalized);
}
private static void normalizeNodeLabelExpressionInRequest(
ResourceRequest resReq, QueueInfo queueInfo) {
String labelExp = resReq.getNodeLabelExpression();
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
// If labelExp still equals to null, set it to be NO_LABEL
if (labelExp == null) {
labelExp = RMNodeLabelsManager.NO_LABEL;
}
resReq.setNodeLabelExpression(labelExp);
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery)
throws InvalidResourceRequestException {
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo);
}
}
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
false);
}
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
@ -196,7 +240,7 @@ public class SchedulerUtils {
* request
*/
public static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
Resource maximumResource, QueueInfo queueInfo)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
@ -216,26 +260,8 @@ public class SchedulerUtils {
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
}
// Get queue from scheduler
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
// check labels in the resource request.
String labelExp = resReq.getNodeLabelExpression();
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
resReq.setNodeLabelExpression(labelExp);
}
if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
labelExp)) {

View File

@ -985,4 +985,30 @@ public class TestWorkPreservingRMRestart {
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
}
/**
* Test validateAndCreateResourceRequest fails on recovery, app should ignore
* this Exception and continue
*/
@Test (timeout = 30000)
public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Change the config so that validateAndCreateResourceRequest throws
// exception on recovery
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100);
rm2 = new MockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
}
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@ -211,23 +212,23 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("x && y");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("y");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression(" ");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -246,7 +247,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -263,7 +264,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x && y && z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -280,15 +281,15 @@ public class TestSchedulerUtils {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression(" ");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -306,7 +307,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -324,15 +325,15 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("x && y && z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -357,7 +358,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Zero memory should be accepted");
@ -371,7 +372,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Zero vcores should be accepted");
@ -386,7 +387,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Max memory should be accepted");
@ -401,7 +402,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Max vcores should not be accepted");
@ -415,7 +416,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("Negative memory should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -430,7 +431,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("Negative vcores should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -446,7 +447,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("More than max memory should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -463,7 +464,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("More than max vcores should not be accepted");
} catch (InvalidResourceRequestException e) {