YARN-3493. RM fails to come up with error "Failed to load/recover state" when mem settings are changed. (Jian He via wangda)

(cherry picked from commit f65eeb412d)
This commit is contained in:
Wangda Tan 2015-04-17 17:11:22 -07:00
parent 9827da95bc
commit e7cbecddc3
8 changed files with 119 additions and 72 deletions

View File

@ -176,6 +176,9 @@ Release 2.8.0 - UNRELEASED
YARN-3021. YARN's delegation-token handling disallows certain trust setups
to operate properly over DistCp. (Yongjun Zhang via jianhe)
YARN-3493. RM fails to come up with error "Failed to load/recover state"
when mem settings are changed. (Jian He via wangda)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -499,7 +499,7 @@ public class ApplicationMasterService extends AbstractService implements
// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
RMServerUtils.normalizeAndValidateRequests(ask,
rScheduler.getMaximumResourceCapability(), app.getQueue(),
rScheduler);
} catch (InvalidResourceRequestException e) {

View File

@ -279,7 +279,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user);
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
@ -316,16 +316,18 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
appState.getUser(), true);
application.handle(new RMAppRecoverEvent(appId, rmState));
}
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext,
long submitTime, String user)
throws YarnException {
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
@ -343,7 +345,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
throw new YarnException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
@ -356,7 +358,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
private ResourceRequest validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext)
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
@ -365,16 +367,13 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq;
if (submissionContext.getAMContainerResourceRequest() != null) {
amReq = submissionContext.getAMContainerResourceRequest();
} else {
amReq =
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
ResourceRequest amReq = submissionContext.getAMContainerResourceRequest();
if (amReq == null) {
amReq = BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1);
}
// set label expression for AM container
if (null == amReq.getNodeLabelExpression()) {
amReq.setNodeLabelExpression(submissionContext
@ -382,14 +381,15 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
try {
SchedulerUtils.validateResourceRequest(amReq,
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler);
submissionContext.getQueue(), scheduler, isRecovery);
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(),
scheduler.getClusterResource(),
scheduler.getMinimumResourceCapability(),

View File

@ -90,11 +90,11 @@ public class RMServerUtils {
* Utility method to validate a list resource requests, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequests(List<ResourceRequest> ask,
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) {
SchedulerUtils.validateResourceRequest(resReq, maximumResource,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
queueName, scheduler);
}
}

View File

@ -940,7 +940,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
// SchedulerUtils.validateResourceRequests is not necessary because
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,

View File

@ -189,6 +189,50 @@ public class SchedulerUtils {
ask.setCapability(normalized);
}
private static void normalizeNodeLabelExpressionInRequest(
ResourceRequest resReq, QueueInfo queueInfo) {
String labelExp = resReq.getNodeLabelExpression();
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
// If labelExp still equals to null, set it to be NO_LABEL
if (labelExp == null) {
labelExp = RMNodeLabelsManager.NO_LABEL;
}
resReq.setNodeLabelExpression(labelExp);
}
public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery)
throws InvalidResourceRequestException {
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo);
}
}
public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
false);
}
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
@ -196,7 +240,7 @@ public class SchedulerUtils {
* @throws InvalidResourceRequestException when there is invalid request
*/
public static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
Resource maximumResource, QueueInfo queueInfo)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
@ -216,31 +260,7 @@ public class SchedulerUtils {
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
}
// Get queue from scheduler
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
// check labels in the resource request.
String labelExp = resReq.getNodeLabelExpression();
// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null
&& ResourceRequest.ANY.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}
// If labelExp still equals to null, set it to be NO_LABEL
resReq
.setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL
: labelExp);
// we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName())
&& labelExp != null && !labelExp.trim().isEmpty()) {

View File

@ -1011,4 +1011,30 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
}
/**
* Test validateAndCreateResourceRequest fails on recovery, app should ignore
* this Exception and continue
*/
@Test (timeout = 30000)
public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Change the config so that validateAndCreateResourceRequest throws
// exception on recovery
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100);
rm2 = new MockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
}
}

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestExceptio
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@ -213,19 +212,19 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("y");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression(" ");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -244,7 +243,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -263,7 +262,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x && y");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -280,15 +279,15 @@ public class TestSchedulerUtils {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression(" ");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -306,7 +305,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -324,15 +323,15 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("y");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
resReq.setNodeLabelExpression("z");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
} catch (InvalidResourceRequestException e) {
e.printStackTrace();
@ -351,7 +350,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), "rack", resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -371,7 +370,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), "rack", resource, 1);
resReq.setNodeLabelExpression("x");
SchedulerUtils.validateResourceRequest(resReq, maxResource, "queue",
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
scheduler);
fail("Should fail");
} catch (InvalidResourceRequestException e) {
@ -395,7 +394,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Zero memory should be accepted");
@ -409,7 +408,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Zero vcores should be accepted");
@ -424,7 +423,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Max memory should be accepted");
@ -439,7 +438,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
} catch (InvalidResourceRequestException e) {
fail("Max vcores should not be accepted");
@ -453,7 +452,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("Negative memory should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -468,7 +467,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("Negative vcores should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -484,7 +483,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("More than max memory should not be accepted");
} catch (InvalidResourceRequestException e) {
@ -501,7 +500,7 @@ public class TestSchedulerUtils {
ResourceRequest resReq =
BuilderUtils.newResourceRequest(mock(Priority.class),
ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource, null,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
mockScheduler);
fail("More than max vcores should not be accepted");
} catch (InvalidResourceRequestException e) {