YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed Node Label Configuration Setup. (Naganarasimha G R)

(cherry picked from commit fc07464d1a)
This commit is contained in:
Wangda Tan 2015-08-20 11:51:03 -07:00
parent b567aa2b4f
commit 6452b31760
11 changed files with 793 additions and 86 deletions

View File

@ -120,6 +120,9 @@ Release 2.8.0 - UNRELEASED
YARN-4055. Report node resource utilization in heartbeat. YARN-4055. Report node resource utilization in heartbeat.
(Inigo Goiri via kasha) (Inigo Goiri via kasha)
YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed
Node Label Configuration Setup. (Naganarasimha G R)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1967,6 +1967,36 @@ public class YarnConfiguration extends Configuration {
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
} }
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
+ "node-labels.";
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
NM_NODE_LABELS_PREFIX + "provider";
// whitelist names for the yarn.nodemanager.node-labels.provider
public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
// If -1 is configured then no timer task should be created
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
public static final String NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-timeout-ms";
// once in 10 mins
public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
10 * 60 * 1000;
// Twice of default interval time
public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS * 2;
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
public YarnConfiguration() { public YarnConfiguration() {
super(); super();
} }

View File

@ -916,7 +916,7 @@ public class CommonNodeLabelsManager extends AbstractService {
} }
} }
private void checkAndThrowLabelName(String label) throws IOException { public static void checkAndThrowLabelName(String label) throws IOException {
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
throw new IOException("label added is empty or exceeds " throw new IOException("label added is empty or exceeds "
+ MAX_LABEL_LENGTH + " character(s)"); + MAX_LABEL_LENGTH + " character(s)");

View File

@ -2092,6 +2092,53 @@
<value>centralized</value> <value>centralized</value>
</property> </property>
<!-- Distributed Node Labels Configuration -->
<property>
<description>
When node labels "yarn.node-labels.configuration-type" is
of type "distributed" Administrators can configure the source of the
node labels provider by configuring this parameter. Administrators can
specify either "config" or the class name of the provider. Configured
class needs to extend
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider.
If "config" is specified then, "ConfigurationNodeLabelsProvider" will
be used.
</description>
<name>yarn.nodemanager.node-labels.provider</name>
</property>
<property>
<description>
When node labels "yarn.nodemanager.node-labels.provider" is of type
"config" or the configured class extends AbstractNodeLabelsProvider then
periodically node labels are retrieved from the node labels provider.
This configuration is to define the interval. If -1 is configured then
node labels are retrieved from. provider only during initialization.
Defaults to 10 mins.
</description>
<name>yarn.nodemanager.node-labels.provider.fetch-interval-ms</name>
<value>600000</value>
</property>
<property>
<description>
When node labels "yarn.nodemanager.node-labels.provider"
is of type "config" then ConfigurationNodeLabelsProvider fetches the
labels this parameter.
</description>
<name>yarn.nodemanager.node-labels.provider.configured-node-labels</name>
</property>
<property>
<description>
When node labels "yarn.nodemanager.node-labels.provider" is a class
which extends AbstractNodeLabelsProvider then this configuration provides
the timeout period after which it will stop querying the Node labels
provider. Defaults to 20 mins.
</description>
<name>yarn.nodemanager.node-labels.provider.fetch-timeout-ms</name>
<value>1200000</value>
</property>
<!-- Other Configuration --> <!-- Other Configuration -->
<property> <property>

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@ -122,12 +123,38 @@ public class NodeManager extends CompositeService
metrics, nodeLabelsProvider); metrics, nodeLabelsProvider);
} }
@VisibleForTesting protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
protected NodeLabelsProvider createNodeLabelsProvider( throws IOException {
Configuration conf) throws IOException { NodeLabelsProvider provider = null;
// TODO as part of YARN-2729 String providerString =
// Need to get the implementation of provider service and return conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null);
return null; if (providerString == null || providerString.trim().length() == 0) {
// Seems like Distributed Node Labels configuration is not enabled
return provider;
}
switch (providerString.trim().toLowerCase()) {
case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
provider = new ConfigurationNodeLabelsProvider();
break;
default:
try {
Class<? extends NodeLabelsProvider> labelsProviderClass =
conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null,
NodeLabelsProvider.class);
provider = labelsProviderClass.newInstance();
} catch (InstantiationException | IllegalAccessException
| RuntimeException e) {
LOG.error("Failed to create NodeLabelsProvider based on Configuration",
e);
throw new IOException("Failed to create NodeLabelsProvider : "
+ e.getMessage(), e);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Distributed Node Labels is enabled"
+ " with provider class as : " + provider.getClass().toString());
}
return provider;
} }
protected NodeResourceMonitor createNodeResourceMonitor() { protected NodeResourceMonitor createNodeResourceMonitor() {

View File

@ -137,8 +137,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private boolean registeredWithRM = false; private boolean registeredWithRM = false;
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>(); Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private final NodeLabelsProvider nodeLabelsProvider; private NMNodeLabelsHandler nodeLabelsHandler;
private final boolean hasNodeLabelsProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@ -150,8 +149,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeLabelsProvider nodeLabelsProvider) { NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName()); super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker; this.healthChecker = healthChecker;
this.nodeLabelsProvider = nodeLabelsProvider; nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
this.context = context; this.context = context;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.metrics = metrics; this.metrics = metrics;
@ -313,13 +311,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void registerWithRM() protected void registerWithRM()
throws YarnException, IOException { throws YarnException, IOException {
List<NMContainerStatus> containerReports = getNMContainerStatuses(); List<NMContainerStatus> containerReports = getNMContainerStatuses();
Set<NodeLabel> nodeLabels = null; Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
if (hasNodeLabelsProvider) {
nodeLabels = nodeLabelsProvider.getNodeLabels();
nodeLabels =
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabels;
}
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(), nodeManagerVersionId, containerReports, getRunningApplications(),
@ -380,14 +372,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.append(this.nodeId).append(" with total resource of ") .append(this.nodeId).append(" with total resource of ")
.append(this.totalResource); .append(this.totalResource);
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { successfullRegistrationMsg.append(nodeLabelsHandler
successfullRegistrationMsg .verifyRMRegistrationResponseForNodeLabels(regNMResponse));
.append(" and with following Node label(s) : {")
.append(StringUtils.join(",", nodeLabels)).append("}");
} else if (hasNodeLabelsProvider) {
//case where provider is set but RM did not accept the Node Labels
LOG.error(regNMResponse.getDiagnosticsMessage());
}
LOG.info(successfullRegistrationMsg); LOG.info(successfullRegistrationMsg);
LOG.info("Notifying ContainerManager to unblock new container-requests"); LOG.info("Notifying ContainerManager to unblock new container-requests");
@ -688,33 +674,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void run() { public void run() {
int lastHeartbeatID = 0; int lastHeartbeatID = 0;
Set<NodeLabel> lastUpdatedNodeLabelsToRM = null;
if (hasNodeLabelsProvider) {
lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
lastUpdatedNodeLabelsToRM =
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: lastUpdatedNodeLabelsToRM;
}
while (!isStopped) { while (!isStopped) {
// Send heartbeat // Send heartbeat
try { try {
NodeHeartbeatResponse response = null; NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat = null; Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
if (hasNodeLabelsProvider) {
nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
// if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat =
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabelsForHeartbeat;
if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
lastUpdatedNodeLabelsToRM)) {
// if nodelabels have not changed then no need to send
nodeLabelsForHeartbeat = null;
}
}
NodeHeartbeatRequest request = NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus, NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context NodeStatusUpdaterImpl.this.context
@ -740,9 +707,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
updateMasterKeys(response); updateMasterKeys(response);
if (response.getNodeAction() == NodeAction.SHUTDOWN) { if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
.warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," + " heartbeat, hence shutting down.");
+ " hence shutting down.");
LOG.warn("Message from ResourceManager: " LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage()); + response.getDiagnosticsMessage());
context.setDecommissioned(true); context.setDecommissioned(true);
@ -764,16 +730,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
break; break;
} }
if (response.getAreNodeLabelsAcceptedByRM()) { nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
LOG.info("Node Labels {"
+ StringUtils.join(",", nodeLabelsForHeartbeat)
+ "} were Accepted by RM ");
} else if (nodeLabelsForHeartbeat != null) {
// case where NodeLabelsProvider is set and updated labels were
// sent to RM and RM rejected the labels
LOG.error(response.getDiagnosticsMessage());
}
// Explicitly put this method after checking the resync response. We // Explicitly put this method after checking the resync response. We
// don't want to remove the completed containers before resync // don't want to remove the completed containers before resync
@ -833,23 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
} }
/**
* Caller should take care of sending non null nodelabels for both
* arguments
*
* @param nodeLabelsNew
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
*/
private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
Set<NodeLabel> nodeLabelsOld) {
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
return true;
}
return false;
}
private void updateMasterKeys(NodeHeartbeatResponse response) { private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over // See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
@ -879,4 +819,183 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
reports.addAll(logAggregationReportForAppsTempList); reports.addAll(logAggregationReportForAppsTempList);
return reports; return reports;
} }
private NMNodeLabelsHandler createNMNodeLabelsHandler(
NodeLabelsProvider nodeLabelsProvider) {
if (nodeLabelsProvider == null) {
return new NMCentralizedNodeLabelsHandler();
} else {
return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
}
}
private static interface NMNodeLabelsHandler {
/**
* validates nodeLabels From Provider and returns it to the caller. Also
* ensures that if provider returns null then empty label set is considered
*/
Set<NodeLabel> getNodeLabelsForRegistration();
/**
* @return RMRegistration Success message and on failure will log
* independently and returns empty string
*/
String verifyRMRegistrationResponseForNodeLabels(
RegisterNodeManagerResponse regNMResponse);
/**
* If nodeLabels From Provider is different previous node labels then it
* will check the syntax correctness and throws exception if invalid. If
* valid, returns nodeLabels From Provider. Also ensures that if provider
* returns null then empty label set is considered and If labels are not
* modified it returns null.
*/
Set<NodeLabel> getNodeLabelsForHeartbeat();
/**
* check whether if updated labels sent to RM was accepted or not
* @param response
*/
void verifyRMHeartbeatResponseForNodeLabels(NodeHeartbeatResponse response);
}
/**
* In centralized configuration, NM need not send Node labels or process the
* response
*/
private static class NMCentralizedNodeLabelsHandler
implements NMNodeLabelsHandler {
@Override
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
return null;
}
@Override
public Set<NodeLabel> getNodeLabelsForRegistration() {
return null;
}
@Override
public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
}
@Override
public String verifyRMRegistrationResponseForNodeLabels(
RegisterNodeManagerResponse regNMResponse) {
return "";
}
}
private static class NMDistributedNodeLabelsHandler
implements NMNodeLabelsHandler {
private NMDistributedNodeLabelsHandler(
NodeLabelsProvider nodeLabelsProvider) {
this.nodeLabelsProvider = nodeLabelsProvider;
}
private final NodeLabelsProvider nodeLabelsProvider;
private Set<NodeLabel> previousNodeLabels;
private boolean updatedLabelsSentToRM;
@Override
public Set<NodeLabel> getNodeLabelsForRegistration() {
Set<NodeLabel> nodeLabels = nodeLabelsProvider.getNodeLabels();
nodeLabels = (null == nodeLabels)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
previousNodeLabels = nodeLabels;
try {
validateNodeLabels(nodeLabels);
} catch (IOException e) {
nodeLabels = null;
}
return nodeLabels;
}
@Override
public String verifyRMRegistrationResponseForNodeLabels(
RegisterNodeManagerResponse regNMResponse) {
StringBuilder successfulNodeLabelsRegistrationMsg = new StringBuilder("");
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
successfulNodeLabelsRegistrationMsg
.append(" and with following Node label(s) : {")
.append(StringUtils.join(",", previousNodeLabels)).append("}");
} else {
// case where provider is set but RM did not accept the Node Labels
LOG.error(regNMResponse.getDiagnosticsMessage());
}
return successfulNodeLabelsRegistrationMsg.toString();
}
@Override
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsProvider.getNodeLabels();
// if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
: nodeLabelsForHeartbeat;
// take some action only on modification of labels
boolean areNodeLabelsUpdated =
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = false;
if (areNodeLabelsUpdated) {
previousNodeLabels = nodeLabelsForHeartbeat;
try {
validateNodeLabels(nodeLabelsForHeartbeat);
updatedLabelsSentToRM = true;
} catch (IOException e) {
// set previous node labels to invalid set, so that invalid
// labels are not verified for every HB, and send empty set
// to RM to have same nodeLabels which was earlier set.
nodeLabelsForHeartbeat = null;
}
} else {
// if nodelabels have not changed then no need to send
nodeLabelsForHeartbeat = null;
}
return nodeLabelsForHeartbeat;
}
private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
throws IOException {
Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator();
boolean hasInvalidLabel = false;
StringBuilder errorMsg = new StringBuilder("");
while (iterator.hasNext()) {
try {
CommonNodeLabelsManager
.checkAndThrowLabelName(iterator.next().getName());
} catch (IOException e) {
errorMsg.append(e.getMessage());
errorMsg.append(" , ");
hasInvalidLabel = true;
}
}
if (hasInvalidLabel) {
LOG.error("Invalid Node Label(s) from Provider : " + errorMsg);
throw new IOException(errorMsg.toString());
}
}
@Override
public void verifyRMHeartbeatResponseForNodeLabels(
NodeHeartbeatResponse response) {
if (updatedLabelsSentToRM) {
if (response.getAreNodeLabelsAcceptedByRM()) {
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were Accepted by RM ");
} else {
// case where updated labels from NodeLabelsProvider is sent to RM and
// RM rejected the labels
LOG.error(
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
+ "} were not accepted by RM and message from RM : "
+ response.getDiagnosticsMessage());
}
}
}
}
} }

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import com.google.common.annotations.VisibleForTesting;
/**
* Provides base implementation of NodeLabelsProvider with Timer and expects
* subclass to provide TimerTask which can fetch NodeLabels
*/
public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1;
// Delay after which timer task are triggered to fetch NodeLabels
protected long intervalTime;
// Timer used to schedule node labels fetching
protected Timer nodeLabelsScheduler;
public static final String NODE_LABELS_SEPRATOR = ",";
protected Lock readLock = null;
protected Lock writeLock = null;
protected TimerTask timerTask;
protected Set<NodeLabel> nodeLabels =
CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
@VisibleForTesting
long startTime = 0;
public AbstractNodeLabelsProvider(String name) {
super(name);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.intervalTime =
conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
timerTask = createTimerTask();
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
nodeLabelsScheduler =
new Timer("DistributedNodeLabelsRunner-Timer", true);
// Start the timer task and then periodically at the configured interval
// time. Illegal values for intervalTime is handled by timer api
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime,
intervalTime);
}
super.serviceStart();
}
/**
* terminate the timer
* @throws Exception
*/
@Override
protected void serviceStop() throws Exception {
if (nodeLabelsScheduler != null) {
nodeLabelsScheduler.cancel();
}
super.serviceStop();
}
/**
* @return Returns output from provider.
*/
@Override
public Set<NodeLabel> getNodeLabels() {
readLock.lock();
try {
return nodeLabels;
} finally {
readLock.unlock();
}
}
protected void setNodeLabels(Set<NodeLabel> nodeLabelsSet) {
writeLock.lock();
try {
nodeLabels = nodeLabelsSet;
} finally {
writeLock.unlock();
}
}
/**
* Used only by tests to access the timer task directly
*
* @return the timer task
*/
TimerTask getTimerTask() {
return timerTask;
}
static Set<NodeLabel> convertToNodeLabelSet(Set<String> nodeLabels) {
if (null == nodeLabels) {
return null;
}
Set<NodeLabel> labels = new HashSet<NodeLabel>();
for (String label : nodeLabels) {
labels.add(NodeLabel.newInstance(label));
}
return labels;
}
public abstract TimerTask createTimerTask();
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Provides Node's Labels by constantly monitoring the configuration.
*/
public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider {
private static final Log LOG = LogFactory
.getLog(ConfigurationNodeLabelsProvider.class);
public ConfigurationNodeLabelsProvider() {
super("Configuration Based NodeLabels Provider");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
// In case timer is not configured avoid calling timertask.run thus avoiding
// unnecessary creation of YarnConfiguration Object
updateNodeLabelsFromConfig(conf);
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
startTime = new Date().getTime() + intervalTime;
}
}
private void updateNodeLabelsFromConfig(Configuration conf)
throws IOException {
String confLabelString =
conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, null);
String[] nodeLabelsFromConfiguration =
(confLabelString == null || confLabelString.isEmpty()) ? new String[] {}
: StringUtils.getStrings(confLabelString);
setNodeLabels(convertToNodeLabelSet(new HashSet<String>(
Arrays.asList(nodeLabelsFromConfiguration))));
}
private class ConfigurationMonitorTimerTask extends TimerTask {
@Override
public void run() {
try {
updateNodeLabelsFromConfig(new YarnConfiguration());
} catch (Exception e) {
LOG.error("Failed to update node Labels from configuration.xml ", e);
}
}
}
@Override
public TimerTask createTimerTask() {
return new ConfigurationMonitorTimerTask();
}
}

View File

@ -22,8 +22,11 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestNodeManager { public class TestNodeManager {
@ -54,4 +57,49 @@ public class TestNodeManager {
} }
} }
@Test
public void testCreationOfNodeLabelsProviderService()
throws InterruptedException {
try {
NodeManager nodeManager = new NodeManager();
Configuration conf = new Configuration();
NodeLabelsProvider labelsProviderService =
nodeManager.createNodeLabelsProvider(conf);
Assert
.assertNull(
"LabelsProviderService should not be initialized in default configuration",
labelsProviderService);
// With valid className
conf.set(
YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
"org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider");
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);
// With invalid className
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
"org.apache.hadoop.yarn.server.nodemanager.NodeManager");
try {
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.fail("Expected to throw IOException on Invalid configuration");
} catch (IOException e) {
// exception expected on invalid configuration
}
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);
// With valid whitelisted configurations
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);
} catch (Exception e) {
Assert.fail("Exception caught");
e.printStackTrace();
}
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -108,7 +109,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
if (receivedNMHeartbeat) { if (receivedNMHeartbeat) {
return; return;
} }
int i = 500; int i = 10;
while (!receivedNMHeartbeat && i > 0) { while (!receivedNMHeartbeat && i > 0) {
synchronized (ResourceTrackerForLabels.class) { synchronized (ResourceTrackerForLabels.class) {
if (!receivedNMHeartbeat) { if (!receivedNMHeartbeat) {
@ -193,7 +194,6 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
public static class DummyNodeLabelsProvider extends NodeLabelsProvider { public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
@SuppressWarnings("unchecked")
private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET; private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
public DummyNodeLabelsProvider() { public DummyNodeLabelsProvider() {
@ -224,8 +224,8 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
new ResourceTrackerForLabels(); new ResourceTrackerForLabels();
nm = new NodeManager() { nm = new NodeManager() {
@Override @Override
protected NodeLabelsProvider createNodeLabelsProvider( protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
Configuration conf) throws IOException { throws IOException {
return dummyLabelsProviderRef; return dummyLabelsProviderRef;
} }
@ -255,8 +255,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
nm.start(); nm.start();
resourceTracker.waitTillRegister(); resourceTracker.waitTillRegister();
assertNLCollectionEquals(resourceTracker.labels, assertNLCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef dummyLabelsProviderRef.getNodeLabels());
.getNodeLabels());
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag(); resourceTracker.resetNMHeartbeatReceiveFlag();
@ -283,10 +282,71 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
dummyLabelsProviderRef.setNodeLabels(null); dummyLabelsProviderRef.setNodeLabels(null);
nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat(); resourceTracker.waitTillHeartbeat();
assertNotNull(
"If provider sends null then empty label set should be sent and not null",
resourceTracker.labels);
assertTrue("If provider sends null then empty labels should be sent", assertTrue("If provider sends null then empty labels should be sent",
resourceTracker.labels.isEmpty()); resourceTracker.labels.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag(); resourceTracker.resetNMHeartbeatReceiveFlag();
nm.stop(); nm.stop();
} }
@Test
public void testInvalidNodeLabelsFromProvider() throws InterruptedException,
IOException {
final ResourceTrackerForLabels resourceTracker =
new ResourceTrackerForLabels();
nm = new NodeManager() {
@Override
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
throws IOException {
return dummyLabelsProviderRef;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
NodeLabelsProvider labelsProvider) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, labelsProvider) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
};
}
};
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with invalid labels
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P"));
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNull("On Invalid Labels we need to retain earlier labels, HB "
+ "needs to send null", resourceTracker.labels);
resourceTracker.resetNMHeartbeatReceiveFlag();
// on next heartbeat same invalid labels will be given by the provider, but
// again label validation check and reset RM with empty labels set should
// not happen
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
resourceTracker.resetNMHeartbeatReceiveFlag();
assertNull("NodeStatusUpdater need not send repeatedly empty labels on "
+ "invalid labels from provider ", resourceTracker.labels);
}
} }

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
protected static File testRootDir = new File("target",
TestConfigurationNodeLabelsProvider.class.getName() + "-localDir")
.getAbsoluteFile();
final static File nodeLabelsConfigFile = new File(testRootDir,
"yarn-site.xml");
private static XMLPathClassLoader loader;
private ConfigurationNodeLabelsProvider nodeLabelsProvider;
@Before
public void setup() {
loader =
new XMLPathClassLoader(
TestConfigurationNodeLabelsProvider.class.getClassLoader());
testRootDir.mkdirs();
nodeLabelsProvider = new ConfigurationNodeLabelsProvider();
}
@After
public void tearDown() throws Exception {
if (nodeLabelsProvider != null) {
nodeLabelsProvider.close();
}
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
}
private Configuration getConfForNodeLabels() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX");
return conf;
}
@Test
public void testNodeLabelsFromConfig() throws IOException,
InterruptedException {
Configuration conf = getConfForNodeLabels();
nodeLabelsProvider.init(conf);
// test for ensuring labels are set during initialization of the class
nodeLabelsProvider.start();
Thread.sleep(1000l); // sleep so that timer has run once during
// initialization
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
nodeLabelsProvider.getNodeLabels());
// test for valid Modification
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
modifyConfAndCallTimer(timerTask, "X,y,Z");
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
nodeLabelsProvider.getNodeLabels());
}
@Test
public void testConfigForNoTimer() throws Exception {
Configuration conf = getConfForNodeLabels();
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
nodeLabelsProvider.init(conf);
nodeLabelsProvider.start();
Assert
.assertNull(
"Timer is not expected to be created when interval is configured as -1",
nodeLabelsProvider.nodeLabelsScheduler);
// Ensure that even though timer is not run, node labels are fetched at least once so
// that NM registers/updates Labels with RM
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
nodeLabelsProvider.getNodeLabels());
}
private static void modifyConfAndCallTimer(TimerTask timerTask,
String nodeLabels) throws FileNotFoundException, IOException {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
conf.writeXml(new FileOutputStream(nodeLabelsConfigFile));
ClassLoader actualLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(loader);
timerTask.run();
} finally {
Thread.currentThread().setContextClassLoader(actualLoader);
}
}
private static class XMLPathClassLoader extends ClassLoader {
public XMLPathClassLoader(ClassLoader wrapper) {
super(wrapper);
}
public URL getResource(String name) {
if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) {
try {
return nodeLabelsConfigFile.toURI().toURL();
} catch (MalformedURLException e) {
e.printStackTrace();
Assert.fail();
}
}
return super.getResource(name);
}
}
}