YARN-6031. Application recovery has failed when node label feature is turned off during RM recovery. Contributed by Ying Zhang.

(cherry picked from commit 3fa0d540df)
This commit is contained in:
Sunil G 2017-01-23 12:19:35 +05:30
parent efe8f941a9
commit f8e49bb343
2 changed files with 121 additions and 2 deletions

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.AccessRequest;
@ -63,6 +64,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -334,6 +336,34 @@ protected void recoverApplication(ApplicationStateData appState,
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser(), true); appState.getUser(), true);
// If null amReq has been returned, check if it is the case that
// application has specified node label expression while node label
// has been disabled. Reject the recovery of this application if it
// is true and give clear message so that user can react properly.
if (!appContext.getUnmanagedAM() &&
application.getAMResourceRequest() == null &&
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
// check application submission context and see if am resource request
// or application itself contains any node label expression.
ResourceRequest amReqFromAppContext =
appContext.getAMContainerResourceRequest();
String labelExp = (amReqFromAppContext != null) ?
amReqFromAppContext.getNodeLabelExpression() : null;
if (labelExp == null) {
labelExp = appContext.getNodeLabelExpression();
}
if (labelExp != null &&
!labelExp.equals(RMNodeLabelsManager.NO_LABEL)) {
String message = "Failed to recover application " + appId
+ ". NodeLabel is not enabled in cluster, but AM resource request "
+ "contains a label expression.";
LOG.warn(message);
application.handle(
new RMAppEvent(appId, RMAppEventType.APP_REJECTED, message));
return;
}
}
application.handle(new RMAppRecoverEvent(appId, rmState)); application.handle(new RMAppRecoverEvent(appId, rmState));
} }
@ -353,8 +383,28 @@ private RMAppImpl createAndPopulateNewRMApp(
} }
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = ResourceRequest amReq = null;
validateAndCreateResourceRequest(submissionContext, isRecovery); try {
amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
} catch (InvalidLabelResourceRequestException e) {
// This can happen if the application had been submitted and run
// with Node Label enabled but recover with Node Label disabled.
// Thus there might be node label expression in the application's
// resource requests. If this is the case, create RmAppImpl with
// null amReq and reject the application later with clear error
// message. So that the application can still be tracked by RM
// after recovery and user can see what's going on and react accordingly.
if (isRecovery &&
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
if (LOG.isDebugEnabled()) {
LOG.debug("AMResourceRequest is not created for " + applicationId
+ ". NodeLabel is not enabled in cluster, but AM resource "
+ "request contains a label expression.");
}
} else {
throw e;
}
}
// Verify and get the update application priority and set back to // Verify and get the update application priority and set back to
// submissionContext // submissionContext

View File

@ -110,6 +110,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -2472,4 +2473,72 @@ private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
return am; return am;
} }
/**
 * Restarts the RM with node labels disabled after an application was
 * submitted (and finished) with AM node label expression "x" while node
 * labels were enabled.
 *
 * <p>The second RM shares the first RM's state store. It is expected to
 * start without error and to load the application back in
 * {@code FAILED} state, because its AM resource request carries a node
 * label expression that is no longer valid once labels are disabled.
 *
 * @throws Exception if setting up the first RM, or submitting and
 *     finishing the application on it, fails.
 */
@Test(timeout = 60000)
public void testRMRestartAfterNodeLabelDisabled() throws Exception {
  MemoryRMStateStore memStore = new MemoryRMStateStore();
  memStore.init(conf);

  // first RM runs with node labels enabled
  conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
  MockRM rm1 = new MockRM(
      TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      RMNodeLabelsManager mgr = new RMNodeLabelsManager();
      mgr.init(getConfig());
      return mgr;
    }
  };
  rm1.start();

  // add node label "x" and set node to label mapping
  Set<String> clusterNodeLabels = new HashSet<String>();
  clusterNodeLabels.add("x");
  RMNodeLabelsManager nodeLabelManager =
      rm1.getRMContext().getNodeLabelManager();
  nodeLabelManager.
      addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);
  nodeLabelManager.addLabelsToNode(
      ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
  MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x

  // submit an application with specifying am node label expression as "x"
  RMApp app1 = rm1.submitApp(200, "someApp", "someUser", null, "a1", "x");

  // check am container allocated with correct node label expression
  MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
  ContainerId amContainerId1 =
      ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
  Assert.assertEquals("x", rm1.getRMContext().getScheduler().
      getRMContainer(amContainerId1).getNodeLabelExpression());
  finishApplicationMaster(app1, rm1, nm1, am1);

  // restart rm with node label disabled, reusing the same state store
  conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
  MockRM rm2 = new MockRM(
      TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      RMNodeLabelsManager mgr = new RMNodeLabelsManager();
      mgr.init(getConfig());
      return mgr;
    }
  };

  // rm should successfully start with app1 loaded back in FAILED state
  // due to node label not enabled but am resource request contains
  // node label expression.
  try {
    rm2.start();
    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
    rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
  } catch (Exception e) {
    // Keep the original exception as the cause so the test report shows
    // why the restart failed instead of only a generic message.
    throw new AssertionError("RM should start without any issue", e);
  } finally {
    // Make sure rm2 is stopped even if stopping rm1 throws.
    try {
      rm1.stop();
    } finally {
      rm2.stop();
    }
  }
}
} }