YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed Node Label Configuration Setup. (Naganarasimha G R)
This commit is contained in:
parent
0bc15cb6e6
commit
fc07464d1a
|
@ -175,6 +175,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4055. Report node resource utilization in heartbeat.
|
||||
(Inigo Goiri via kasha)
|
||||
|
||||
YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed
|
||||
Node Label Configuration Setup. (Naganarasimha G R)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -1967,6 +1967,36 @@ public class YarnConfiguration extends Configuration {
|
|||
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
|
||||
}
|
||||
|
||||
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
|
||||
+ "node-labels.";
|
||||
|
||||
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
|
||||
NM_NODE_LABELS_PREFIX + "provider";
|
||||
|
||||
// whitelist names for the yarn.nodemanager.node-labels.provider
|
||||
public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
|
||||
|
||||
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
|
||||
NM_NODE_LABELS_PREFIX + "provider.";
|
||||
|
||||
// If -1 is configured then no timer task should be created
|
||||
public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
|
||||
|
||||
public static final String NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-timeout-ms";
|
||||
|
||||
// once in 10 mins
|
||||
public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
|
||||
10 * 60 * 1000;
|
||||
|
||||
// Twice of default interval time
|
||||
public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
|
||||
DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS * 2;
|
||||
|
||||
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
|
||||
|
||||
public YarnConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
|
|
@ -916,7 +916,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkAndThrowLabelName(String label) throws IOException {
|
||||
public static void checkAndThrowLabelName(String label) throws IOException {
|
||||
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
|
||||
throw new IOException("label added is empty or exceeds "
|
||||
+ MAX_LABEL_LENGTH + " character(s)");
|
||||
|
|
|
@ -2092,6 +2092,53 @@
|
|||
<value>centralized</value>
|
||||
</property>
|
||||
|
||||
<!-- Distributed Node Labels Configuration -->
|
||||
<property>
|
||||
<description>
|
||||
When node labels "yarn.node-labels.configuration-type" is
|
||||
of type "distributed" Administrators can configure the source of the
|
||||
node labels provider by configuring this parameter. Administrators can
|
||||
specify either "config" or the class name of the provider. Configured
|
||||
class needs to extend
|
||||
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider.
|
||||
If "config" is specified then, "ConfigurationNodeLabelsProvider" will
|
||||
be used.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider</name>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
When node labels "yarn.nodemanager.node-labels.provider" is of type
|
||||
"config" or the configured class extends AbstractNodeLabelsProvider then
|
||||
periodically node labels are retrieved from the node labels provider.
|
||||
This configuration is to define the interval. If -1 is configured then
|
||||
node labels are retrieved from. provider only during initialization.
|
||||
Defaults to 10 mins.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.fetch-interval-ms</name>
|
||||
<value>600000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
When node labels "yarn.nodemanager.node-labels.provider"
|
||||
is of type "config" then ConfigurationNodeLabelsProvider fetches the
|
||||
labels this parameter.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.configured-node-labels</name>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
When node labels "yarn.nodemanager.node-labels.provider" is a class
|
||||
which extends AbstractNodeLabelsProvider then this configuration provides
|
||||
the timeout period after which it will stop querying the Node labels
|
||||
provider. Defaults to 20 mins.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.fetch-timeout-ms</name>
|
||||
<value>1200000</value>
|
||||
</property>
|
||||
<!-- Other Configuration -->
|
||||
|
||||
<property>
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
|
@ -122,12 +123,38 @@ public class NodeManager extends CompositeService
|
|||
metrics, nodeLabelsProvider);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected NodeLabelsProvider createNodeLabelsProvider(
|
||||
Configuration conf) throws IOException {
|
||||
// TODO as part of YARN-2729
|
||||
// Need to get the implementation of provider service and return
|
||||
return null;
|
||||
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
|
||||
throws IOException {
|
||||
NodeLabelsProvider provider = null;
|
||||
String providerString =
|
||||
conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null);
|
||||
if (providerString == null || providerString.trim().length() == 0) {
|
||||
// Seems like Distributed Node Labels configuration is not enabled
|
||||
return provider;
|
||||
}
|
||||
switch (providerString.trim().toLowerCase()) {
|
||||
case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
|
||||
provider = new ConfigurationNodeLabelsProvider();
|
||||
break;
|
||||
default:
|
||||
try {
|
||||
Class<? extends NodeLabelsProvider> labelsProviderClass =
|
||||
conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null,
|
||||
NodeLabelsProvider.class);
|
||||
provider = labelsProviderClass.newInstance();
|
||||
} catch (InstantiationException | IllegalAccessException
|
||||
| RuntimeException e) {
|
||||
LOG.error("Failed to create NodeLabelsProvider based on Configuration",
|
||||
e);
|
||||
throw new IOException("Failed to create NodeLabelsProvider : "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Distributed Node Labels is enabled"
|
||||
+ " with provider class as : " + provider.getClass().toString());
|
||||
}
|
||||
return provider;
|
||||
}
|
||||
|
||||
protected NodeResourceMonitor createNodeResourceMonitor() {
|
||||
|
|
|
@ -137,8 +137,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
private boolean registeredWithRM = false;
|
||||
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
|
||||
|
||||
private final NodeLabelsProvider nodeLabelsProvider;
|
||||
private final boolean hasNodeLabelsProvider;
|
||||
private NMNodeLabelsHandler nodeLabelsHandler;
|
||||
|
||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||
|
@ -150,8 +149,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
NodeLabelsProvider nodeLabelsProvider) {
|
||||
super(NodeStatusUpdaterImpl.class.getName());
|
||||
this.healthChecker = healthChecker;
|
||||
this.nodeLabelsProvider = nodeLabelsProvider;
|
||||
this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
|
||||
nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
|
||||
this.context = context;
|
||||
this.dispatcher = dispatcher;
|
||||
this.metrics = metrics;
|
||||
|
@ -313,13 +311,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
protected void registerWithRM()
|
||||
throws YarnException, IOException {
|
||||
List<NMContainerStatus> containerReports = getNMContainerStatuses();
|
||||
Set<NodeLabel> nodeLabels = null;
|
||||
if (hasNodeLabelsProvider) {
|
||||
nodeLabels = nodeLabelsProvider.getNodeLabels();
|
||||
nodeLabels =
|
||||
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
|
||||
: nodeLabels;
|
||||
}
|
||||
Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
|
||||
RegisterNodeManagerRequest request =
|
||||
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
|
||||
nodeManagerVersionId, containerReports, getRunningApplications(),
|
||||
|
@ -380,14 +372,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
.append(this.nodeId).append(" with total resource of ")
|
||||
.append(this.totalResource);
|
||||
|
||||
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
|
||||
successfullRegistrationMsg
|
||||
.append(" and with following Node label(s) : {")
|
||||
.append(StringUtils.join(",", nodeLabels)).append("}");
|
||||
} else if (hasNodeLabelsProvider) {
|
||||
//case where provider is set but RM did not accept the Node Labels
|
||||
LOG.error(regNMResponse.getDiagnosticsMessage());
|
||||
}
|
||||
successfullRegistrationMsg.append(nodeLabelsHandler
|
||||
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
|
||||
|
||||
LOG.info(successfullRegistrationMsg);
|
||||
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
||||
|
@ -688,33 +674,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
@SuppressWarnings("unchecked")
|
||||
public void run() {
|
||||
int lastHeartbeatID = 0;
|
||||
Set<NodeLabel> lastUpdatedNodeLabelsToRM = null;
|
||||
if (hasNodeLabelsProvider) {
|
||||
lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
|
||||
lastUpdatedNodeLabelsToRM =
|
||||
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
|
||||
: lastUpdatedNodeLabelsToRM;
|
||||
}
|
||||
while (!isStopped) {
|
||||
// Send heartbeat
|
||||
try {
|
||||
NodeHeartbeatResponse response = null;
|
||||
Set<NodeLabel> nodeLabelsForHeartbeat = null;
|
||||
Set<NodeLabel> nodeLabelsForHeartbeat =
|
||||
nodeLabelsHandler.getNodeLabelsForHeartbeat();
|
||||
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
|
||||
|
||||
if (hasNodeLabelsProvider) {
|
||||
nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
|
||||
// if the provider returns null then consider empty labels are set
|
||||
nodeLabelsForHeartbeat =
|
||||
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
|
||||
: nodeLabelsForHeartbeat;
|
||||
if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
|
||||
lastUpdatedNodeLabelsToRM)) {
|
||||
// if nodelabels have not changed then no need to send
|
||||
nodeLabelsForHeartbeat = null;
|
||||
}
|
||||
}
|
||||
|
||||
NodeHeartbeatRequest request =
|
||||
NodeHeartbeatRequest.newInstance(nodeStatus,
|
||||
NodeStatusUpdaterImpl.this.context
|
||||
|
@ -740,9 +707,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
updateMasterKeys(response);
|
||||
|
||||
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||
LOG
|
||||
.warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
|
||||
+ " hence shutting down.");
|
||||
LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
|
||||
+ " heartbeat, hence shutting down.");
|
||||
LOG.warn("Message from ResourceManager: "
|
||||
+ response.getDiagnosticsMessage());
|
||||
context.setDecommissioned(true);
|
||||
|
@ -764,16 +730,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
break;
|
||||
}
|
||||
|
||||
if (response.getAreNodeLabelsAcceptedByRM()) {
|
||||
lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
|
||||
LOG.info("Node Labels {"
|
||||
+ StringUtils.join(",", nodeLabelsForHeartbeat)
|
||||
+ "} were Accepted by RM ");
|
||||
} else if (nodeLabelsForHeartbeat != null) {
|
||||
// case where NodeLabelsProvider is set and updated labels were
|
||||
// sent to RM and RM rejected the labels
|
||||
LOG.error(response.getDiagnosticsMessage());
|
||||
}
|
||||
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
|
||||
|
||||
// Explicitly put this method after checking the resync response. We
|
||||
// don't want to remove the completed containers before resync
|
||||
|
@ -833,23 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Caller should take care of sending non null nodelabels for both
|
||||
* arguments
|
||||
*
|
||||
* @param nodeLabelsNew
|
||||
* @param nodeLabelsOld
|
||||
* @return if the New node labels are diff from the older one.
|
||||
*/
|
||||
private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
|
||||
Set<NodeLabel> nodeLabelsOld) {
|
||||
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|
||||
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void updateMasterKeys(NodeHeartbeatResponse response) {
|
||||
// See if the master-key has rolled over
|
||||
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
|
||||
|
@ -879,4 +819,183 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
reports.addAll(logAggregationReportForAppsTempList);
|
||||
return reports;
|
||||
}
|
||||
|
||||
private NMNodeLabelsHandler createNMNodeLabelsHandler(
|
||||
NodeLabelsProvider nodeLabelsProvider) {
|
||||
if (nodeLabelsProvider == null) {
|
||||
return new NMCentralizedNodeLabelsHandler();
|
||||
} else {
|
||||
return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
|
||||
}
|
||||
}
|
||||
|
||||
private static interface NMNodeLabelsHandler {
|
||||
/**
|
||||
* validates nodeLabels From Provider and returns it to the caller. Also
|
||||
* ensures that if provider returns null then empty label set is considered
|
||||
*/
|
||||
Set<NodeLabel> getNodeLabelsForRegistration();
|
||||
|
||||
/**
|
||||
* @return RMRegistration Success message and on failure will log
|
||||
* independently and returns empty string
|
||||
*/
|
||||
String verifyRMRegistrationResponseForNodeLabels(
|
||||
RegisterNodeManagerResponse regNMResponse);
|
||||
|
||||
/**
|
||||
* If nodeLabels From Provider is different previous node labels then it
|
||||
* will check the syntax correctness and throws exception if invalid. If
|
||||
* valid, returns nodeLabels From Provider. Also ensures that if provider
|
||||
* returns null then empty label set is considered and If labels are not
|
||||
* modified it returns null.
|
||||
*/
|
||||
Set<NodeLabel> getNodeLabelsForHeartbeat();
|
||||
|
||||
/**
|
||||
* check whether if updated labels sent to RM was accepted or not
|
||||
* @param response
|
||||
*/
|
||||
void verifyRMHeartbeatResponseForNodeLabels(NodeHeartbeatResponse response);
|
||||
}
|
||||
|
||||
/**
|
||||
* In centralized configuration, NM need not send Node labels or process the
|
||||
* response
|
||||
*/
|
||||
private static class NMCentralizedNodeLabelsHandler
|
||||
implements NMNodeLabelsHandler {
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabelsForRegistration() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyRMHeartbeatResponseForNodeLabels(
|
||||
NodeHeartbeatResponse response) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String verifyRMRegistrationResponseForNodeLabels(
|
||||
RegisterNodeManagerResponse regNMResponse) {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
private static class NMDistributedNodeLabelsHandler
|
||||
implements NMNodeLabelsHandler {
|
||||
private NMDistributedNodeLabelsHandler(
|
||||
NodeLabelsProvider nodeLabelsProvider) {
|
||||
this.nodeLabelsProvider = nodeLabelsProvider;
|
||||
}
|
||||
|
||||
private final NodeLabelsProvider nodeLabelsProvider;
|
||||
private Set<NodeLabel> previousNodeLabels;
|
||||
private boolean updatedLabelsSentToRM;
|
||||
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabelsForRegistration() {
|
||||
Set<NodeLabel> nodeLabels = nodeLabelsProvider.getNodeLabels();
|
||||
nodeLabels = (null == nodeLabels)
|
||||
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
|
||||
previousNodeLabels = nodeLabels;
|
||||
try {
|
||||
validateNodeLabels(nodeLabels);
|
||||
} catch (IOException e) {
|
||||
nodeLabels = null;
|
||||
}
|
||||
return nodeLabels;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String verifyRMRegistrationResponseForNodeLabels(
|
||||
RegisterNodeManagerResponse regNMResponse) {
|
||||
StringBuilder successfulNodeLabelsRegistrationMsg = new StringBuilder("");
|
||||
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
|
||||
successfulNodeLabelsRegistrationMsg
|
||||
.append(" and with following Node label(s) : {")
|
||||
.append(StringUtils.join(",", previousNodeLabels)).append("}");
|
||||
} else {
|
||||
// case where provider is set but RM did not accept the Node Labels
|
||||
LOG.error(regNMResponse.getDiagnosticsMessage());
|
||||
}
|
||||
return successfulNodeLabelsRegistrationMsg.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabelsForHeartbeat() {
|
||||
Set<NodeLabel> nodeLabelsForHeartbeat =
|
||||
nodeLabelsProvider.getNodeLabels();
|
||||
// if the provider returns null then consider empty labels are set
|
||||
nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
|
||||
? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
|
||||
: nodeLabelsForHeartbeat;
|
||||
// take some action only on modification of labels
|
||||
boolean areNodeLabelsUpdated =
|
||||
nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
|
||||
|| !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
|
||||
|
||||
updatedLabelsSentToRM = false;
|
||||
if (areNodeLabelsUpdated) {
|
||||
previousNodeLabels = nodeLabelsForHeartbeat;
|
||||
try {
|
||||
validateNodeLabels(nodeLabelsForHeartbeat);
|
||||
updatedLabelsSentToRM = true;
|
||||
} catch (IOException e) {
|
||||
// set previous node labels to invalid set, so that invalid
|
||||
// labels are not verified for every HB, and send empty set
|
||||
// to RM to have same nodeLabels which was earlier set.
|
||||
nodeLabelsForHeartbeat = null;
|
||||
}
|
||||
} else {
|
||||
// if nodelabels have not changed then no need to send
|
||||
nodeLabelsForHeartbeat = null;
|
||||
}
|
||||
return nodeLabelsForHeartbeat;
|
||||
}
|
||||
|
||||
private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
|
||||
throws IOException {
|
||||
Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator();
|
||||
boolean hasInvalidLabel = false;
|
||||
StringBuilder errorMsg = new StringBuilder("");
|
||||
while (iterator.hasNext()) {
|
||||
try {
|
||||
CommonNodeLabelsManager
|
||||
.checkAndThrowLabelName(iterator.next().getName());
|
||||
} catch (IOException e) {
|
||||
errorMsg.append(e.getMessage());
|
||||
errorMsg.append(" , ");
|
||||
hasInvalidLabel = true;
|
||||
}
|
||||
}
|
||||
if (hasInvalidLabel) {
|
||||
LOG.error("Invalid Node Label(s) from Provider : " + errorMsg);
|
||||
throw new IOException(errorMsg.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyRMHeartbeatResponseForNodeLabels(
|
||||
NodeHeartbeatResponse response) {
|
||||
if (updatedLabelsSentToRM) {
|
||||
if (response.getAreNodeLabelsAcceptedByRM()) {
|
||||
LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
|
||||
+ "} were Accepted by RM ");
|
||||
} else {
|
||||
// case where updated labels from NodeLabelsProvider is sent to RM and
|
||||
// RM rejected the labels
|
||||
LOG.error(
|
||||
"NM node labels {" + StringUtils.join(",", previousNodeLabels)
|
||||
+ "} were not accepted by RM and message from RM : "
|
||||
+ response.getDiagnosticsMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Provides base implementation of NodeLabelsProvider with Timer and expects
|
||||
* subclass to provide TimerTask which can fetch NodeLabels
|
||||
*/
|
||||
public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
|
||||
public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1;
|
||||
|
||||
// Delay after which timer task are triggered to fetch NodeLabels
|
||||
protected long intervalTime;
|
||||
|
||||
// Timer used to schedule node labels fetching
|
||||
protected Timer nodeLabelsScheduler;
|
||||
|
||||
public static final String NODE_LABELS_SEPRATOR = ",";
|
||||
|
||||
protected Lock readLock = null;
|
||||
protected Lock writeLock = null;
|
||||
|
||||
protected TimerTask timerTask;
|
||||
|
||||
protected Set<NodeLabel> nodeLabels =
|
||||
CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
|
||||
|
||||
@VisibleForTesting
|
||||
long startTime = 0;
|
||||
|
||||
public AbstractNodeLabelsProvider(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.intervalTime =
|
||||
conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
|
||||
|
||||
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
readLock = readWriteLock.readLock();
|
||||
writeLock = readWriteLock.writeLock();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
timerTask = createTimerTask();
|
||||
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
|
||||
nodeLabelsScheduler =
|
||||
new Timer("DistributedNodeLabelsRunner-Timer", true);
|
||||
// Start the timer task and then periodically at the configured interval
|
||||
// time. Illegal values for intervalTime is handled by timer api
|
||||
nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime,
|
||||
intervalTime);
|
||||
}
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
/**
|
||||
* terminate the timer
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (nodeLabelsScheduler != null) {
|
||||
nodeLabelsScheduler.cancel();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns output from provider.
|
||||
*/
|
||||
@Override
|
||||
public Set<NodeLabel> getNodeLabels() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return nodeLabels;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void setNodeLabels(Set<NodeLabel> nodeLabelsSet) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
nodeLabels = nodeLabelsSet;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used only by tests to access the timer task directly
|
||||
*
|
||||
* @return the timer task
|
||||
*/
|
||||
TimerTask getTimerTask() {
|
||||
return timerTask;
|
||||
}
|
||||
|
||||
static Set<NodeLabel> convertToNodeLabelSet(Set<String> nodeLabels) {
|
||||
if (null == nodeLabels) {
|
||||
return null;
|
||||
}
|
||||
Set<NodeLabel> labels = new HashSet<NodeLabel>();
|
||||
for (String label : nodeLabels) {
|
||||
labels.add(NodeLabel.newInstance(label));
|
||||
}
|
||||
return labels;
|
||||
}
|
||||
|
||||
public abstract TimerTask createTimerTask();
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* Provides Node's Labels by constantly monitoring the configuration.
|
||||
*/
|
||||
public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(ConfigurationNodeLabelsProvider.class);
|
||||
|
||||
public ConfigurationNodeLabelsProvider() {
|
||||
super("Configuration Based NodeLabels Provider");
|
||||
}
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
// In case timer is not configured avoid calling timertask.run thus avoiding
|
||||
// unnecessary creation of YarnConfiguration Object
|
||||
updateNodeLabelsFromConfig(conf);
|
||||
if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
|
||||
startTime = new Date().getTime() + intervalTime;
|
||||
}
|
||||
}
|
||||
|
||||
private void updateNodeLabelsFromConfig(Configuration conf)
|
||||
throws IOException {
|
||||
String confLabelString =
|
||||
conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, null);
|
||||
String[] nodeLabelsFromConfiguration =
|
||||
(confLabelString == null || confLabelString.isEmpty()) ? new String[] {}
|
||||
: StringUtils.getStrings(confLabelString);
|
||||
setNodeLabels(convertToNodeLabelSet(new HashSet<String>(
|
||||
Arrays.asList(nodeLabelsFromConfiguration))));
|
||||
}
|
||||
|
||||
private class ConfigurationMonitorTimerTask extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
updateNodeLabelsFromConfig(new YarnConfiguration());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to update node Labels from configuration.xml ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimerTask createTimerTask() {
|
||||
return new ConfigurationMonitorTimerTask();
|
||||
}
|
||||
}
|
|
@ -22,8 +22,11 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestNodeManager {
|
||||
|
@ -54,4 +57,49 @@ public class TestNodeManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreationOfNodeLabelsProviderService()
|
||||
throws InterruptedException {
|
||||
try {
|
||||
NodeManager nodeManager = new NodeManager();
|
||||
Configuration conf = new Configuration();
|
||||
NodeLabelsProvider labelsProviderService =
|
||||
nodeManager.createNodeLabelsProvider(conf);
|
||||
Assert
|
||||
.assertNull(
|
||||
"LabelsProviderService should not be initialized in default configuration",
|
||||
labelsProviderService);
|
||||
|
||||
// With valid className
|
||||
conf.set(
|
||||
YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
|
||||
"org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider");
|
||||
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
|
||||
Assert.assertNotNull("LabelsProviderService should be initialized When "
|
||||
+ "node labels provider class is configured", labelsProviderService);
|
||||
|
||||
// With invalid className
|
||||
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
|
||||
"org.apache.hadoop.yarn.server.nodemanager.NodeManager");
|
||||
try {
|
||||
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
|
||||
Assert.fail("Expected to throw IOException on Invalid configuration");
|
||||
} catch (IOException e) {
|
||||
// exception expected on invalid configuration
|
||||
}
|
||||
Assert.assertNotNull("LabelsProviderService should be initialized When "
|
||||
+ "node labels provider class is configured", labelsProviderService);
|
||||
|
||||
// With valid whitelisted configurations
|
||||
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
|
||||
YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
|
||||
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
|
||||
Assert.assertNotNull("LabelsProviderService should be initialized When "
|
||||
+ "node labels provider class is configured", labelsProviderService);
|
||||
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Exception caught");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -108,7 +109,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
if (receivedNMHeartbeat) {
|
||||
return;
|
||||
}
|
||||
int i = 500;
|
||||
int i = 10;
|
||||
while (!receivedNMHeartbeat && i > 0) {
|
||||
synchronized (ResourceTrackerForLabels.class) {
|
||||
if (!receivedNMHeartbeat) {
|
||||
|
@ -193,7 +194,6 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
|
||||
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
|
||||
|
||||
public DummyNodeLabelsProvider() {
|
||||
|
@ -224,8 +224,8 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
new ResourceTrackerForLabels();
|
||||
nm = new NodeManager() {
|
||||
@Override
|
||||
protected NodeLabelsProvider createNodeLabelsProvider(
|
||||
Configuration conf) throws IOException {
|
||||
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
|
||||
throws IOException {
|
||||
return dummyLabelsProviderRef;
|
||||
}
|
||||
|
||||
|
@ -255,8 +255,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
nm.start();
|
||||
resourceTracker.waitTillRegister();
|
||||
assertNLCollectionEquals(resourceTracker.labels,
|
||||
dummyLabelsProviderRef
|
||||
.getNodeLabels());
|
||||
dummyLabelsProviderRef.getNodeLabels());
|
||||
|
||||
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
|
@ -283,10 +282,71 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
|||
dummyLabelsProviderRef.setNodeLabels(null);
|
||||
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||
resourceTracker.waitTillHeartbeat();
|
||||
assertNotNull(
|
||||
"If provider sends null then empty label set should be sent and not null",
|
||||
resourceTracker.labels);
|
||||
assertTrue("If provider sends null then empty labels should be sent",
|
||||
resourceTracker.labels.isEmpty());
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
|
||||
nm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidNodeLabelsFromProvider() throws InterruptedException,
|
||||
IOException {
|
||||
final ResourceTrackerForLabels resourceTracker =
|
||||
new ResourceTrackerForLabels();
|
||||
nm = new NodeManager() {
|
||||
@Override
|
||||
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
|
||||
throws IOException {
|
||||
return dummyLabelsProviderRef;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||
NodeLabelsProvider labelsProvider) {
|
||||
|
||||
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
|
||||
metrics, labelsProvider) {
|
||||
@Override
|
||||
protected ResourceTracker getRMClient() {
|
||||
return resourceTracker;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopRMProxy() {
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
|
||||
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
|
||||
nm.init(conf);
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
nm.start();
|
||||
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
|
||||
// heartbeat with invalid labels
|
||||
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P"));
|
||||
|
||||
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||
resourceTracker.waitTillHeartbeat();
|
||||
assertNull("On Invalid Labels we need to retain earlier labels, HB "
|
||||
+ "needs to send null", resourceTracker.labels);
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
|
||||
// on next heartbeat same invalid labels will be given by the provider, but
|
||||
// again label validation check and reset RM with empty labels set should
|
||||
// not happen
|
||||
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||
resourceTracker.waitTillHeartbeat();
|
||||
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||
assertNull("NodeStatusUpdater need not send repeatedly empty labels on "
|
||||
+ "invalid labels from provider ", resourceTracker.labels);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
||||
|
||||
protected static File testRootDir = new File("target",
|
||||
TestConfigurationNodeLabelsProvider.class.getName() + "-localDir")
|
||||
.getAbsoluteFile();
|
||||
|
||||
final static File nodeLabelsConfigFile = new File(testRootDir,
|
||||
"yarn-site.xml");
|
||||
|
||||
private static XMLPathClassLoader loader;
|
||||
|
||||
private ConfigurationNodeLabelsProvider nodeLabelsProvider;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
loader =
|
||||
new XMLPathClassLoader(
|
||||
TestConfigurationNodeLabelsProvider.class.getClassLoader());
|
||||
testRootDir.mkdirs();
|
||||
|
||||
nodeLabelsProvider = new ConfigurationNodeLabelsProvider();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (nodeLabelsProvider != null) {
|
||||
nodeLabelsProvider.close();
|
||||
}
|
||||
if (testRootDir.exists()) {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(testRootDir.getAbsolutePath()), true);
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfForNodeLabels() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX");
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLabelsFromConfig() throws IOException,
|
||||
InterruptedException {
|
||||
Configuration conf = getConfForNodeLabels();
|
||||
nodeLabelsProvider.init(conf);
|
||||
// test for ensuring labels are set during initialization of the class
|
||||
nodeLabelsProvider.start();
|
||||
Thread.sleep(1000l); // sleep so that timer has run once during
|
||||
// initialization
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
// test for valid Modification
|
||||
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
|
||||
modifyConfAndCallTimer(timerTask, "X,y,Z");
|
||||
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigForNoTimer() throws Exception {
|
||||
Configuration conf = getConfForNodeLabels();
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
|
||||
|
||||
nodeLabelsProvider.init(conf);
|
||||
nodeLabelsProvider.start();
|
||||
Assert
|
||||
.assertNull(
|
||||
"Timer is not expected to be created when interval is configured as -1",
|
||||
nodeLabelsProvider.nodeLabelsScheduler);
|
||||
// Ensure that even though timer is not run, node labels are fetched at least once so
|
||||
// that NM registers/updates Labels with RM
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
private static void modifyConfAndCallTimer(TimerTask timerTask,
|
||||
String nodeLabels) throws FileNotFoundException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
|
||||
conf.writeXml(new FileOutputStream(nodeLabelsConfigFile));
|
||||
ClassLoader actualLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
timerTask.run();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(actualLoader);
|
||||
}
|
||||
}
|
||||
|
||||
private static class XMLPathClassLoader extends ClassLoader {
|
||||
public XMLPathClassLoader(ClassLoader wrapper) {
|
||||
super(wrapper);
|
||||
}
|
||||
|
||||
public URL getResource(String name) {
|
||||
if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) {
|
||||
try {
|
||||
return nodeLabelsConfigFile.toURI().toURL();
|
||||
} catch (MalformedURLException e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
return super.getResource(name);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue