YARN-4106. NodeLabels for NM in distributed mode is not updated even after clusterNodelabel addition in RM. (Bibin A Chundatt via wangda)
(cherry picked from commit 77666105b4
)
This commit is contained in:
parent
7909462c3a
commit
5decd8f9f7
|
@ -772,6 +772,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3591. Resource localization on a bad disk causes subsequent containers failure.
|
||||
(Lavkesh Lahngir via vvasudev)
|
||||
|
||||
YARN-4106. NodeLabels for NM in distributed mode is not updated even after
|
||||
clusterNodelabel addition in RM. (Bibin A Chundatt via wangda)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -897,6 +897,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
private final NodeLabelsProvider nodeLabelsProvider;
|
||||
private Set<NodeLabel> previousNodeLabels;
|
||||
private boolean updatedLabelsSentToRM;
|
||||
private long lastNodeLabelSendFailMills = 0L;
|
||||
// TODO : Need to check which conf to use.Currently setting as 1 min
|
||||
private static final long FAILEDLABELRESENDINTERVAL = 60000;
|
||||
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabelsForRegistration() {
|
||||
|
@ -938,12 +941,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
// take some action only on modification of labels
|
||||
boolean areNodeLabelsUpdated =
|
||||
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|
||||
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
|
||||
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat)
|
||||
|| checkResendLabelOnFailure();
|
||||
|
||||
updatedLabelsSentToRM = false;
|
||||
if (areNodeLabelsUpdated) {
|
||||
previousNodeLabels = nodeLabelsForHeartbeat;
|
||||
try {
|
||||
LOG.info("Modified labels from provider: "
|
||||
+ StringUtils.join(",", previousNodeLabels));
|
||||
validateNodeLabels(nodeLabelsForHeartbeat);
|
||||
updatedLabelsSentToRM = true;
|
||||
} catch (IOException e) {
|
||||
|
@ -980,16 +986,33 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* In case of failure when RM doesnt accept labels need to resend Labels to
|
||||
* RM. This method checks whether we need to resend
|
||||
*/
|
||||
public boolean checkResendLabelOnFailure() {
|
||||
if (lastNodeLabelSendFailMills > 0L) {
|
||||
long lastFailTimePassed =
|
||||
System.currentTimeMillis() - lastNodeLabelSendFailMills;
|
||||
if (lastFailTimePassed > FAILEDLABELRESENDINTERVAL) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyRMHeartbeatResponseForNodeLabels(
|
||||
NodeHeartbeatResponse response) {
|
||||
if (updatedLabelsSentToRM) {
|
||||
if (response.getAreNodeLabelsAcceptedByRM()) {
|
||||
lastNodeLabelSendFailMills = 0L;
|
||||
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
|
||||
+ "} were Accepted by RM ");
|
||||
} else {
|
||||
// case where updated labels from NodeLabelsProvider is sent to RM and
|
||||
// RM rejected the labels
|
||||
lastNodeLabelSendFailMills = System.currentTimeMillis();
|
||||
LOG.error(
|
||||
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
|
||||
+ "} were not accepted by RM and message from RM : "
|
||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Provides base implementation of NodeLabelsProvider with Timer and expects
|
||||
* subclass to provide TimerTask which can fetch NodeLabels
|
||||
|
@ -55,8 +53,6 @@ public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
|
|||
protected Set<NodeLabel> nodeLabels =
|
||||
CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
|
||||
|
||||
@VisibleForTesting
|
||||
long startTime = 0;
|
||||
|
||||
public AbstractNodeLabelsProvider(String name) {
|
||||
super(name);
|
||||
|
@ -77,12 +73,13 @@ public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
|
|||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
timerTask = createTimerTask();
|
||||
timerTask.run();
|
||||
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
|
||||
nodeLabelsScheduler =
|
||||
new Timer("DistributedNodeLabelsRunner-Timer", true);
|
||||
// Start the timer task and then periodically at the configured interval
|
||||
// time. Illegal values for intervalTime is handled by timer api
|
||||
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime,
|
||||
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, intervalTime,
|
||||
intervalTime);
|
||||
}
|
||||
super.serviceStart();
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.TimerTask;
|
||||
|
||||
|
@ -41,16 +40,6 @@ public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider
|
|||
public ConfigurationNodeLabelsProvider() {
|
||||
super("Configuration Based NodeLabels Provider");
|
||||
}
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
// In case timer is not configured avoid calling timertask.run thus avoiding
|
||||
// unnecessary creation of YarnConfiguration Object
|
||||
updateNodeLabelsFromConfig(conf);
|
||||
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
|
||||
startTime = new Date().getTime() + intervalTime;
|
||||
}
|
||||
}
|
||||
|
||||
private void updateNodeLabelsFromConfig(Configuration conf)
|
||||
throws IOException {
|
||||
|
|
|
@ -25,14 +25,17 @@ import java.net.MalformedURLException;
|
|||
import java.net.URL;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
||||
|
@ -48,13 +51,17 @@ public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
|||
|
||||
private ConfigurationNodeLabelsProvider nodeLabelsProvider;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
@BeforeClass
|
||||
public static void create() {
|
||||
loader =
|
||||
new XMLPathClassLoader(
|
||||
TestConfigurationNodeLabelsProvider.class.getClassLoader());
|
||||
testRootDir.mkdirs();
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
nodeLabelsProvider = new ConfigurationNodeLabelsProvider();
|
||||
}
|
||||
|
||||
|
@ -62,44 +69,43 @@ public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
|||
public void tearDown() throws Exception {
|
||||
if (nodeLabelsProvider != null) {
|
||||
nodeLabelsProvider.close();
|
||||
nodeLabelsProvider.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void remove() throws Exception {
|
||||
if (testRootDir.exists()) {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(testRootDir.getAbsolutePath()), true);
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfForNodeLabels() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX");
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLabelsFromConfig() throws IOException,
|
||||
InterruptedException {
|
||||
Configuration conf = getConfForNodeLabels();
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
nodeLabelsProvider.init(conf);
|
||||
// test for ensuring labels are set during initialization of the class
|
||||
nodeLabelsProvider.start();
|
||||
Thread.sleep(1000l); // sleep so that timer has run once during
|
||||
// initialization
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
// test for valid Modification
|
||||
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
|
||||
modifyConfAndCallTimer(timerTask, "X,y,Z");
|
||||
modifyConf("X,y,Z");
|
||||
timerTask.run();
|
||||
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigForNoTimer() throws Exception {
|
||||
Configuration conf = getConfForNodeLabels();
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
|
||||
|
||||
nodeLabelsProvider.init(conf);
|
||||
nodeLabelsProvider.start();
|
||||
Assert
|
||||
|
@ -112,18 +118,33 @@ public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
|||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
private static void modifyConfAndCallTimer(TimerTask timerTask,
|
||||
String nodeLabels) throws FileNotFoundException, IOException {
|
||||
@Test
|
||||
public void testConfigTimer() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
1000);
|
||||
nodeLabelsProvider.init(conf);
|
||||
nodeLabelsProvider.start();
|
||||
// Ensure that even though timer is not run, node labels are fetched at
|
||||
// least once so
|
||||
// that NM registers/updates Labels with RM
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
modifyConf("X,y,Z");
|
||||
Thread.sleep(1500);
|
||||
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
}
|
||||
|
||||
private static void modifyConf(String nodeLabels)
|
||||
throws FileNotFoundException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
|
||||
conf.writeXml(new FileOutputStream(nodeLabelsConfigFile));
|
||||
ClassLoader actualLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
timerTask.run();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(actualLoader);
|
||||
}
|
||||
FileOutputStream confStream = new FileOutputStream(nodeLabelsConfigFile);
|
||||
conf.writeXml(confStream);
|
||||
IOUtils.closeQuietly(confStream);
|
||||
}
|
||||
|
||||
private static class XMLPathClassLoader extends ClassLoader {
|
||||
|
|
Loading…
Reference in New Issue