YARN-10953. Make CapacityScheduler#getOrCreateQueueFromPlacementConte… Contributed by Andras Gyori
This commit is contained in:
parent
5535d66fb5
commit
4b1b6b858a
|
@ -234,6 +234,7 @@ public class CapacityScheduler extends
|
|||
private boolean multiNodePlacementEnabled;
|
||||
|
||||
private boolean printedVerboseLoggingForAsyncScheduling;
|
||||
private boolean appShouldFailFast;
|
||||
|
||||
/**
|
||||
* EXPERT
|
||||
|
@ -355,6 +356,9 @@ public class CapacityScheduler extends
|
|||
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
|
||||
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
|
||||
|
||||
this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
|
||||
getConfig());
|
||||
|
||||
// number of threads for async scheduling
|
||||
int maxAsyncSchedulingThreads = this.conf.getInt(
|
||||
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
|
||||
|
@ -491,6 +495,8 @@ public class CapacityScheduler extends
|
|||
assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
|
||||
maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
|
||||
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
|
||||
appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
|
||||
getConfig());
|
||||
|
||||
LOG.info("assignMultipleEnabled = " + assignMultipleEnabled + "\n" +
|
||||
"maxAssignPerHeartbeat = " + maxAssignPerHeartbeat + "\n" +
|
||||
|
@ -880,7 +886,7 @@ public class CapacityScheduler extends
|
|||
if (queue == null) {
|
||||
//During a restart, this indicates a queue was removed, which is
|
||||
//not presently supported
|
||||
if (!getConfiguration().shouldAppFailFast(getConfig())) {
|
||||
if (!appShouldFailFast) {
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.KILL,
|
||||
"Application killed on recovery as it"
|
||||
|
@ -901,7 +907,7 @@ public class CapacityScheduler extends
|
|||
if (!(queue instanceof LeafQueue)) {
|
||||
// During RM restart, this means leaf queue was converted to a parent
|
||||
// queue, which is not supported for running apps.
|
||||
if (!getConfiguration().shouldAppFailFast(getConfig())) {
|
||||
if (!appShouldFailFast) {
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.KILL,
|
||||
"Application killed on recovery as it was "
|
||||
|
@ -951,73 +957,83 @@ public class CapacityScheduler extends
|
|||
applicationId, String user, String queueName,
|
||||
ApplicationPlacementContext placementContext,
|
||||
boolean isRecovery) {
|
||||
|
||||
CSQueue queue = getQueue(queueName);
|
||||
ApplicationPlacementContext fallbackContext = placementContext;
|
||||
QueuePath queuePath = new QueuePath(queueName);
|
||||
|
||||
if (queue == null) {
|
||||
// Even if placement rules are turned off, we still have the opportunity
|
||||
// to auto create a queue.
|
||||
if (placementContext == null) {
|
||||
fallbackContext = CSQueueUtils.extractQueuePath(queueName);
|
||||
}
|
||||
|
||||
//we need to make sure there is no empty path parts present
|
||||
String path = fallbackContext.getFullQueuePath();
|
||||
String[] pathParts = path.split("\\.");
|
||||
for (int i = 0; i < pathParts.length; i++) {
|
||||
if ("".equals(pathParts[i])) {
|
||||
LOG.error("Application submitted to invalid path: '{}'", path);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (fallbackContext.hasParentQueue()) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
return queueManager.createQueue(fallbackContext);
|
||||
} catch (YarnException | IOException e) {
|
||||
// A null queue is expected if the placementContext is null. In order
|
||||
// not to disrupt the control flow, if we fail to auto create a queue,
|
||||
// we fall back to the original logic.
|
||||
if (placementContext == null) {
|
||||
LOG.error("Could not auto-create leaf queue " + queueName +
|
||||
" due to : ", e);
|
||||
return null;
|
||||
}
|
||||
if (isRecovery) {
|
||||
if (!getConfiguration().shouldAppFailFast(getConfig())) {
|
||||
LOG.error("Could not auto-create leaf queue " + queueName +
|
||||
" due to : ", e);
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.KILL,
|
||||
"Application killed on recovery"
|
||||
+ " as it was submitted to queue " + queueName
|
||||
+ " which could not be auto-created"));
|
||||
} else{
|
||||
String queueErrorMsg =
|
||||
"Queue named " + queueName + " could not be "
|
||||
+ "auto-created during application recovery.";
|
||||
LOG.error(FATAL, queueErrorMsg, e);
|
||||
throw new QueueInvalidException(queueErrorMsg);
|
||||
}
|
||||
} else{
|
||||
LOG.error("Could not auto-create leaf queue due to : ", e);
|
||||
final String message =
|
||||
"Application " + applicationId + " submission by user : "
|
||||
+ user
|
||||
+ " to queue : " + queueName + " failed : " + e
|
||||
.getMessage();
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
|
||||
message));
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
if (queue != null) {
|
||||
return queue;
|
||||
}
|
||||
|
||||
if (isAmbiguous(queueName)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (placementContext != null) {
|
||||
queuePath = new QueuePath(placementContext.getFullQueuePath());
|
||||
}
|
||||
|
||||
//we need to make sure there are no empty path parts present
|
||||
if (queuePath.hasEmptyPart()) {
|
||||
LOG.error("Application submitted to invalid path due to empty parts: " +
|
||||
"'{}'", queuePath);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!queuePath.hasParent()) {
|
||||
LOG.error("Application submitted to a queue without parent" +
|
||||
" '{}'", queuePath);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
writeLock.lock();
|
||||
return queueManager.createQueue(queuePath);
|
||||
} catch (YarnException | IOException e) {
|
||||
// A null queue is expected if the placementContext is null. In order
|
||||
// not to disrupt the control flow, if we fail to auto create a queue,
|
||||
// we fall back to the original logic.
|
||||
if (placementContext == null) {
|
||||
LOG.error("Could not auto-create leaf queue " + queueName +
|
||||
" due to : ", e);
|
||||
return null;
|
||||
}
|
||||
handleQueueCreationError(applicationId, user, queueName, isRecovery, e);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void handleQueueCreationError(
|
||||
ApplicationId applicationId, String user, String queueName,
|
||||
boolean isRecovery, Exception e) {
|
||||
if (isRecovery) {
|
||||
if (!appShouldFailFast) {
|
||||
LOG.error("Could not auto-create leaf queue " + queueName +
|
||||
" due to : ", e);
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.KILL,
|
||||
"Application killed on recovery"
|
||||
+ " as it was submitted to queue " + queueName
|
||||
+ " which did not exist and could not be auto-created"));
|
||||
} else {
|
||||
String queueErrorMsg =
|
||||
"Queue named " + queueName + " could not be "
|
||||
+ "auto-created during application recovery.";
|
||||
LOG.error(FATAL, queueErrorMsg, e);
|
||||
throw new QueueInvalidException(queueErrorMsg);
|
||||
}
|
||||
} else {
|
||||
LOG.error("Could not auto-create leaf queue due to : ", e);
|
||||
final String message =
|
||||
"Application " + applicationId + " submission by user : "
|
||||
+ user
|
||||
+ " to queue : " + queueName + " failed : " + e
|
||||
.getMessage();
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
|
||||
message));
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
|
||||
private void addApplication(ApplicationId applicationId, String queueName,
|
||||
|
|
|
@ -1672,7 +1672,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED);
|
||||
}
|
||||
|
||||
public boolean shouldAppFailFast(Configuration conf) {
|
||||
public static boolean shouldAppFailFast(Configuration conf) {
|
||||
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -528,10 +527,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
* @throws YarnException if the given path is not eligible to be auto created
|
||||
* @throws IOException if the given path can not be added to the parent
|
||||
*/
|
||||
public LeafQueue createQueue(ApplicationPlacementContext queue)
|
||||
public LeafQueue createQueue(QueuePath queue)
|
||||
throws YarnException, IOException {
|
||||
String leafQueueName = queue.getQueue();
|
||||
String parentQueueName = queue.getParentQueue();
|
||||
String leafQueueName = queue.getLeafName();
|
||||
String parentQueueName = queue.getParent();
|
||||
|
||||
if (!StringUtils.isEmpty(parentQueueName)) {
|
||||
CSQueue parentQueue = getQueue(parentQueueName);
|
||||
|
@ -563,16 +562,22 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
* to be auto created
|
||||
*/
|
||||
public List<String> determineMissingParents(
|
||||
ApplicationPlacementContext queue) throws SchedulerDynamicEditException {
|
||||
if (!queue.hasParentQueue()) {
|
||||
QueuePath queue) throws SchedulerDynamicEditException {
|
||||
if (!queue.hasParent()) {
|
||||
throw new SchedulerDynamicEditException("Can not auto create queue "
|
||||
+ queue.getFullQueuePath() + " due to missing ParentQueue path.");
|
||||
+ queue.getFullPath() + " due to missing ParentQueue path.");
|
||||
}
|
||||
|
||||
if (isAmbiguous(queue.getParent())) {
|
||||
throw new SchedulerDynamicEditException("Could not auto-create queue "
|
||||
+ queue + " due to ParentQueue " + queue.getParent() +
|
||||
" being ambiguous.");
|
||||
}
|
||||
|
||||
// Start from the first parent
|
||||
int firstStaticParentDistance = 1;
|
||||
|
||||
StringBuilder parentCandidate = new StringBuilder(queue.getParentQueue());
|
||||
StringBuilder parentCandidate = new StringBuilder(queue.getParent());
|
||||
LinkedList<String> parentsToCreate = new LinkedList<>();
|
||||
|
||||
CSQueue firstExistingParent = getQueue(parentCandidate.toString());
|
||||
|
@ -584,7 +589,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
|
||||
if (firstStaticParentDistance > MAXIMUM_DYNAMIC_QUEUE_DEPTH) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto create queue " + queue.getFullQueuePath()
|
||||
"Could not auto create queue " + queue.getFullPath()
|
||||
+ ". The distance of the LeafQueue from the first static " +
|
||||
"ParentQueue is " + firstStaticParentDistance + ", which is " +
|
||||
"above the limit.");
|
||||
|
@ -607,7 +612,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
if (!(firstExistingParent instanceof ParentQueue)) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Could not auto create hierarchy of "
|
||||
+ queue.getFullQueuePath() + ". Queue " + queue.getParentQueue() +
|
||||
+ queue.getFullPath() + ". Queue " + queue.getParent() +
|
||||
" is not a ParentQueue."
|
||||
);
|
||||
}
|
||||
|
@ -616,7 +621,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
|
||||
if (!existingParentQueue.isEligibleForAutoQueueCreation()) {
|
||||
throw new SchedulerDynamicEditException("Auto creation of queue " +
|
||||
queue.getFullQueuePath() + " is not enabled under parent "
|
||||
queue.getFullPath() + " is not enabled under parent "
|
||||
+ existingParentQueue.getQueuePath());
|
||||
}
|
||||
|
||||
|
@ -637,12 +642,12 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
this.configuredNodeLabels = new ConfiguredNodeLabels(conf);
|
||||
}
|
||||
|
||||
private LeafQueue createAutoQueue(ApplicationPlacementContext queue)
|
||||
private LeafQueue createAutoQueue(QueuePath queue)
|
||||
throws SchedulerDynamicEditException {
|
||||
List<String> parentsToCreate = determineMissingParents(queue);
|
||||
// First existing parent is either the parent of the last missing parent
|
||||
// or the parent of the given path
|
||||
String existingParentName = queue.getParentQueue();
|
||||
String existingParentName = queue.getParent();
|
||||
if (!parentsToCreate.isEmpty()) {
|
||||
existingParentName = parentsToCreate.get(0).substring(
|
||||
0, parentsToCreate.get(0).lastIndexOf("."));
|
||||
|
@ -657,21 +662,21 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
}
|
||||
|
||||
LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue(
|
||||
queue.getFullQueuePath());
|
||||
queue.getFullPath());
|
||||
addQueue(leafQueue.getQueuePath(), leafQueue);
|
||||
|
||||
return leafQueue;
|
||||
}
|
||||
|
||||
private LeafQueue createLegacyAutoQueue(ApplicationPlacementContext queue)
|
||||
private LeafQueue createLegacyAutoQueue(QueuePath queue)
|
||||
throws IOException, SchedulerDynamicEditException {
|
||||
CSQueue parentQueue = getQueue(queue.getParentQueue());
|
||||
CSQueue parentQueue = getQueue(queue.getParent());
|
||||
// Case 1: Handle ManagedParentQueue
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
AutoCreatedLeafQueue autoCreatedLeafQueue =
|
||||
new AutoCreatedLeafQueue(
|
||||
csContext, queue.getQueue(), autoCreateEnabledParentQueue);
|
||||
csContext, queue.getLeafName(), autoCreateEnabledParentQueue);
|
||||
|
||||
addLegacyDynamicQueue(autoCreatedLeafQueue);
|
||||
return autoCreatedLeafQueue;
|
||||
|
|
|
@ -1566,6 +1566,46 @@ public class TestCapacityScheduler {
|
|||
return rm;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppSubmission() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setQueues(A, new String[] {"a1", "a2", "b"});
|
||||
conf.setCapacity(A1, 20);
|
||||
conf.setCapacity("root.a.b", 10);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
RMApp noParentQueueApp = submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
|
||||
Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState());
|
||||
|
||||
RMApp ambiguousQueueApp = submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
|
||||
Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());
|
||||
|
||||
RMApp emptyPartQueueApp = submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
|
||||
Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());
|
||||
|
||||
RMApp failedAutoQueue = submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
|
||||
Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
|
||||
}
|
||||
|
||||
private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception {
|
||||
MockRMAppSubmissionData ambiguousQueueAppData =
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
|
||||
.withWaitForAppAcceptedState(false)
|
||||
.withAppName("app")
|
||||
.withUser("user")
|
||||
.withAcls(null)
|
||||
.withQueue(b)
|
||||
.withUnmanagedAM(false)
|
||||
.build();
|
||||
RMApp app1 = MockRMAppSubmitter.submit(rm, ambiguousQueueAppData);
|
||||
rm.waitForState(app1.getApplicationId(), state);
|
||||
return app1;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppBasic() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
|
|
|
@ -1209,8 +1209,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
|||
|
||||
protected LeafQueue createQueue(String queuePath) throws YarnException,
|
||||
IOException {
|
||||
return autoQueueHandler.createQueue(
|
||||
CSQueueUtils.extractQueuePath(queuePath));
|
||||
return autoQueueHandler.createQueue(new QueuePath(queuePath));
|
||||
}
|
||||
|
||||
private void assertQueueMinResource(CSQueue queue, float expected) {
|
||||
|
|
|
@ -42,11 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueTemplate;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
|
@ -359,8 +359,7 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
|
||||
private LeafQueue createQueue(String queuePath) throws YarnException,
|
||||
IOException {
|
||||
return autoQueueHandler.createQueue(
|
||||
CSQueueUtils.extractQueuePath(queuePath));
|
||||
return autoQueueHandler.createQueue(new QueuePath(queuePath));
|
||||
}
|
||||
|
||||
private JSONObject sendRequestToSchedulerEndpoint() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue