YARN-7757. Refactor NodeLabelsProvider to be more generic and reusable for node attributes providers. Contributed by Weiwei Yang.

This commit is contained in:
Naganarasimha 2018-02-05 05:47:02 +08:00 committed by Sunil G
parent 4458b2772f
commit d312b5cf9f
16 changed files with 795 additions and 183 deletions

View File

@ -3542,6 +3542,9 @@ public static boolean areNodeLabelsEnabled(
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
+ "node-labels.";
private static final String NM_NODE_ATTRIBUTES_PREFIX = NM_PREFIX
+ "node-attributes.";
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
NM_NODE_LABELS_PREFIX + "provider";
@ -3552,6 +3555,9 @@ public static boolean areNodeLabelsEnabled(
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
private static final String NM_NODE_ATTRIBUTES_PROVIDER_PREFIX =
NM_NODE_ATTRIBUTES_PREFIX + "provider.";
public static final String NM_NODE_LABELS_RESYNC_INTERVAL =
NM_NODE_LABELS_PREFIX + "resync-interval-ms";
@ -3623,6 +3629,33 @@ public static boolean areNodeLabelsEnabled(
NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PREFIX + "opts";
/**
* Node attribute provider fetch attributes interval and timeout.
*/
public static final String NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS =
NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "fetch-interval-ms";
public static final long
DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS = 10 * 60 * 1000;
public static final String NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS =
NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "fetch-timeout-ms";
public static final long DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS
= DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS * 2;
/**
* Script to collect node attributes.
*/
private static final String NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PREFIX =
NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "script.";
public static final String NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PATH =
NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PREFIX + "path";
public static final String NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_OPTS =
NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PREFIX + "opts";
/*
* Support to view apps for given user in secure cluster.
* @deprecated This field is deprecated for {@link #FILTER_ENTITY_LIST_BY_USER}
*/

View File

@ -2901,6 +2901,44 @@
<value>1800000</value>
</property>
<!-- Distributed Node Attributes Configuration -->
<property>
<description>
The node attribute script NM runs to collect node attributes.
Script output Line starting with "NODE_ATTRIBUTE:" will be
considered as a record of node attribute, attribute name, type
and value should be delimited by comma. Each of such lines
will be parsed to a node attribute.
</description>
<name>yarn.nodemanager.node-attributes.provider.script.path</name>
</property>
<property>
<description>
Command arguments passed to the node attribute script.
</description>
<name>yarn.nodemanager.node-attributes.provider.script.opts</name>
</property>
<property>
<description>
Time interval that determines how long NM fetches node attributes
from a given provider. If -1 is configured then node labels are
retrieved from provider only during initialization. Defaults to 10 mins.
</description>
<name>yarn.nodemanager.node-attributes.provider.fetch-interval-ms</name>
<value>600000</value>
</property>
<property>
<description>
Timeout period after which NM will interrupt the node attribute
provider script which queries node attributes. Defaults to 20 mins.
</description>
<name>yarn.nodemanager.node-attributes.provider.fetch-timeout-ms</name>
<value>1200000</value>
</property>
<property>
<description>
Timeout in seconds for YARN node graceful decommission.

View File

@ -932,7 +932,7 @@ private NMDistributedNodeLabelsHandler(
@Override
public Set<NodeLabel> getNodeLabelsForRegistration() {
Set<NodeLabel> nodeLabels = nodeLabelsProvider.getNodeLabels();
Set<NodeLabel> nodeLabels = nodeLabelsProvider.getDescriptors();
nodeLabels = (null == nodeLabels)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
previousNodeLabels = nodeLabels;
@ -967,7 +967,7 @@ public String verifyRMRegistrationResponseForNodeLabels(
@Override
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsProvider.getNodeLabels();
nodeLabelsProvider.getDescriptors();
// if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
@ -24,48 +26,52 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Collections;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
/**
* Provides base implementation of NodeLabelsProvider with Timer and expects
* subclass to provide TimerTask which can fetch NodeLabels
* Provides base implementation of NodeDescriptorsProvider with Timer and
* expects subclass to provide TimerTask which can fetch node descriptors.
*/
public abstract class AbstractNodeLabelsProvider extends AbstractService
implements NodeLabelsProvider {
public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1;
public abstract class AbstractNodeDescriptorsProvider<T>
extends AbstractService implements NodeDescriptorsProvider<T> {
public static final long DISABLE_NODE_DESCRIPTORS_PROVIDER_FETCH_TIMER = -1;
// Delay after which timer task are triggered to fetch NodeLabels
protected long intervalTime;
// Delay after which timer task are triggered to fetch node descriptors.
// Default interval is -1 means it is an one time task, each implementation
// will override this value from configuration.
private long intervalTime = -1;
// Timer used to schedule node labels fetching
protected Timer nodeLabelsScheduler;
public static final String NODE_LABELS_SEPRATOR = ",";
// Timer used to schedule node descriptors fetching
private Timer scheduler;
protected Lock readLock = null;
protected Lock writeLock = null;
protected TimerTask timerTask;
protected Set<NodeLabel> nodeLabels =
CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
private Set<T> nodeDescriptors = Collections
.unmodifiableSet(new HashSet<>(0));
public AbstractNodeLabelsProvider(String name) {
public AbstractNodeDescriptorsProvider(String name) {
super(name);
}
public long getIntervalTime() {
return intervalTime;
}
public void setIntervalTime(long intervalMS) {
this.intervalTime = intervalMS;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.intervalTime =
conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
@ -76,13 +82,13 @@ protected void serviceInit(Configuration conf) throws Exception {
protected void serviceStart() throws Exception {
timerTask = createTimerTask();
timerTask.run();
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
nodeLabelsScheduler =
new Timer("DistributedNodeLabelsRunner-Timer", true);
long taskInterval = getIntervalTime();
if (taskInterval != DISABLE_NODE_DESCRIPTORS_PROVIDER_FETCH_TIMER) {
scheduler =
new Timer("DistributedNodeDescriptorsRunner-Timer", true);
// Start the timer task and then periodically at the configured interval
// time. Illegal values for intervalTime is handled by timer api
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, intervalTime,
intervalTime);
scheduler.schedule(timerTask, taskInterval, taskInterval);
}
super.serviceStart();
}
@ -93,8 +99,8 @@ protected void serviceStart() throws Exception {
*/
@Override
protected void serviceStop() throws Exception {
if (nodeLabelsScheduler != null) {
nodeLabelsScheduler.cancel();
if (scheduler != null) {
scheduler.cancel();
}
cleanUp();
super.serviceStop();
@ -109,24 +115,56 @@ protected void serviceStop() throws Exception {
* @return Returns output from provider.
*/
@Override
public Set<NodeLabel> getNodeLabels() {
public Set<T> getDescriptors() {
readLock.lock();
try {
return nodeLabels;
return this.nodeDescriptors;
} finally {
readLock.unlock();
}
}
protected void setNodeLabels(Set<NodeLabel> nodeLabelsSet) {
@Override
public void setDescriptors(Set<T> descriptorsSet) {
writeLock.lock();
try {
nodeLabels = nodeLabelsSet;
this.nodeDescriptors = descriptorsSet;
} finally {
writeLock.unlock();
}
}
/**
* Method used to determine if or not node descriptors fetching script is
* configured and whether it is fit to run. Returns true if following
* conditions are met:
*
* <ol>
* <li>Path to the script is not empty</li>
* <li>The script file exists</li>
* </ol>
*
* @throws IOException
*/
protected void verifyConfiguredScript(String scriptPath)
throws IOException {
boolean invalidConfiguration;
if (scriptPath == null
|| scriptPath.trim().isEmpty()) {
invalidConfiguration = true;
} else {
File f = new File(scriptPath);
invalidConfiguration = !f.exists() || !FileUtil.canExecute(f);
}
if (invalidConfiguration) {
throw new IOException(
"Node descriptors provider script \"" + scriptPath
+ "\" is not configured properly. Please check whether"
+ " the script path exists, owner and the access rights"
+ " are suitable for NM process to execute it");
}
}
static Set<NodeLabel> convertToNodeLabelSet(String partitionNodeLabel) {
if (null == partitionNodeLabel) {
return null;
@ -145,5 +183,15 @@ TimerTask getTimerTask() {
return timerTask;
}
@VisibleForTesting
public Timer getScheduler() {
return this.scheduler;
}
/**
* Creates a timer task which be scheduled periodically by the provider,
* and the task is responsible to update node descriptors to the provider.
* @return a timer task.
*/
public abstract TimerTask createTimerTask();
}

View File

@ -29,7 +29,7 @@
/**
* Provides Node's Labels by constantly monitoring the configuration.
*/
public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider {
public class ConfigurationNodeLabelsProvider extends NodeLabelsProvider {
private static final Logger LOG =
LoggerFactory.getLogger(ConfigurationNodeLabelsProvider.class);
@ -38,11 +38,20 @@ public ConfigurationNodeLabelsProvider() {
super("Configuration Based NodeLabels Provider");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
long taskInterval = conf.getLong(
YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
this.setIntervalTime(taskInterval);
super.serviceInit(conf);
}
private void updateNodeLabelsFromConfig(Configuration conf)
throws IOException {
String configuredNodePartition =
conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_PARTITION, null);
setNodeLabels(convertToNodeLabelSet(configuredNodePartition));
setDescriptors(convertToNodeLabelSet(configuredNodePartition));
}
private class ConfigurationMonitorTimerTask extends TimerTask {

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import java.util.Set;
/**
* Abstract class which will be responsible for fetching the node attributes.
*
*/
public abstract class NodeAttributesProvider
extends AbstractNodeDescriptorsProvider<NodeAttribute> {
public NodeAttributesProvider(String name) {
super(name);
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.util.Set;
/**
* Interface which will be responsible for fetching node descriptors,
* a node descriptor could be a
* {@link org.apache.hadoop.yarn.api.records.NodeLabel} or a
* {@link org.apache.hadoop.yarn.api.records.NodeAttribute}.
*/
public interface NodeDescriptorsProvider<T> {
/**
* Provides the descriptors. The provider is expected to give same
* descriptors continuously until there is a change.
* If null is returned then an empty set is assumed by the caller.
*
* @return Set of node descriptors applicable for a node
*/
Set<T> getDescriptors();
/**
* Sets a set of descriptors to the provider.
* @param descriptors node descriptors.
*/
void setDescriptors(Set<T> descriptors);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.TimerTask;
/**
* A node descriptors script runner periodically runs a script,
* parses the output to collect desired descriptors, and then
* post these descriptors to the given {@link NodeDescriptorsProvider}.
* @param <T> a certain type of descriptor.
*/
public abstract class NodeDescriptorsScriptRunner<T> extends TimerTask {
private final static Logger LOG = LoggerFactory
.getLogger(NodeDescriptorsScriptRunner.class);
private final Shell.ShellCommandExecutor exec;
private final NodeDescriptorsProvider provider;
public NodeDescriptorsScriptRunner(String scriptPath,
String[] scriptArgs, long scriptTimeout,
NodeDescriptorsProvider ndProvider) {
ArrayList<String> execScript = new ArrayList<>();
execScript.add(scriptPath);
if (scriptArgs != null) {
execScript.addAll(Arrays.asList(scriptArgs));
}
this.provider = ndProvider;
this.exec = new Shell.ShellCommandExecutor(
execScript.toArray(new String[execScript.size()]), null, null,
scriptTimeout);
}
@Override
public void run() {
try {
exec.execute();
provider.setDescriptors(parseOutput(exec.getOutput()));
} catch (Exception e) {
if (exec.isTimedOut()) {
LOG.warn("Node Labels script timed out, Caught exception : "
+ e.getMessage(), e);
} else {
LOG.warn("Execution of Node Labels script failed, Caught exception : "
+ e.getMessage(), e);
}
}
}
public void cleanUp() {
if (exec != null) {
Process p = exec.getProcess();
if (p != null) {
p.destroy();
}
}
}
abstract Set<T> parseOutput(String scriptOutput) throws IOException;
}

View File

@ -18,22 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeLabel;
/**
* Interface which will be responsible for fetching the labels
*
* Abstract class which will be responsible for fetching the node labels.
*
*/
public interface NodeLabelsProvider {
public abstract class NodeLabelsProvider
extends AbstractNodeDescriptorsProvider<NodeLabel>{
/**
* Provides the labels. LabelProvider is expected to give same Labels
* continuously until there is a change in labels.
* If null is returned then Empty label set is assumed by the caller.
*
* @return Set of node label strings applicable for a node
*/
public abstract Set<NodeLabel> getNodeLabels();
}
public NodeLabelsProvider(String name) {
super(name);
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.TimerTask;
import static org.apache.hadoop.yarn.conf.YarnConfiguration
.NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PATH;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.
NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_OPTS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.
NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.
NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.
DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.
DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS;
/**
* Node attribute provider that periodically runs a script to collect
* node attributes.
*/
public class ScriptBasedNodeAttributesProvider extends NodeAttributesProvider{
private static final String NODE_ATTRIBUTE_PATTERN = "NODE_ATTRIBUTE:";
private static final String NODE_ATTRIBUTE_DELIMITER = ",";
private NodeAttributeScriptRunner runner;
public ScriptBasedNodeAttributesProvider() {
super(ScriptBasedNodeAttributesProvider.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
String nodeAttributeProviderScript = conf.get(
NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PATH);
long scriptTimeout = conf.getLong(
NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS,
DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS);
String[] scriptArgs = conf.getStrings(
NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_OPTS,
new String[] {});
verifyConfiguredScript(nodeAttributeProviderScript);
long intervalTime = conf.getLong(
NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS);
this.setIntervalTime(intervalTime);
this.runner = new NodeAttributeScriptRunner(nodeAttributeProviderScript,
scriptArgs, scriptTimeout, this);
}
@Override
protected void cleanUp() throws Exception {
runner.cleanUp();
}
@Override
public TimerTask createTimerTask() {
return runner;
}
private static class NodeAttributeScriptRunner extends
NodeDescriptorsScriptRunner<NodeAttribute> {
NodeAttributeScriptRunner(String scriptPath, String[] scriptArgs,
long scriptTimeout, ScriptBasedNodeAttributesProvider provider) {
super(scriptPath, scriptArgs, scriptTimeout, provider);
}
@Override
Set<NodeAttribute> parseOutput(String scriptOutput) throws IOException {
Set<NodeAttribute> attributeSet = new HashSet<>();
// TODO finalize format
// each line is a record of ndoe attribute like following:
// NODE_ATTRIBUTE:ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE
String[] splits = scriptOutput.split("\n");
for (String line : splits) {
String trimmedLine = line.trim();
if (trimmedLine.startsWith(NODE_ATTRIBUTE_PATTERN)) {
String nodeAttribute = trimmedLine
.substring(NODE_ATTRIBUTE_PATTERN.length());
String[] attributeStrs = nodeAttribute
.split(NODE_ATTRIBUTE_DELIMITER);
if (attributeStrs.length != 3) {
throw new IOException("Malformed output, expecting format "
+ NODE_ATTRIBUTE_PATTERN + ":" + "ATTRIBUTE_NAME"
+ NODE_ATTRIBUTE_DELIMITER + "ATTRIBUTE_TYPE"
+ NODE_ATTRIBUTE_DELIMITER + "ATTRIBUTE_VALUE; but get "
+ nodeAttribute);
}
NodeAttribute na = NodeAttribute
.newInstance(attributeStrs[0],
NodeAttributeType.valueOf(attributeStrs[1]),
attributeStrs[2]);
attributeSet.add(na);
}
}
return attributeSet;
}
}
}

View File

@ -18,19 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -40,20 +32,12 @@
* pattern which will be used to search node label partition from the out put of
* the NodeLabels provider script
*/
public class ScriptBasedNodeLabelsProvider extends AbstractNodeLabelsProvider {
/** Absolute path to the node labels script. */
private String nodeLabelsScriptPath;
/** Time after which the script should be timed out */
private long scriptTimeout;
/** ShellCommandExecutor used to execute monitoring script */
ShellCommandExecutor shexec = null;
public class ScriptBasedNodeLabelsProvider extends NodeLabelsProvider {
/** Pattern used for searching in the output of the node labels script */
public static final String NODE_LABEL_PARTITION_PATTERN = "NODE_PARTITION:";
private String[] scriptArgs;
private NodeDescriptorsScriptRunner runner;
public ScriptBasedNodeLabelsProvider() {
super(ScriptBasedNodeLabelsProvider.class.getName());
@ -64,48 +48,24 @@ public ScriptBasedNodeLabelsProvider() {
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.nodeLabelsScriptPath =
String nodeLabelsScriptPath =
conf.get(YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PATH);
this.scriptTimeout =
long scriptTimeout =
conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS);
scriptArgs = conf.getStrings(
String[] scriptArgs = conf.getStrings(
YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_SCRIPT_OPTS,
new String[] {});
verifyConfiguredScript(nodeLabelsScriptPath);
verifyConfiguredScript();
}
long taskInterval = conf.getLong(
YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
this.setIntervalTime(taskInterval);
this.runner = new NodeLabelScriptRunner(nodeLabelsScriptPath, scriptArgs,
scriptTimeout, this);
/**
* Method used to determine if or not node labels fetching script is
* configured and whether it is fit to run. Returns true if following
* conditions are met:
*
* <ol>
* <li>Path to Node Labels fetch script is not empty</li>
* <li>Node Labels fetch script file exists</li>
* </ol>
*
* @throws IOException
*/
private void verifyConfiguredScript()
throws IOException {
boolean invalidConfiguration = false;
if (nodeLabelsScriptPath == null
|| nodeLabelsScriptPath.trim().isEmpty()) {
invalidConfiguration = true;
} else {
File f = new File(nodeLabelsScriptPath);
invalidConfiguration = !f.exists() || !FileUtil.canExecute(f);
}
if (invalidConfiguration) {
throw new IOException(
"Distributed Node labels provider script \"" + nodeLabelsScriptPath
+ "\" is not configured properly. Please check whether the script "
+ "path exists, owner and the access rights are suitable for NM "
+ "process to execute it");
}
super.serviceInit(conf);
}
/**
@ -113,53 +73,19 @@ private void verifyConfiguredScript()
*/
@Override
public void cleanUp() {
if (shexec != null) {
Process p = shexec.getProcess();
if (p != null) {
p.destroy();
}
if (runner != null) {
runner.cleanUp();
}
}
@Override
public TimerTask createTimerTask() {
return new NodeLabelsScriptRunner();
}
// A script runner periodically runs a script to get node labels,
// and sets these labels to the given provider.
private static class NodeLabelScriptRunner extends
NodeDescriptorsScriptRunner<NodeLabel> {
/**
* Class which is used by the {@link Timer} class to periodically execute the
* node labels script.
*/
private class NodeLabelsScriptRunner extends TimerTask {
private final Logger LOG =
LoggerFactory.getLogger(NodeLabelsScriptRunner.class);
public NodeLabelsScriptRunner() {
ArrayList<String> execScript = new ArrayList<String>();
execScript.add(nodeLabelsScriptPath);
if (scriptArgs != null) {
execScript.addAll(Arrays.asList(scriptArgs));
}
shexec = new ShellCommandExecutor(
execScript.toArray(new String[execScript.size()]), null, null,
scriptTimeout);
}
@Override
public void run() {
try {
shexec.execute();
setNodeLabels(fetchLabelsFromScriptOutput(shexec.getOutput()));
} catch (Exception e) {
if (shexec.isTimedOut()) {
LOG.warn("Node Labels script timed out, Caught exception : "
+ e.getMessage(), e);
} else {
LOG.warn("Execution of Node Labels script failed, Caught exception : "
+ e.getMessage(), e);
}
}
NodeLabelScriptRunner(String scriptPath, String[] scriptArgs,
long scriptTimeout, ScriptBasedNodeLabelsProvider provider) {
super(scriptPath, scriptArgs, scriptTimeout, provider);
}
/**
@ -170,7 +96,8 @@ public void run() {
* @return true if output string has error pattern in it.
* @throws IOException
*/
private Set<NodeLabel> fetchLabelsFromScriptOutput(String scriptOutput)
@Override
Set<NodeLabel> parseOutput(String scriptOutput)
throws IOException {
String nodePartitionLabel = null;
String[] splits = scriptOutput.split("\n");
@ -184,4 +111,9 @@ private Set<NodeLabel> fetchLabelsFromScriptOutput(String scriptOutput)
return convertToNodeLabelSet(nodePartitionLabel);
}
}
@Override
public TimerTask createTimerTask() {
return runner;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package to encapsulate classes used to handle node labels and node
* attributes in NM.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -26,6 +26,7 @@
import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.ServerSocketUtil;
@ -179,17 +180,27 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager(
}
}
public static class DummyNodeLabelsProvider implements NodeLabelsProvider {
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
@Override
public synchronized Set<NodeLabel> getNodeLabels() {
return nodeLabels;
public DummyNodeLabelsProvider() {
super("DummyNodeLabelsProvider");
// disable the fetch timer.
setIntervalTime(-1);
}
synchronized void setNodeLabels(Set<NodeLabel> nodeLabels) {
this.nodeLabels = nodeLabels;
@Override
protected void cleanUp() throws Exception {
// fake implementation, nothing to cleanup
}
@Override
public TimerTask createTimerTask() {
return new TimerTask() {
@Override
public void run() {
setDescriptors(CommonNodeLabelsManager.EMPTY_NODELABEL_SET);
}
};
}
}
@ -241,18 +252,18 @@ protected void stopRMProxy() {
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillRegister();
assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(),
assertNLCollectionEquals(dummyLabelsProviderRef.getDescriptors(),
resourceTracker.labels);
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with updated labels
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
dummyLabelsProviderRef.setDescriptors(toNodeLabelSet("P"));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(),
assertNLCollectionEquals(dummyLabelsProviderRef.getDescriptors(),
resourceTracker.labels);
resourceTracker.resetNMHeartbeatReceiveFlag();
@ -265,7 +276,7 @@ protected void stopRMProxy() {
resourceTracker.labels);
// provider return with null labels
dummyLabelsProviderRef.setNodeLabels(null);
dummyLabelsProviderRef.setDescriptors(null);
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNotNull(
@ -279,7 +290,7 @@ protected void stopRMProxy() {
// so that every sec 1 heartbeat is send.
int nullLabels = 0;
int nonNullLabels = 0;
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
dummyLabelsProviderRef.setDescriptors(toNodeLabelSet("P1"));
for (int i = 0; i < 5; i++) {
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
@ -331,19 +342,19 @@ protected void stopRMProxy() {
};
}
};
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:"
+ ServerSocketUtil.getPort(8040, 10));
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
dummyLabelsProviderRef.setDescriptors(toNodeLabelSet("P"));
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with invalid labels
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P"));
dummyLabelsProviderRef.setDescriptors(toNodeLabelSet("_.P"));
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();

View File

@ -98,32 +98,34 @@ public void testNodeLabelsFromConfig() throws IOException,
// test for ensuring labels are set during initialization of the class
nodeLabelsProvider.start();
assertNLCollectionEquals(toNodeLabelSet("A"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
// test for valid Modification
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
modifyConf("X");
timerTask.run();
assertNLCollectionEquals(toNodeLabelSet("X"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
}
@Test
public void testConfigForNoTimer() throws Exception {
Configuration conf = new Configuration();
modifyConf("A");
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
conf.setLong(YarnConfiguration
.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeDescriptorsProvider
.DISABLE_NODE_DESCRIPTORS_PROVIDER_FETCH_TIMER);
nodeLabelsProvider.init(conf);
nodeLabelsProvider.start();
Assert
.assertNull(
"Timer is not expected to be created when interval is configured as -1",
nodeLabelsProvider.nodeLabelsScheduler);
// Ensure that even though timer is not run, node labels are fetched at least once so
// that NM registers/updates Labels with RM
.assertNull("Timer is not expected to be"
+ " created when interval is configured as -1",
nodeLabelsProvider.getScheduler());
// Ensure that even though timer is not run, node labels
// are fetched at least once so that NM registers/updates Labels with RM
assertNLCollectionEquals(toNodeLabelSet("A"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
}
@Test
@ -138,11 +140,11 @@ public void testConfigTimer() throws Exception {
// least once so
// that NM registers/updates Labels with RM
assertNLCollectionEquals(toNodeLabelSet("A"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
modifyConf("X");
Thread.sleep(1500);
assertNLCollectionEquals(toNodeLabelSet("X"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
/**
* Test cases for script based node attributes provider.
*/
public class TestScriptBasedNodeAttributesProvider {
private static File testRootDir = new File("target",
TestScriptBasedNodeAttributesProvider.class.getName() + "-localDir")
.getAbsoluteFile();
private final File nodeAttributeScript =
new File(testRootDir, Shell.appendScriptExtension("attributeScript"));
private ScriptBasedNodeAttributesProvider nodeAttributesProvider;
@Before
public void setup() {
testRootDir.mkdirs();
nodeAttributesProvider = new ScriptBasedNodeAttributesProvider();
}
@After
public void tearDown() throws Exception {
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext()
.delete(new Path(testRootDir.getAbsolutePath()), true);
}
if (nodeAttributesProvider != null) {
nodeAttributesProvider.stop();
}
}
private Configuration getConfForNodeAttributeScript() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_SCRIPT_BASED_NODE_ATTRIBUTES_PROVIDER_PATH,
nodeAttributeScript.getAbsolutePath());
// set bigger interval so that test cases can be run
conf.setLong(
YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
1000);
conf.setLong(
YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_TIMEOUT_MS,
1000);
return conf;
}
private void writeNodeAttributeScriptFile(String scriptStr,
boolean setExecutable) throws IOException {
PrintWriter pw = null;
try {
FileUtil.setWritable(nodeAttributeScript, true);
FileUtil.setReadable(nodeAttributeScript, true);
pw = new PrintWriter(new FileOutputStream(nodeAttributeScript));
pw.println(scriptStr);
pw.flush();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
} finally {
if (null != pw) {
pw.close();
}
}
FileUtil.setExecutable(nodeAttributeScript, setExecutable);
}
@Test
public void testNodeAttributeScriptProvider()
throws IOException, InterruptedException {
String simpleScript = "echo NODE_ATTRIBUTE:host,STRING,host1234\n "
+ "echo NODE_ATTRIBUTE:os,STRING,redhat_6_3\n "
+ "echo NODE_ATTRIBUTE:ip,STRING,10.0.0.1";
writeNodeAttributeScriptFile(simpleScript, true);
nodeAttributesProvider.init(getConfForNodeAttributeScript());
nodeAttributesProvider.start();
try {
GenericTestUtils.waitFor(
() -> nodeAttributesProvider.getDescriptors().size() == 3,
500, 3000);
} catch (TimeoutException e) {
Assert.fail("Expecting node attributes size is 3, but got "
+ nodeAttributesProvider.getDescriptors().size());
}
Iterator<NodeAttribute> it = nodeAttributesProvider
.getDescriptors().iterator();
while (it.hasNext()) {
NodeAttribute att = it.next();
switch (att.getAttributeName()) {
case "host":
Assert.assertEquals(NodeAttributeType.STRING, att.getAttributeType());
Assert.assertEquals("host1234", att.getAttributeValue());
break;
case "os":
Assert.assertEquals(NodeAttributeType.STRING, att.getAttributeType());
Assert.assertEquals("redhat_6_3", att.getAttributeValue());
break;
case "ip":
Assert.assertEquals(NodeAttributeType.STRING, att.getAttributeType());
Assert.assertEquals("10.0.0.1", att.getAttributeValue());
break;
default:
Assert.fail("Unexpected attribute name " + att.getAttributeName());
break;
}
}
}
@Test
public void testInvalidScriptOutput()
throws IOException, InterruptedException {
// Script output doesn't have correct prefix.
String scriptContent = "echo host,STRING,host1234";
writeNodeAttributeScriptFile(scriptContent, true);
nodeAttributesProvider.init(getConfForNodeAttributeScript());
nodeAttributesProvider.start();
try {
GenericTestUtils.waitFor(
() -> nodeAttributesProvider.getDescriptors().size() == 1,
500, 3000);
Assert.fail("This test should timeout because the provide is unable"
+ " to parse any attributes from the script output.");
} catch (TimeoutException e) {
Assert.assertEquals(0, nodeAttributesProvider
.getDescriptors().size());
}
}
@Test
public void testMalformedScriptOutput() throws Exception{
// Script output has correct prefix but each line is malformed.
String scriptContent =
"echo NODE_ATTRIBUTE:host,STRING,host1234,a_extra_column";
writeNodeAttributeScriptFile(scriptContent, true);
nodeAttributesProvider.init(getConfForNodeAttributeScript());
nodeAttributesProvider.start();
// There should be no attributes found, and we should
// see Malformed output warnings in the log
try {
GenericTestUtils
.waitFor(() -> nodeAttributesProvider
.getDescriptors().size() == 1,
500, 3000);
Assert.fail("This test should timeout because the provide is unable"
+ " to parse any attributes from the script output.");
} catch (TimeoutException e) {
Assert.assertEquals(0, nodeAttributesProvider
.getDescriptors().size());
}
}
@Test
public void testFetchInterval() throws Exception {
// The script returns the pid (as an attribute) each time runs this script
String simpleScript = "echo NODE_ATTRIBUTE:pid,STRING,$$";
writeNodeAttributeScriptFile(simpleScript, true);
nodeAttributesProvider.init(getConfForNodeAttributeScript());
nodeAttributesProvider.start();
// Wait for at most 3 seconds until we get at least 1
// different attribute value.
Set<String> resultSet = new HashSet<>();
GenericTestUtils.waitFor(() -> {
Set<NodeAttribute> attributes =
nodeAttributesProvider.getDescriptors();
if (attributes != null) {
Assert.assertEquals(1, attributes.size());
resultSet.add(attributes.iterator().next().getAttributeValue());
return resultSet.size() > 1;
} else {
return false;
}
}, 500, 3000);
}
}

View File

@ -151,19 +151,21 @@ private void initilizeServiceFailTest(String message,
@Test
public void testConfigForNoTimer() throws Exception {
Configuration conf = getConfForNodeLabelScript();
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
conf.setLong(YarnConfiguration
.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
AbstractNodeDescriptorsProvider
.DISABLE_NODE_DESCRIPTORS_PROVIDER_FETCH_TIMER);
String normalScript = "echo NODE_PARTITION:X86";
writeNodeLabelsScriptFile(normalScript, true);
nodeLabelsProvider.init(conf);
nodeLabelsProvider.start();
Assert.assertNull(
"Timer is not expected to be created when interval is configured as -1",
nodeLabelsProvider.nodeLabelsScheduler);
nodeLabelsProvider.getScheduler());
// Ensure that even though timer is not run script is run at least once so
// that NM registers/updates Labels with RM
assertNLCollectionEquals(toNodeLabelSet("X86"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
}
@Test
@ -185,25 +187,25 @@ public void testNodeLabelsScript() throws Exception {
Assert.assertNull(
"Node Label Script runner should return null when script doesnt "
+ "give any Labels output",
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
writeNodeLabelsScriptFile(normalScript, true);
timerTask.run();
assertNLCollectionEquals(toNodeLabelSet("Windows"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
// multiple lines with partition tag then the last line's partition info
// needs to be taken.
writeNodeLabelsScriptFile(scrptWithMultipleLinesHavingNodeLabels, true);
timerTask.run();
assertNLCollectionEquals(toNodeLabelSet("JDK1_6"),
nodeLabelsProvider.getNodeLabels());
nodeLabelsProvider.getDescriptors());
// timeout script.
writeNodeLabelsScriptFile(timeOutScript, true);
timerTask.run();
Assert.assertNotEquals("Node Labels should not be set after timeout ",
toNodeLabelSet("ALL"), nodeLabelsProvider.getNodeLabels());
toNodeLabelSet("ALL"), nodeLabelsProvider.getDescriptors());
}
}