YARN-4106. NodeLabels for NM in distributed mode is not updated even after clusterNodelabel addition in RM. (Bibin A Chundatt via wangda)

This commit is contained in:
Wangda Tan 2015-09-10 09:30:09 -07:00
parent 8e615588d5
commit 77666105b4
5 changed files with 74 additions and 41 deletions

View File

@ -824,6 +824,9 @@ Release 2.8.0 - UNRELEASED
YARN-3591. Resource localization on a bad disk causes subsequent containers failure. YARN-3591. Resource localization on a bad disk causes subsequent containers failure.
(Lavkesh Lahngir via vvasudev) (Lavkesh Lahngir via vvasudev)
YARN-4106. NodeLabels for NM in distributed mode is not updated even after
clusterNodelabel addition in RM. (Bibin A Chundatt via wangda)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -897,6 +897,9 @@ private NMDistributedNodeLabelsHandler(
private final NodeLabelsProvider nodeLabelsProvider; private final NodeLabelsProvider nodeLabelsProvider;
private Set<NodeLabel> previousNodeLabels; private Set<NodeLabel> previousNodeLabels;
private boolean updatedLabelsSentToRM; private boolean updatedLabelsSentToRM;
private long lastNodeLabelSendFailMills = 0L;
// TODO : Need to check which conf to use.Currently setting as 1 min
private static final long FAILEDLABELRESENDINTERVAL = 60000;
@Override @Override
public Set<NodeLabel> getNodeLabelsForRegistration() { public Set<NodeLabel> getNodeLabelsForRegistration() {
@ -938,12 +941,15 @@ public Set<NodeLabel> getNodeLabelsForHeartbeat() {
// take some action only on modification of labels // take some action only on modification of labels
boolean areNodeLabelsUpdated = boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size() nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat); || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
|| checkResendLabelOnFailure();
updatedLabelsSentToRM = false; updatedLabelsSentToRM = false;
if (areNodeLabelsUpdated) { if (areNodeLabelsUpdated) {
previousNodeLabels = nodeLabelsForHeartbeat; previousNodeLabels = nodeLabelsForHeartbeat;
try { try {
LOG.info("Modified labels from provider: "
+ StringUtils.join(",", previousNodeLabels));
validateNodeLabels(nodeLabelsForHeartbeat); validateNodeLabels(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = true; updatedLabelsSentToRM = true;
} catch (IOException e) { } catch (IOException e) {
@ -980,16 +986,33 @@ private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
} }
} }
/*
* In case of failure when RM doesnt accept labels need to resend Labels to
* RM. This method checks whether we need to resend
*/
public boolean checkResendLabelOnFailure() {
if (lastNodeLabelSendFailMills > 0L) {
long lastFailTimePassed =
System.currentTimeMillis() - lastNodeLabelSendFailMills;
if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
return true;
}
}
return false;
}
@Override @Override
public void verifyRMHeartbeatResponseForNodeLabels( public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) { NodeHeartbeatResponse response) {
if (updatedLabelsSentToRM) { if (updatedLabelsSentToRM) {
if (response.getAreNodeLabelsAcceptedByRM()) { if (response.getAreNodeLabelsAcceptedByRM()) {
lastNodeLabelSendFailMills = 0L;
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels) LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were Accepted by RM "); + "} were Accepted by RM ");
} else { } else {
// case where updated labels from NodeLabelsProvider is sent to RM and // case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels // RM rejected the labels
lastNodeLabelSendFailMills = System.currentTimeMillis();
LOG.error( LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels) "NM node labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were not accepted by RM and message from RM : " + "} were not accepted by RM and message from RM : "

View File

@ -30,8 +30,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Provides base implementation of NodeLabelsProvider with Timer and expects * Provides base implementation of NodeLabelsProvider with Timer and expects
* subclass to provide TimerTask which can fetch NodeLabels * subclass to provide TimerTask which can fetch NodeLabels
@ -55,8 +53,6 @@ public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
protected Set<NodeLabel> nodeLabels = protected Set<NodeLabel> nodeLabels =
CommonNodeLabelsManager.EMPTY_NODELABEL_SET; CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
@VisibleForTesting
long startTime = 0;
public AbstractNodeLabelsProvider(String name) { public AbstractNodeLabelsProvider(String name) {
super(name); super(name);
@ -77,12 +73,13 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
timerTask = createTimerTask(); timerTask = createTimerTask();
timerTask.run();
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
nodeLabelsScheduler = nodeLabelsScheduler =
new Timer("DistributedNodeLabelsRunner-Timer", true); new Timer("DistributedNodeLabelsRunner-Timer", true);
// Start the timer task and then periodically at the configured interval // Start the timer task and then periodically at the configured interval
// time. Illegal values for intervalTime is handled by timer api // time. Illegal values for intervalTime is handled by timer api
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime, nodeLabelsScheduler.scheduleAtFixedRate(timerTask, intervalTime,
intervalTime); intervalTime);
} }
super.serviceStart(); super.serviceStart();

View File

@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.TimerTask; import java.util.TimerTask;
@ -41,16 +40,6 @@ public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider
public ConfigurationNodeLabelsProvider() { public ConfigurationNodeLabelsProvider() {
super("Configuration Based NodeLabels Provider"); super("Configuration Based NodeLabels Provider");
} }
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
// In case timer is not configured avoid calling timertask.run thus avoiding
// unnecessary creation of YarnConfiguration Object
updateNodeLabelsFromConfig(conf);
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
startTime = new Date().getTime() + intervalTime;
}
}
private void updateNodeLabelsFromConfig(Configuration conf) private void updateNodeLabelsFromConfig(Configuration conf)
throws IOException { throws IOException {

View File

@ -25,14 +25,17 @@
import java.net.URL; import java.net.URL;
import java.util.TimerTask; import java.util.TimerTask;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase { public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
@ -48,13 +51,17 @@ public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
private ConfigurationNodeLabelsProvider nodeLabelsProvider; private ConfigurationNodeLabelsProvider nodeLabelsProvider;
@Before @BeforeClass
public void setup() { public static void create() {
loader = loader =
new XMLPathClassLoader( new XMLPathClassLoader(
TestConfigurationNodeLabelsProvider.class.getClassLoader()); TestConfigurationNodeLabelsProvider.class.getClassLoader());
testRootDir.mkdirs(); testRootDir.mkdirs();
Thread.currentThread().setContextClassLoader(loader);
}
@Before
public void setup() {
nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); nodeLabelsProvider = new ConfigurationNodeLabelsProvider();
} }
@ -62,44 +69,43 @@ public void setup() {
public void tearDown() throws Exception { public void tearDown() throws Exception {
if (nodeLabelsProvider != null) { if (nodeLabelsProvider != null) {
nodeLabelsProvider.close(); nodeLabelsProvider.close();
nodeLabelsProvider.stop();
} }
}
@AfterClass
public static void remove() throws Exception {
if (testRootDir.exists()) { if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete( FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true); new Path(testRootDir.getAbsolutePath()), true);
} }
} }
private Configuration getConfForNodeLabels() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX");
return conf;
}
@Test @Test
public void testNodeLabelsFromConfig() throws IOException, public void testNodeLabelsFromConfig() throws IOException,
InterruptedException { InterruptedException {
Configuration conf = getConfForNodeLabels(); Configuration conf = new Configuration();
modifyConf("A,B,CX");
nodeLabelsProvider.init(conf); nodeLabelsProvider.init(conf);
// test for ensuring labels are set during initialization of the class // test for ensuring labels are set during initialization of the class
nodeLabelsProvider.start(); nodeLabelsProvider.start();
Thread.sleep(1000l); // sleep so that timer has run once during
// initialization
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"), assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
nodeLabelsProvider.getNodeLabels()); nodeLabelsProvider.getNodeLabels());
// test for valid Modification // test for valid Modification
TimerTask timerTask = nodeLabelsProvider.getTimerTask(); TimerTask timerTask = nodeLabelsProvider.getTimerTask();
modifyConfAndCallTimer(timerTask, "X,y,Z"); modifyConf("X,y,Z");
timerTask.run();
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"), assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
nodeLabelsProvider.getNodeLabels()); nodeLabelsProvider.getNodeLabels());
} }
@Test @Test
public void testConfigForNoTimer() throws Exception { public void testConfigForNoTimer() throws Exception {
Configuration conf = getConfForNodeLabels(); Configuration conf = new Configuration();
modifyConf("A,B,CX");
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER); AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
nodeLabelsProvider.init(conf); nodeLabelsProvider.init(conf);
nodeLabelsProvider.start(); nodeLabelsProvider.start();
Assert Assert
@ -112,18 +118,33 @@ public void testConfigForNoTimer() throws Exception {
nodeLabelsProvider.getNodeLabels()); nodeLabelsProvider.getNodeLabels());
} }
private static void modifyConfAndCallTimer(TimerTask timerTask, @Test
String nodeLabels) throws FileNotFoundException, IOException { public void testConfigTimer() throws Exception {
Configuration conf = new Configuration();
modifyConf("A,B,CX");
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
1000);
nodeLabelsProvider.init(conf);
nodeLabelsProvider.start();
// Ensure that even though timer is not run, node labels are fetched at
// least once so
// that NM registers/updates Labels with RM
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
nodeLabelsProvider.getNodeLabels());
modifyConf("X,y,Z");
Thread.sleep(1500);
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
nodeLabelsProvider.getNodeLabels());
}
private static void modifyConf(String nodeLabels)
throws FileNotFoundException, IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels); conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); FileOutputStream confStream = new FileOutputStream(nodeLabelsConfigFile);
ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); conf.writeXml(confStream);
try { IOUtils.closeQuietly(confStream);
Thread.currentThread().setContextClassLoader(loader);
timerTask.run();
} finally {
Thread.currentThread().setContextClassLoader(actualLoader);
}
} }
private static class XMLPathClassLoader extends ClassLoader { private static class XMLPathClassLoader extends ClassLoader {