YARN-6156. AM blacklisting to consider node label partition (Bibin A Chundatt via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-02-15 14:48:17 +05:30
parent cd3e59a3dc
commit b7613e0f40
5 changed files with 118 additions and 9 deletions

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.RMAppAttemptState; .RMAppAttemptState;
@ -561,4 +562,25 @@ public class RMServerUtils {
} }
return newApplicationTimeout; return newApplicationTimeout;
} }
/**
* Get applicable Node count for AM.
*
* @param rmContext context
* @param conf configuration
* @param amreq am resource request
* @return applicable node count
*/
public static int getApplicableNodeCountForAM(RMContext rmContext,
Configuration conf, ResourceRequest amreq) {
if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager();
String amNodeLabelExpression = amreq.getNodeLabelExpression();
amNodeLabelExpression = (amNodeLabelExpression == null
|| amNodeLabelExpression.trim().isEmpty())
? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression;
return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression);
}
return rmContext.getScheduler().getNumClusterNodes();
}
} }

View File

@ -350,6 +350,22 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
} }
} }
/*
* Get active node count based on label.
*/
public int getActiveNMCountPerLabel(String label) {
if (label == null) {
return 0;
}
try {
readLock.lock();
RMNodeLabel labelInfo = labelCollections.get(label);
return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs();
} finally {
readLock.unlock();
}
}
public Set<String> getLabelsOnNode(NodeId nodeId) { public Set<String> getLabelsOnNode(NodeId nodeId) {
try { try {
readLock.lock(); readLock.lock();

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -46,9 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -987,9 +983,11 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transfer over the blacklist from the previous app-attempt. // Transfer over the blacklist from the previous app-attempt.
currentAMBlacklistManager = currentAttempt.getAMBlacklistManager(); currentAMBlacklistManager = currentAttempt.getAMBlacklistManager();
} else { } else {
if (amBlacklistingEnabled) { if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
currentAMBlacklistManager = new SimpleBlacklistManager( currentAMBlacklistManager = new SimpleBlacklistManager(
scheduler.getNumClusterNodes(), blacklistDisableThreshold); RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
getAMResourceRequest()),
blacklistDisableThreshold);
} else { } else {
currentAMBlacklistManager = new DisabledBlacklistManager(); currentAMBlacklistManager = new DisabledBlacklistManager();
} }
@ -1006,7 +1004,7 @@ public class RMAppImpl implements RMApp, Recoverable {
attempts.put(appAttemptId, attempt); attempts.put(appAttemptId, attempt);
currentAttempt = attempt; currentAttempt = attempt;
} }
private void private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt(); createNewAttempt();

View File

@ -1057,7 +1057,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.amReq.setRelaxLocality(true); appAttempt.amReq.setRelaxLocality(true);
appAttempt.getAMBlacklistManager().refreshNodeHostCount( appAttempt.getAMBlacklistManager().refreshNodeHostCount(
appAttempt.scheduler.getNumClusterNodes()); RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext,
appAttempt.conf, appAttempt.amReq));
ResourceBlacklistRequest amBlacklist = ResourceBlacklistRequest amBlacklist =
appAttempt.getAMBlacklistManager().getBlacklistUpdates(); appAttempt.getAMBlacklistManager().getBlacklistUpdates();
@ -1246,7 +1247,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
private void rememberTargetTransitions(RMAppAttemptEvent event, private void rememberTargetTransitions(RMAppAttemptEvent event,
Object transitionToDo, RMAppAttemptState targetFinalState) { Object transitionToDo, RMAppAttemptState targetFinalState) {
transitionTodo = transitionToDo; transitionTodo = transitionToDo;

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -721,6 +723,77 @@ public class TestCapacitySchedulerNodeLabelUpdate {
rm.close(); rm.close();
} }
@Test(timeout = 30000)
public void testBlacklistAMDisableLabel() throws Exception {
conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
true);
conf.setFloat(
YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
0.5f);
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"),
NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0),
toSet("x")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"),
NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0),
toSet("y")));
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
// Nodes in label default h1,h8,h9
// Nodes in label x h2,h3,h6
// Nodes in label y h4,h5,h7
MockNM nm1 = rm.registerNode("h1:1234", 2048);
MockNM nm2 = rm.registerNode("h2:1234", 2048);
rm.registerNode("h3:1234", 2048);
rm.registerNode("h4:1234", 2048);
rm.registerNode("h5:1234", 2048);
rm.registerNode("h6:1234", 2048);
rm.registerNode("h7:1234", 2048);
rm.registerNode("h8:1234", 2048);
rm.registerNode("h9:1234", 2048);
// Submit app with AM container launched on default partition i.e. h1.
RMApp app = rm.submitApp(GB, "app", "user", null, "a");
MockRM.launchAndRegisterAM(app, rm, nm1);
RMAppAttempt appAttempt = app.getCurrentAppAttempt();
// Add default node blacklist from default
appAttempt.getAMBlacklistManager().addNode("h1");
ResourceBlacklistRequest blacklistUpdates =
appAttempt.getAMBlacklistManager().getBlacklistUpdates();
Assert.assertEquals(1, blacklistUpdates.getBlacklistAdditions().size());
Assert.assertEquals(0, blacklistUpdates.getBlacklistRemovals().size());
// Adding second node from default parition
appAttempt.getAMBlacklistManager().addNode("h8");
blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates();
Assert.assertEquals(0, blacklistUpdates.getBlacklistAdditions().size());
Assert.assertEquals(2, blacklistUpdates.getBlacklistRemovals().size());
// Submission in label x
RMApp applabel = rm.submitApp(GB, "app", "user", null, "a", "x");
MockRM.launchAndRegisterAM(applabel, rm, nm2);
RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt();
appAttemptlabelx.getAMBlacklistManager().addNode("h2");
ResourceBlacklistRequest blacklistUpdatesOnx =
appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
Assert.assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size());
Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size());
// Adding second node from default parition
appAttemptlabelx.getAMBlacklistManager().addNode("h3");
blacklistUpdatesOnx =
appAttempt.getAMBlacklistManager().getBlacklistUpdates();
Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size());
Assert.assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size());
rm.close();
}
private void checkAMResourceLimit(MockRM rm, String queuename, int memory, private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
String label) throws InterruptedException { String label) throws InterruptedException {
Assert.assertEquals(memory, Assert.assertEquals(memory,