YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1
This commit is contained in:
parent
a7b4e8f03e
commit
3b46aae977
|
@ -351,7 +351,21 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
((AbstractYarnScheduler)getScheduler())
|
((AbstractYarnScheduler)getScheduler())
|
||||||
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
|
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
|
||||||
|
|
||||||
|
String label="";
|
||||||
|
try {
|
||||||
|
label = rmContext.getScheduler()
|
||||||
|
.getQueueInfo(app.getQueue(), false, false)
|
||||||
|
.getDefaultNodeLabelExpression();
|
||||||
|
} catch (Exception e){
|
||||||
|
//Queue may not exist since it could be auto-created in case of
|
||||||
|
// dynamic queues
|
||||||
|
}
|
||||||
|
|
||||||
|
if (label == null || label.equals("")) {
|
||||||
response.setNumClusterNodes(getScheduler().getNumClusterNodes());
|
response.setNumClusterNodes(getScheduler().getNumClusterNodes());
|
||||||
|
} else {
|
||||||
|
response.setNumClusterNodes(rmContext.getNodeLabelManager().getActiveNMCountPerLabel(label));
|
||||||
|
}
|
||||||
|
|
||||||
// add collector address for this application
|
// add collector address for this application
|
||||||
if (timelineServiceV2Enabled) {
|
if (timelineServiceV2Enabled) {
|
||||||
|
|
|
@ -16,36 +16,46 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||||
.RMContainerEvent;
|
.RMContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -208,4 +218,157 @@ public class TestApplicationMasterServiceCapacity extends
|
||||||
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
|
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testGetNMNumInAllocatedResponseWithOutNodeLabel() throws Exception {
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Register node1 node2 node3 node4
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
|
||||||
|
MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
|
||||||
|
.createWithMemory(2048, rm)
|
||||||
|
.build();
|
||||||
|
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
nm3.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
|
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||||
|
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
|
allocateRequest.setReleaseList(release);
|
||||||
|
allocateRequest.setAskList(ask);
|
||||||
|
|
||||||
|
AllocateResponse response1 = am1.allocate(allocateRequest);
|
||||||
|
Assert.assertEquals(3, response1.getNumClusterNodes());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config);
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
|
||||||
|
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 50);
|
||||||
|
conf.setMaximumCapacity(A, 100);
|
||||||
|
conf.setAccessibleNodeLabels(A, toSet("x"));
|
||||||
|
conf.setDefaultNodeLabelExpression(A, "x");
|
||||||
|
conf.setCapacityByLabel(A, "x", 100);
|
||||||
|
|
||||||
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 50);
|
||||||
|
conf.setMaximumCapacity(B, 100);
|
||||||
|
conf.setAccessibleNodeLabels(B, toSet("y"));
|
||||||
|
conf.setDefaultNodeLabelExpression(B, "y");
|
||||||
|
conf.setCapacityByLabel(B, "y", 100);
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testGetNMNumInAllocatedResponseWithNodeLabel() throws Exception {
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
|
||||||
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// add node label "x","y" and set node to label mapping
|
||||||
|
Set<String> clusterNodeLabels = new HashSet<String>();
|
||||||
|
clusterNodeLabels.add("x");
|
||||||
|
clusterNodeLabels.add("y");
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelManager = rm.getRMContext().getNodeLabelManager();
|
||||||
|
nodeLabelManager.
|
||||||
|
addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels);
|
||||||
|
|
||||||
|
//has 3 nodes with node label "x",1 node with node label "y"
|
||||||
|
nodeLabelManager
|
||||||
|
.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host1", 1234), toSet("x")));
|
||||||
|
nodeLabelManager
|
||||||
|
.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host2", 1234), toSet("x")));
|
||||||
|
nodeLabelManager
|
||||||
|
.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host3", 1234), toSet("x")));
|
||||||
|
nodeLabelManager
|
||||||
|
.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host4", 1234), toSet("y")));
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Register node1 node2 node3 node4
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 6 * GB);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:1234", 6 * GB);
|
||||||
|
MockNM nm3 = rm.registerNode("host3:1234", 6 * GB);
|
||||||
|
MockNM nm4 = rm.registerNode("host4:1234", 6 * GB);
|
||||||
|
|
||||||
|
// submit an application to queue root.a expression as "x"
|
||||||
|
MockRMAppSubmissionData data1 = MockRMAppSubmissionData.Builder
|
||||||
|
.createWithMemory(2048, rm)
|
||||||
|
.withAppName("someApp1")
|
||||||
|
.withUser("someUser")
|
||||||
|
.withQueue("root.a")
|
||||||
|
.build();
|
||||||
|
RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||||
|
|
||||||
|
// submit an application to queue root.b expression as "y"
|
||||||
|
MockRMAppSubmissionData data2 = MockRMAppSubmissionData.Builder
|
||||||
|
.createWithMemory(2048, rm)
|
||||||
|
.withAppName("someApp2")
|
||||||
|
.withUser("someUser")
|
||||||
|
.withQueue("root.b")
|
||||||
|
.build();
|
||||||
|
RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm4);
|
||||||
|
|
||||||
|
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||||
|
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
|
allocateRequest.setReleaseList(release);
|
||||||
|
allocateRequest.setAskList(ask);
|
||||||
|
|
||||||
|
AllocateResponse response1 = am1.allocate(allocateRequest);
|
||||||
|
AllocateResponse response2 = am2.allocate(allocateRequest);
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||||
|
RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
|
||||||
|
RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
|
||||||
|
|
||||||
|
// Do node heartbeats many times
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode4));
|
||||||
|
}
|
||||||
|
|
||||||
|
//has 3 nodes with node label "x"
|
||||||
|
Assert.assertEquals(3, response1.getNumClusterNodes());
|
||||||
|
|
||||||
|
//has 1 node with node label "y"
|
||||||
|
Assert.assertEquals(1, response2.getNumClusterNodes());
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue