YARN-3940. Application moveToQueue should check NodeLabel permission. Contributed by Bibin A Chundatt
(cherry picked from commit 46e02ab719)

commit b952ce854b
parent 8d467037cf
CapacityScheduler.java:
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -1852,6 +1853,7 @@ public class CapacityScheduler extends
     LeafQueue dest = getAndCheckLeafQueue(destQueueName);
     // Validation check - ACLs, submission limits for user & queue
     String user = app.getUser();
+    checkQueuePartition(app, dest);
     try {
       dest.submitApplication(appId, user, destQueueName);
     } catch (AccessControlException e) {
@@ -1876,6 +1878,39 @@ public class CapacityScheduler extends
     return targetQueueName;
   }
+
+  /**
+   * Check application can be moved to queue with labels enabled. All labels in
+   * application life time will be checked
+   *
+   * @param appId
+   * @param dest
+   * @throws YarnException
+   */
+  private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest)
+      throws YarnException {
+    if (!YarnConfiguration.areNodeLabelsEnabled(conf)) {
+      return;
+    }
+    Set<String> targetqueuelabels = dest.getAccessibleNodeLabels();
+    AppSchedulingInfo schedulingInfo = app.getAppSchedulingInfo();
+    Set<String> appLabelexpressions = schedulingInfo.getRequestedPartitions();
+    // default partition access always available remove empty label
+    appLabelexpressions.remove(RMNodeLabelsManager.NO_LABEL);
+    Set<String> nonAccessiblelabels = new HashSet<String>();
+    for (String label : appLabelexpressions) {
+      if (!SchedulerUtils.checkQueueLabelExpression(targetqueuelabels, label,
+          null)) {
+        nonAccessiblelabels.add(label);
+      }
+    }
+    if (nonAccessiblelabels.size() > 0) {
+      throw new YarnException(
+          "Specified queue=" + dest.getQueueName() + " can't satisfy following "
+              + "apps label expressions =" + nonAccessiblelabels
+              + " accessible node labels =" + targetqueuelabels);
+    }
+  }
 
   /**
    * Check that the String provided in input is the name of an existing,
   * LeafQueue, if successful returns the queue.
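Note: the following is an illustrative, standalone sketch of the rule the new
checkQueuePartition method enforces; it is not part of the patch. It assumes
(matching how SchedulerUtils.checkQueueLabelExpression is used above) that a
queue can access a partition when its accessible-label set contains either
that label or the ANY wildcard "*", and that the empty default partition is
always accessible.

import java.util.HashSet;
import java.util.Set;

public class PartitionCheckSketch {
  static final String DEFAULT_PARTITION = ""; // always accessible
  static final String ANY = "*";              // queue wildcard label

  // Returns the requested partitions the target queue cannot access;
  // a non-empty result means the move must be rejected.
  static Set<String> nonAccessible(Set<String> queueLabels,
      Set<String> requestedPartitions) {
    Set<String> bad = new HashSet<>();
    for (String label : requestedPartitions) {
      if (label.equals(DEFAULT_PARTITION)
          || queueLabels.contains(ANY)
          || queueLabels.contains(label)) {
        continue; // this partition is accessible from the queue
      }
      bad.add(label);
    }
    return bad;
  }

  public static void main(String[] args) {
    // App requested partitions {"x", ""}; queue can only access {"y"}.
    Set<String> queueLabels = new HashSet<>(Set.of("y"));
    Set<String> requested = new HashSet<>(Set.of("x", ""));
    // Prints [x] -> the move is rejected, as in the test below.
    System.out.println(nonAccessible(queueLabels, requested));
  }
}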
TestCapacitySchedulerNodeLabelUpdate.java:
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -86,7 +89,53 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     return conf;
   }
+
+  private Configuration getConfigurationWithSubQueueLabels(
+      Configuration config) {
+    CapacitySchedulerConfiguration conf2 =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf2.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+    conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf2.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+    final String aa1 = a + ".a1";
+    final String aa2 = a + ".a2";
+    final String aa3 = a + ".a3";
+    final String aa4 = a + ".a4";
+    conf2.setQueues(a, new String[] {"a1", "a2", "a3", "a4"});
+    conf2.setCapacity(a, 50);
+    conf2.setCapacity(b, 50);
+    conf2.setCapacity(aa1, 40);
+    conf2.setCapacity(aa2, 20);
+    conf2.setCapacity(aa3, 20);
+    conf2.setCapacity(aa4, 20);
+    conf2.setAccessibleNodeLabels(a, ImmutableSet.of("x", "y", "z"));
+    conf2.setAccessibleNodeLabels(aa1, ImmutableSet.of("x", "y"));
+    conf2.setAccessibleNodeLabels(aa2, ImmutableSet.of("y"));
+    conf2.setAccessibleNodeLabels(aa3, ImmutableSet.of("x", "y", "z"));
+    conf2.setAccessibleNodeLabels(aa4, ImmutableSet.of("x", "y"));
+    conf2.setCapacityByLabel(a, "x", 50);
+    conf2.setCapacityByLabel(a, "y", 50);
+    conf2.setCapacityByLabel(a, "z", 50);
+    conf2.setCapacityByLabel(b, "x", 50);
+    conf2.setCapacityByLabel(b, "y", 50);
+    conf2.setCapacityByLabel(b, "z", 50);
+    conf2.setCapacityByLabel(aa1, "x", 50);
+    conf2.setCapacityByLabel(aa3, "x", 25);
+    conf2.setCapacityByLabel(aa4, "x", 25);
+    conf2.setCapacityByLabel(aa1, "y", 25);
+    conf2.setCapacityByLabel(aa2, "y", 25);
+    conf2.setCapacityByLabel(aa4, "y", 50);
+    conf2.setCapacityByLabel(aa3, "z", 50);
+    conf2.setCapacityByLabel(aa4, "z", 50);
+    return conf2;
+  }
 
   private Set<String> toSet(String... elements) {
     Set<String> set = Sets.newHashSet(elements);
     return set;
@@ -364,6 +413,71 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
+  @Test(timeout = 3000000)
+  public void testMoveApplicationWithLabel() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("z")));
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithSubQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 4096 * 2);
+    MockNM nm2 = rm.registerNode("h2:1234", 4096 * 2);
+    MockNM nm3 = rm.registerNode("h3:1234", 4096 * 2);
+    MockNM nm4 = rm.registerNode("h4:1234", 4096 * 2);
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    ContainerId container1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm.waitForState(nm1, container1, RMContainerState.ALLOCATED, 10 * 1000);
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
+    ContainerId container2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    rm.waitForState(nm2, container2, RMContainerState.ALLOCATED, 10 * 1000);
+    CapacityScheduler scheduler =
+        ((CapacityScheduler) rm.getResourceScheduler());
+    try {
+      scheduler.moveApplication(app1.getApplicationId(), "a2");
+      fail("Should throw exception since target queue doesnt have "
+          + "required labels");
+    } catch (Exception e) {
+      Assert.assertTrue("Yarn Exception should be thrown",
+          e instanceof YarnException);
+      Assert.assertEquals("Specified queue=a2 can't satisfy "
+          + "following apps label expressions =[x] accessible "
+          + "node labels =[y]", e.getMessage());
+    }
+    try {
+      scheduler.moveApplication(app1.getApplicationId(), "a3");
+      scheduler.moveApplication(app1.getApplicationId(), "a4");
+      // Check move to queue with accessible label ANY
+      scheduler.moveApplication(app1.getApplicationId(), "b");
+    } catch (Exception e) {
+      fail("Should not throw exception since target queue has "
+          + "required labels");
+    }
+    rm.stop();
+  }
+
   @Test (timeout = 60000)
   public void testComplexResourceUsageWhenNodeUpdatesPartition()
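Note: the new validation only applies when node labels are enabled;
checkQueuePartition returns immediately otherwise. A minimal sketch of that
gate, using the standard YarnConfiguration API (illustrative, not part of the
patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class NodeLabelGateSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // yarn.node-labels.enabled must be true for the move-time partition
    // check to run; with the default (false) the check is skipped.
    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
    System.out.println(YarnConfiguration.areNodeLabelsEnabled(conf)); // true
  }
}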