YARN-3241. FairScheduler handles invalid queue names inconsistently. (Zhihai Xu via kasha)
This commit is contained in:
parent
6ca1f12024
commit
2bc097cd14
|
@ -94,6 +94,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3269. Yarn.nodemanager.remote-app-log-dir could not be configured to
|
YARN-3269. Yarn.nodemanager.remote-app-log-dir could not be configured to
|
||||||
fully qualified path. (Xuan Gong via junping_du)
|
fully qualified path. (Xuan Gong via junping_du)
|
||||||
|
|
||||||
|
YARN-3241. FairScheduler handles "invalid" queue names inconsistently.
|
||||||
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
Release 2.7.0 - UNRELEASED
|
Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -426,13 +426,19 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<FSQueueType, Set<String>> configuredQueues,
|
Map<FSQueueType, Set<String>> configuredQueues,
|
||||||
Set<String> reservableQueues)
|
Set<String> reservableQueues)
|
||||||
throws AllocationConfigurationException {
|
throws AllocationConfigurationException {
|
||||||
String queueName = element.getAttribute("name");
|
String queueName = element.getAttribute("name").trim();
|
||||||
|
|
||||||
if (queueName.contains(".")) {
|
if (queueName.contains(".")) {
|
||||||
throw new AllocationConfigurationException("Bad fair scheduler config "
|
throw new AllocationConfigurationException("Bad fair scheduler config "
|
||||||
+ "file: queue name (" + queueName + ") shouldn't contain period.");
|
+ "file: queue name (" + queueName + ") shouldn't contain period.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (queueName.isEmpty()) {
|
||||||
|
throw new AllocationConfigurationException("Bad fair scheduler config "
|
||||||
|
+ "file: queue name shouldn't be empty or "
|
||||||
|
+ "consist only of whitespace.");
|
||||||
|
}
|
||||||
|
|
||||||
if (parentName != null) {
|
if (parentName != null) {
|
||||||
queueName = parentName + "." + queueName;
|
queueName = parentName + "." + queueName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -701,6 +701,8 @@ public class FairScheduler extends
|
||||||
appRejectMsg = queueName + " is not a leaf queue";
|
appRejectMsg = queueName + " is not a leaf queue";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (InvalidQueueNameException qne) {
|
||||||
|
appRejectMsg = qne.getMessage();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
appRejectMsg = "Error assigning app to queue " + queueName;
|
appRejectMsg = "Error assigning app to queue " + queueName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when Queue Name is malformed.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public class InvalidQueueNameException extends IllegalArgumentException {
|
||||||
|
private static final long serialVersionUID = -7306320927804540011L;
|
||||||
|
|
||||||
|
public InvalidQueueNameException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public InvalidQueueNameException(String message, Throwable t) {
|
||||||
|
super(message, t);
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
/**
|
/**
|
||||||
* Maintains a list of queues as well as scheduling parameters for each queue,
|
* Maintains a list of queues as well as scheduling parameters for each queue,
|
||||||
* such as guaranteed share allocations, from the fair scheduler config file.
|
* such as guaranteed share allocations, from the fair scheduler config file.
|
||||||
|
@ -155,7 +156,13 @@ public class QueueManager {
|
||||||
|
|
||||||
// Move up the queue tree until we reach one that exists.
|
// Move up the queue tree until we reach one that exists.
|
||||||
while (sepIndex != -1) {
|
while (sepIndex != -1) {
|
||||||
|
int prevSepIndex = sepIndex;
|
||||||
sepIndex = name.lastIndexOf('.', sepIndex-1);
|
sepIndex = name.lastIndexOf('.', sepIndex-1);
|
||||||
|
String node = name.substring(sepIndex+1, prevSepIndex);
|
||||||
|
if (!isQueueNameValid(node)) {
|
||||||
|
throw new InvalidQueueNameException("Illegal node name at offset " +
|
||||||
|
(sepIndex+1) + " for queue name " + name);
|
||||||
|
}
|
||||||
FSQueue queue;
|
FSQueue queue;
|
||||||
String curName = null;
|
String curName = null;
|
||||||
curName = name.substring(0, sepIndex);
|
curName = name.substring(0, sepIndex);
|
||||||
|
@ -401,4 +408,13 @@ public class QueueManager {
|
||||||
// recursively
|
// recursively
|
||||||
rootQueue.updatePreemptionVariables();
|
rootQueue.updatePreemptionVariables();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether queue name is valid,
|
||||||
|
* return true if it is valid, otherwise return false.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isQueueNameValid(String node) {
|
||||||
|
return !node.isEmpty() && node.equals(node.trim());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -551,6 +551,29 @@ public class TestAllocationFileLoaderService {
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that you can't have the queue name with whitespace only in the
|
||||||
|
* allocations file.
|
||||||
|
*/
|
||||||
|
@Test (expected = AllocationConfigurationException.class)
|
||||||
|
public void testQueueNameContainingOnlyWhitespace() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\" \">");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
|
allocLoader.reloadAllocations();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReservableQueue() throws Exception {
|
public void testReservableQueue() throws Exception {
|
||||||
|
|
|
@ -4444,4 +4444,82 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
assertEquals("Incorrect number of perf metrics", 1,
|
assertEquals("Incorrect number of perf metrics", 1,
|
||||||
collector.getRecords().size());
|
collector.getRecords().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueNameWithTrailingSpace() throws Exception {
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// only default queue
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
|
||||||
|
// submit app with queue name "A"
|
||||||
|
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
|
||||||
|
appAttemptId1.getApplicationId(), "A", "user1");
|
||||||
|
scheduler.handle(appAddedEvent1);
|
||||||
|
// submission accepted
|
||||||
|
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNotNull(scheduler.getSchedulerApplications().get(appAttemptId1.
|
||||||
|
getApplicationId()));
|
||||||
|
|
||||||
|
AppAttemptAddedSchedulerEvent attempAddedEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
|
||||||
|
scheduler.handle(attempAddedEvent);
|
||||||
|
// That queue should have one app
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("A", true)
|
||||||
|
.getNumRunnableApps());
|
||||||
|
assertNotNull(scheduler.getSchedulerApp(appAttemptId1));
|
||||||
|
|
||||||
|
// submit app with queue name "A "
|
||||||
|
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
|
||||||
|
appAttemptId2.getApplicationId(), "A ", "user1");
|
||||||
|
scheduler.handle(appAddedEvent2);
|
||||||
|
// submission rejected
|
||||||
|
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNull(scheduler.getSchedulerApplications().get(appAttemptId2.
|
||||||
|
getApplicationId()));
|
||||||
|
assertNull(scheduler.getSchedulerApp(appAttemptId2));
|
||||||
|
|
||||||
|
// submit app with queue name "B.C"
|
||||||
|
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent3 = new AppAddedSchedulerEvent(
|
||||||
|
appAttemptId3.getApplicationId(), "B.C", "user1");
|
||||||
|
scheduler.handle(appAddedEvent3);
|
||||||
|
// submission accepted
|
||||||
|
assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNotNull(scheduler.getSchedulerApplications().get(appAttemptId3.
|
||||||
|
getApplicationId()));
|
||||||
|
|
||||||
|
attempAddedEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(appAttemptId3, false);
|
||||||
|
scheduler.handle(attempAddedEvent);
|
||||||
|
// That queue should have one app
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("B.C", true)
|
||||||
|
.getNumRunnableApps());
|
||||||
|
assertNotNull(scheduler.getSchedulerApp(appAttemptId3));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyQueueNameInConfigFile() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
// set empty queue name
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"\">");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
try {
|
||||||
|
scheduler.init(conf);
|
||||||
|
Assert.fail("scheduler init should fail because" +
|
||||||
|
" empty queue name.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Failed to initialize FairScheduler"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,6 +124,17 @@ public class TestQueueManager {
|
||||||
.getChildQueues().isEmpty());
|
.getChildQueues().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckQueueNodeName() {
|
||||||
|
assertFalse(queueManager.isQueueNameValid(""));
|
||||||
|
assertFalse(queueManager.isQueueNameValid(" "));
|
||||||
|
assertFalse(queueManager.isQueueNameValid(" a"));
|
||||||
|
assertFalse(queueManager.isQueueNameValid("a "));
|
||||||
|
assertFalse(queueManager.isQueueNameValid(" a "));
|
||||||
|
assertTrue(queueManager.isQueueNameValid("a b"));
|
||||||
|
assertTrue(queueManager.isQueueNameValid("a"));
|
||||||
|
}
|
||||||
|
|
||||||
private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
|
private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
|
||||||
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||||
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
|
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
|
||||||
|
|
Loading…
Reference in New Issue