YARN-6031. Application recovery has failed when node label feature is turned off during RM recovery. Contributed by Ying Zhang.
This commit is contained in:
parent
a847903b6e
commit
3fa0d540df
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.security.AccessRequest;
|
import org.apache.hadoop.yarn.security.AccessRequest;
|
||||||
|
@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
|
@ -336,6 +338,34 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
||||||
appState.getUser(), true, appState.getStartTime());
|
appState.getUser(), true, appState.getStartTime());
|
||||||
|
|
||||||
|
// If null amReq has been returned, check if it is the case that
|
||||||
|
// application has specified node label expression while node label
|
||||||
|
// has been disabled. Reject the recovery of this application if it
|
||||||
|
// is true and give a clear message so that the user can react properly.
|
||||||
|
if (!appContext.getUnmanagedAM() &&
|
||||||
|
application.getAMResourceRequest() == null &&
|
||||||
|
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
|
||||||
|
// check application submission context and see if am resource request
|
||||||
|
// or application itself contains any node label expression.
|
||||||
|
ResourceRequest amReqFromAppContext =
|
||||||
|
appContext.getAMContainerResourceRequest();
|
||||||
|
String labelExp = (amReqFromAppContext != null) ?
|
||||||
|
amReqFromAppContext.getNodeLabelExpression() : null;
|
||||||
|
if (labelExp == null) {
|
||||||
|
labelExp = appContext.getNodeLabelExpression();
|
||||||
|
}
|
||||||
|
if (labelExp != null &&
|
||||||
|
!labelExp.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||||
|
String message = "Failed to recover application " + appId
|
||||||
|
+ ". NodeLabel is not enabled in cluster, but AM resource request "
|
||||||
|
+ "contains a label expression.";
|
||||||
|
LOG.warn(message);
|
||||||
|
application.handle(
|
||||||
|
new RMAppEvent(appId, RMAppEventType.APP_REJECTED, message));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
application.handle(new RMAppRecoverEvent(appId, rmState));
|
application.handle(new RMAppRecoverEvent(appId, rmState));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,8 +385,28 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
ResourceRequest amReq =
|
ResourceRequest amReq = null;
|
||||||
validateAndCreateResourceRequest(submissionContext, isRecovery);
|
try {
|
||||||
|
amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
|
||||||
|
} catch (InvalidLabelResourceRequestException e) {
|
||||||
|
// This can happen if the application had been submitted and run
|
||||||
|
// with Node Label enabled but recovered with Node Label disabled.
|
||||||
|
// Thus there might be node label expression in the application's
|
||||||
|
// resource requests. If this is the case, create RMAppImpl with
|
||||||
|
// null amReq and reject the application later with a clear error
|
||||||
|
// message, so that the application can still be tracked by RM
|
||||||
|
// after recovery and user can see what's going on and react accordingly.
|
||||||
|
if (isRecovery &&
|
||||||
|
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("AMResourceRequest is not created for " + applicationId
|
||||||
|
+ ". NodeLabel is not enabled in cluster, but AM resource "
|
||||||
|
+ "request contains a label expression.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Verify and get the update application priority and set back to
|
// Verify and get the update application priority and set back to
|
||||||
// submissionContext
|
// submissionContext
|
||||||
|
|
|
@ -110,6 +110,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
|
@ -2539,4 +2540,72 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
return am;
|
return am;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRMRestartAfterNodeLabelDisabled() throws Exception {
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(
|
||||||
|
TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
// add node label "x" and set node to label mapping
|
||||||
|
Set<String> clusterNodeLabels = new HashSet<String>();
|
||||||
|
clusterNodeLabels.add("x");
|
||||||
|
RMNodeLabelsManager nodeLabelManager =
|
||||||
|
rm1.getRMContext().getNodeLabelManager();
|
||||||
|
nodeLabelManager.
|
||||||
|
addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);
|
||||||
|
nodeLabelManager.addLabelsToNode(
|
||||||
|
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
|
||||||
|
|
||||||
|
// submit an application specifying the AM node label expression as "x"
|
||||||
|
RMApp app1 = rm1.submitApp(200, "someApp", "someUser", null, "a1", "x");
|
||||||
|
// check am container allocated with correct node label expression
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
ContainerId amContainerId1 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
|
Assert.assertEquals("x", rm1.getRMContext().getScheduler().
|
||||||
|
getRMContainer(amContainerId1).getNodeLabelExpression());
|
||||||
|
finishApplicationMaster(app1, rm1, nm1, am1);
|
||||||
|
|
||||||
|
// restart rm with node label disabled
|
||||||
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
|
||||||
|
MockRM rm2 = new MockRM(
|
||||||
|
TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// rm should successfully start with app1 loaded back in FAILED state
|
||||||
|
// due to node label not enabled but am resource request contains
|
||||||
|
// node label expression.
|
||||||
|
try {
|
||||||
|
rm2.start();
|
||||||
|
Assert.assertTrue("RM start successfully", true);
|
||||||
|
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||||
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("Exception on start", e);
|
||||||
|
Assert.fail("RM should start without any issue");
|
||||||
|
} finally {
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue