YARN-6872. Ensure apps could run given NodeLabels are disabled post RM switchover/restart. Contributed by Sunil G
This commit is contained in:
parent
a01d92156d
commit
e84a3f43a1
|
@ -17,11 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -37,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
|
@ -56,11 +51,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class manages the list of applications for the resource manager.
|
* This class manages the list of applications for the resource manager.
|
||||||
|
@ -324,34 +321,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
||||||
appState.getUser(), true);
|
appState.getUser(), true);
|
||||||
|
|
||||||
// If null amReq has been returned, check if it is the case that
|
|
||||||
// application has specified node label expression while node label
|
|
||||||
// has been disabled. Reject the recovery of this application if it
|
|
||||||
// is true and give clear message so that user can react properly.
|
|
||||||
if (!appContext.getUnmanagedAM() &&
|
|
||||||
application.getAMResourceRequest() == null &&
|
|
||||||
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
|
|
||||||
// check application submission context and see if am resource request
|
|
||||||
// or application itself contains any node label expression.
|
|
||||||
ResourceRequest amReqFromAppContext =
|
|
||||||
appContext.getAMContainerResourceRequest();
|
|
||||||
String labelExp = (amReqFromAppContext != null) ?
|
|
||||||
amReqFromAppContext.getNodeLabelExpression() : null;
|
|
||||||
if (labelExp == null) {
|
|
||||||
labelExp = appContext.getNodeLabelExpression();
|
|
||||||
}
|
|
||||||
if (labelExp != null &&
|
|
||||||
!labelExp.equals(RMNodeLabelsManager.NO_LABEL)) {
|
|
||||||
String message = "Failed to recover application " + appId
|
|
||||||
+ ". NodeLabel is not enabled in cluster, but AM resource request "
|
|
||||||
+ "contains a label expression.";
|
|
||||||
LOG.warn(message);
|
|
||||||
application.handle(
|
|
||||||
new RMAppEvent(appId, RMAppEventType.APP_REJECTED, message));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
application.handle(new RMAppRecoverEvent(appId, rmState));
|
application.handle(new RMAppRecoverEvent(appId, rmState));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,28 +337,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
ResourceRequest amReq = null;
|
|
||||||
try {
|
ResourceRequest amReq = validateAndCreateResourceRequest(
|
||||||
amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
|
submissionContext, isRecovery);
|
||||||
} catch (InvalidLabelResourceRequestException e) {
|
|
||||||
// This can happen if the application had been submitted and run
|
|
||||||
// with Node Label enabled but recover with Node Label disabled.
|
|
||||||
// Thus there might be node label expression in the application's
|
|
||||||
// resource requests. If this is the case, create RmAppImpl with
|
|
||||||
// null amReq and reject the application later with clear error
|
|
||||||
// message. So that the application can still be tracked by RM
|
|
||||||
// after recovery and user can see what's going on and react accordingly.
|
|
||||||
if (isRecovery &&
|
|
||||||
!YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("AMResourceRequest is not created for " + applicationId
|
|
||||||
+ ". NodeLabel is not enabled in cluster, but AM resource "
|
|
||||||
+ "request contains a label expression.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify and get the update application priority and set back to
|
// Verify and get the update application priority and set back to
|
||||||
// submissionContext
|
// submissionContext
|
||||||
|
|
|
@ -815,7 +815,8 @@ public class AppSchedulingInfo {
|
||||||
this.placesBlacklistedByApp = appInfo.getBlackList();
|
this.placesBlacklistedByApp = appInfo.getBlackList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void recoverContainer(RMContainer rmContainer) {
|
public synchronized void recoverContainer(RMContainer rmContainer,
|
||||||
|
String partition) {
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
if (pending) {
|
if (pending) {
|
||||||
// If there was any container to recover, the application was
|
// If there was any container to recover, the application was
|
||||||
|
@ -828,9 +829,8 @@ public class AppSchedulingInfo {
|
||||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
metrics.allocateResources(rmContainer.getNodeLabelExpression(),
|
metrics.allocateResources(partition, user, 1,
|
||||||
user, 1, rmContainer.getAllocatedResource(),
|
rmContainer.getAllocatedResource(), false);
|
||||||
false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
||||||
|
|
|
@ -779,7 +779,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
public synchronized void recoverContainer(SchedulerNode node,
|
public synchronized void recoverContainer(SchedulerNode node,
|
||||||
RMContainer rmContainer) {
|
RMContainer rmContainer) {
|
||||||
// recover app scheduling info
|
// recover app scheduling info
|
||||||
appSchedulingInfo.recoverContainer(rmContainer);
|
appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());
|
||||||
|
|
||||||
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -51,7 +53,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class SchedulerUtils {
|
public class SchedulerUtils {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(SchedulerUtils.class);
|
||||||
|
|
||||||
private static final RecordFactory recordFactory =
|
private static final RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
|
@ -230,9 +234,14 @@ public class SchedulerUtils {
|
||||||
String labelExp = resReq.getNodeLabelExpression();
|
String labelExp = resReq.getNodeLabelExpression();
|
||||||
if (!(RMNodeLabelsManager.NO_LABEL.equals(labelExp)
|
if (!(RMNodeLabelsManager.NO_LABEL.equals(labelExp)
|
||||||
|| null == labelExp)) {
|
|| null == labelExp)) {
|
||||||
throw new InvalidLabelResourceRequestException(
|
String message = "NodeLabel is not enabled in cluster, but resource"
|
||||||
"Invalid resource request, node label not enabled "
|
+ " request contains a label expression.";
|
||||||
+ "but request contains label expression");
|
LOG.warn(message);
|
||||||
|
if (!isRecovery) {
|
||||||
|
throw new InvalidLabelResourceRequestException(
|
||||||
|
"Invalid resource request, node label not enabled "
|
||||||
|
+ "but request contains label expression");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (null == queueInfo) {
|
if (null == queueInfo) {
|
||||||
|
|
|
@ -2397,14 +2397,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// rm should successfully start with app1 loaded back in FAILED state
|
// rm should start successfully with app1 loaded back instead of rejected:
|
||||||
// due to node label not enabled but am resource request contains
|
// the AM container is pushed to run on the default label, and the other
|
||||||
// node label expression.
|
// containers are left to run normally.
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rm2.start();
|
rm2.start();
|
||||||
Assert.assertTrue("RM start successfully", true);
|
Assert.assertTrue("RM start successfully", true);
|
||||||
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
|
||||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.debug("Exception on start", e);
|
LOG.debug("Exception on start", e);
|
||||||
Assert.fail("RM should start without any issue");
|
Assert.fail("RM should start without any issue");
|
||||||
|
|
Loading…
Reference in New Issue