YARN-6483. Add nodes transitioning to DECOMMISSIONING state to the list of updated nodes returned to the AM. (Juan Rodriguez Hortala via asuresh)

Arun Suresh 2017-11-22 19:16:44 -08:00
parent aab439593b
commit b46ca7e73b
26 changed files with 494 additions and 110 deletions

View File

@ -53,7 +53,8 @@ public abstract class NodeReport {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newInstance(nodeId, nodeState, httpAddress, rackName, used,
- capability, numContainers, healthReport, lastHealthReportTime, null);
+ capability, numContainers, healthReport, lastHealthReportTime,
+ null, null, null);
}
@Private
@ -61,7 +62,8 @@ public abstract class NodeReport {
public static NodeReport newInstance(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
- Set<String> nodeLabels) {
+ Set<String> nodeLabels, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = Records.newRecord(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@ -73,6 +75,8 @@ public abstract class NodeReport {
nodeReport.setHealthReport(healthReport);
nodeReport.setLastHealthReportTime(lastHealthReportTime);
nodeReport.setNodeLabels(nodeLabels);
nodeReport.setDecommissioningTimeout(decommissioningTimeout);
nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}
@ -186,8 +190,8 @@ public abstract class NodeReport {
public abstract void setLastHealthReportTime(long lastHealthReport);
/**
- * Get labels of this node
- * @return labels of this node
+ * Get labels of this node.
+ * @return labels of this node.
*/
@Public
@Stable
@ -198,8 +202,8 @@ public abstract class NodeReport {
public abstract void setNodeLabels(Set<String> nodeLabels);
/**
- * Get containers aggregated resource utilization in a node
- * @return containers resource utilization
+ * Get containers aggregated resource utilization in a node.
+ * @return containers resource utilization.
*/
@Public
@Stable
@ -217,8 +221,8 @@ public abstract class NodeReport {
}
/**
- * Get node resource utilization
- * @return node resource utilization
+ * Get node resource utilization.
+ * @return node resource utilization.
*/
@Public
@Stable
@ -227,4 +231,31 @@ public abstract class NodeReport {
@Private
@Unstable
public abstract void setNodeUtilization(ResourceUtilization nodeUtilization);
/**
* Optional decommissioning timeout in seconds (null indicates absent
* timeout).
* @return the decommissioning timeout in seconds.
*/
public Integer getDecommissioningTimeout() {
return null;
}
/**
* Set the decommissioning timeout in seconds (null indicates absent timeout).
*/
public void setDecommissioningTimeout(Integer decommissioningTimeout) {}
/**
* Optional node update type (null indicates absent update type).
* @return the node update type.
*/
public NodeUpdateType getNodeUpdateType() {
return NodeUpdateType.NODE_UNUSABLE;
}
/**
* Set the node update type (null indicates absent node update type).
*/
public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
}
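Taken together, the two new accessors let an ApplicationMaster tell a node that is merely draining apart from one that is already unusable. A minimal consuming-side sketch (the response and report getters are the ones added in this patch; the stop/migrate helpers are hypothetical placeholders for AM-specific logic):

// Inside an AM heartbeat loop; allocateResponse is the AllocateResponse
// returned by ApplicationMasterProtocol#allocate.
for (NodeReport report : allocateResponse.getUpdatedNodes()) {
  if (report.getNodeUpdateType() == NodeUpdateType.NODE_DECOMMISSIONING) {
    // May be null when the RM reported no timeout for this node.
    Integer timeoutSecs = report.getDecommissioningTimeout();
    long deadlineMs = (timeoutSecs == null)
        ? Long.MAX_VALUE
        : System.currentTimeMillis() + 1000L * timeoutSecs;
    stopSchedulingOn(report.getNodeId());                  // hypothetical helper
    migrateContainersFrom(report.getNodeId(), deadlineMs); // hypothetical helper
  }
}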

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
/**
* <p>Taxonomy of the <code>NodeState</code> that a
* <code>Node</code> might transition into.</p>
*/
public enum NodeUpdateType {
NODE_USABLE,
NODE_UNUSABLE,
NODE_DECOMMISSIONING
}

View File

@ -332,6 +332,12 @@ message NodeIdProto {
optional int32 port = 2;
}
enum NodeUpdateTypeProto {
NODE_USABLE = 0;
NODE_UNUSABLE = 1;
NODE_DECOMMISSIONING = 2;
}
message NodeReportProto {
optional NodeIdProto nodeId = 1;
optional string httpAddress = 2;
@ -345,6 +351,8 @@ message NodeReportProto {
repeated string node_labels = 10;
optional ResourceUtilizationProto containers_utilization = 11;
optional ResourceUtilizationProto node_utilization = 12;
optional uint32 decommissioning_timeout = 13;
optional NodeUpdateTypeProto node_update_type = 14;
}
message NodeIdToLabelsProto {

View File

@ -618,7 +618,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationReport createFakeAppReport() {
- ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ ApplicationId appId = ApplicationId.newInstance(1000L, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
// create a fake application report
@ -626,7 +626,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
ApplicationReport.newInstance(appId, attemptId, "fakeUser",
"fakeQueue", "fakeApplicationName", "localhost", 0, null,
YarnApplicationState.FINISHED, "fake an application report", "",
- 1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
+ 1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f,
"fakeApplicationType", null);
return report;
}
@ -638,7 +638,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationId createFakeAppId() {
- return ApplicationId.newInstance(1000l, 1);
+ return ApplicationId.newInstance(1000L, 1);
}
public ApplicationAttemptId createFakeApplicationAttemptId() {
@ -657,7 +657,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
NodeId nodeId = NodeId.newInstance("localhost", 0);
NodeReport report =
NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
"rack1", null, null, 4, null, 1000l, null);
"rack1", null, null, 4, null, 1000L);
List<NodeReport> reports = new ArrayList<NodeReport>();
reports.add(report);
return reports;
@ -680,8 +680,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
return ApplicationAttemptReport.newInstance(
createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
- YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l,
- 1200l);
+ YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
+ 1200L);
}
public List<ApplicationAttemptReport>
@ -694,7 +694,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ContainerReport createFakeContainerReport() {
return ContainerReport.newInstance(createFakeContainerId(), null,
- NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+ NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0,
ContainerState.COMPLETE,
"http://" + NodeId.newInstance("localhost", 0).toString());
}

View File

@ -1992,7 +1992,7 @@ public class TestYarnCLI {
NodeReport nodeReport = NodeReport.newInstance(NodeId
.newInstance("host" + i, 0), state, "host" + 1 + ":8888",
"rack1", Records.newRecord(Resource.class), Records
- .newRecord(Resource.class), 0, "", 0, nodeLabels);
+ .newRecord(Resource.class), 0, "", 0, nodeLabels, null, null);
if (!emptyResourceUtilization) {
ResourceUtilization containersUtilization = ResourceUtilization
.newInstance(1024, 2048, 4);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -150,8 +151,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
- if (nodeId == null)
+ if (nodeId == null) {
builder.clearNodeId();
+ }
this.nodeId = nodeId;
}
@ -177,8 +179,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setCapability(Resource capability) {
maybeInitBuilder();
- if (capability == null)
+ if (capability == null) {
builder.clearCapability();
+ }
this.capability = capability;
}
@ -215,8 +218,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setUsed(Resource used) {
maybeInitBuilder();
- if (used == null)
+ if (used == null) {
builder.clearUsed();
+ }
this.used = used;
}
@ -234,8 +238,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public boolean equals(Object other) {
- if (other == null)
+ if (other == null) {
return false;
+ }
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
@ -278,8 +283,9 @@ public class NodeReportPBImpl extends NodeReport {
}
private void mergeLocalToProto() {
- if (viaProto)
+ if (viaProto) {
maybeInitBuilder();
+ }
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
@ -387,4 +393,38 @@ public class NodeReportPBImpl extends NodeReport {
}
this.nodeUtilization = nodeResourceUtilization;
}
@Override
public Integer getDecommissioningTimeout() {
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasDecommissioningTimeout())
? p.getDecommissioningTimeout() : null;
}
@Override
public void setDecommissioningTimeout(Integer decommissioningTimeout) {
maybeInitBuilder();
if (decommissioningTimeout == null || decommissioningTimeout < 0) {
builder.clearDecommissioningTimeout();
return;
}
builder.setDecommissioningTimeout(decommissioningTimeout);
}
@Override
public NodeUpdateType getNodeUpdateType() {
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNodeUpdateType()) ?
ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null;
}
@Override
public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
maybeInitBuilder();
if (nodeUpdateType == null) {
builder.clearNodeUpdateType();
return;
}
builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
}
}
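A quick round-trip shows the absent-value semantics these PB-backed setters implement; a sketch, assuming the default PB record factory so that Records.newRecord returns this NodeReportPBImpl:

NodeReport report = Records.newRecord(NodeReport.class);
report.setDecommissioningTimeout(3600);
assert report.getDecommissioningTimeout() == 3600;
report.setDecommissioningTimeout(-1);  // null or negative clears the proto field
assert report.getDecommissioningTimeout() == null;
report.setNodeUpdateType(null);        // null clears; the getter then returns null
assert report.getNodeUpdateType() == null;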

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
import org.apache.hadoop.yarn.server.api.ContainerType;
@ -343,6 +345,16 @@ public class ProtoUtils {
return ContainerType.valueOf(e.name());
}
/*
* NodeUpdateType
*/
public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) {
return NodeUpdateTypeProto.valueOf(e.name());
}
public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) {
return NodeUpdateType.valueOf(e.name());
}
/*
* ExecutionType
*/

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -187,23 +188,26 @@ public class BuilderUtils {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
- capability, numContainers, healthReport, lastHealthReportTime, null);
+ capability, numContainers, healthReport, lastHealthReportTime,
+ null, null, null);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
- Set<String> nodeLabels) {
+ Set<String> nodeLabels, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
capability, numContainers, healthReport, lastHealthReportTime,
- nodeLabels, null, null);
+ nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
Set<String> nodeLabels, ResourceUtilization containersUtilization,
- ResourceUtilization nodeUtilization) {
+ ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
+ NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@ -217,6 +221,8 @@ public class BuilderUtils {
nodeReport.setNodeLabels(nodeLabels);
nodeReport.setAggregatedContainersUtilization(containersUtilization);
nodeReport.setNodeUtilization(nodeUtilization);
nodeReport.setDecommissioningTimeout(decommissioningTimeout);
nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}

View File

@ -1024,7 +1024,7 @@ public class ClientRMService extends AbstractService implements
return response;
}
private NodeReport createNodeReports(RMNode rmNode) {
SchedulerNodeReport schedulerNodeReport =
scheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
@ -1040,7 +1040,8 @@ public class ClientRMService extends AbstractService implements
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
- rmNode.getNodeUtilization());
+ rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
+ null);
return report;
}

View File

@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher {
private final RMContext rmContext;
- // Default timeout value in mills.
- // Negative value indicates no timeout. 0 means immediate.
- private long defaultTimeoutMs =
- 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
// Once a RMNode is observed in DECOMMISSIONING state,
// All its ContainerStatus update are tracked inside DecomNodeContext.
class DecommissioningNodeContext {
@ -105,16 +100,15 @@ public class DecommissioningNodesWatcher {
private long lastUpdateTime;
- public DecommissioningNodeContext(NodeId nodeId) {
+ public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
this.nodeId = nodeId;
this.appIds = new HashSet<ApplicationId>();
this.decommissioningStartTime = mclock.getTime();
- this.timeoutMs = defaultTimeoutMs;
+ this.timeoutMs = 1000L * timeoutSec;
}
- void updateTimeout(Integer timeoutSec) {
- this.timeoutMs = (timeoutSec == null)?
- defaultTimeoutMs : (1000L * timeoutSec);
+ void updateTimeout(int timeoutSec) {
+ this.timeoutMs = 1000L * timeoutSec;
}
}
@ -132,7 +126,6 @@ public class DecommissioningNodesWatcher {
}
public void init(Configuration conf) {
- readDecommissioningTimeout(conf);
int v = conf.getInt(
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
@ -162,7 +155,8 @@ public class DecommissioningNodesWatcher {
}
} else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
if (context == null) {
- context = new DecommissioningNodeContext(rmNode.getNodeID());
+ context = new DecommissioningNodeContext(rmNode.getNodeID(),
+ rmNode.getDecommissioningTimeout());
decomNodes.put(rmNode.getNodeID(), context);
context.nodeState = rmNode.getState();
context.decommissionedTime = 0;
@ -416,24 +410,4 @@ public class DecommissioningNodesWatcher {
LOG.debug("Decommissioning node: " + sb.toString());
}
}
- // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
- // This enables DecommissioningNodesWatcher to pick up new value
- // without ResourceManager restart.
- private void readDecommissioningTimeout(Configuration conf) {
- try {
- if (conf == null) {
- conf = new YarnConfiguration();
- }
- int v = conf.getInt(
- YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
- YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
- if (defaultTimeoutMs != 1000L * v) {
- defaultTimeoutMs = 1000L * v;
- LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
- }
- } catch (Exception e) {
- LOG.info("Error readDecommissioningTimeout ", e);
- }
- }
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@ -83,6 +84,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
@ -326,10 +329,12 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
- List<RMNode> updatedNodes = new ArrayList<>();
+ Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<>();
- for(RMNode rmNode: updatedNodes) {
+ for(Map.Entry<RMNode, NodeUpdateType> rmNodeEntry :
+ updatedNodes.entrySet()) {
+ RMNode rmNode = rmNodeEntry.getKey();
SchedulerNodeReport schedulerNodeReport =
getScheduler().getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
@ -344,7 +349,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
- rmNode.getNodeLabels());
+ rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(),
+ rmNodeEntry.getValue());
updatedNodeReports.add(report);
}

View File

@ -72,6 +72,11 @@ public class NodesListManager extends CompositeService implements
private Configuration conf;
private final RMContext rmContext;
// Default decommissioning timeout value in seconds.
// Negative value indicates no timeout. 0 means immediate.
private int defaultDecTimeoutSecs =
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
private String includesFile;
private String excludesFile;
@ -214,6 +219,11 @@ public class NodesListManager extends CompositeService implements
private void refreshHostsReader(
Configuration yarnConf, boolean graceful, Integer timeout)
throws IOException, YarnException {
// resolve the default timeout to the decommission timeout that is
// configured at this moment
if (null == timeout) {
timeout = readDecommissioningTimeout(yarnConf);
}
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@ -252,7 +262,7 @@ public class NodesListManager extends CompositeService implements
// Gracefully decommission excluded nodes that are not already
// DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
// that are already DECOMMISSIONED or DECOMMISSIONING.
- private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+ private void handleExcludeNodeList(boolean graceful, int timeout) {
// DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
List<RMNode> nodesToRecom = new ArrayList<RMNode>();
@ -463,36 +473,40 @@ public class NodesListManager extends CompositeService implements
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
private void sendRMAppNodeUpdateEventToNonFinalizedApps(
RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
for(RMApp app : rmContext.getRMApps().values()) {
if (!app.isAppFinalStateStored()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
appNodeUpdateType));
}
}
}
@Override
public void handle(NodesListManagerEvent event) {
RMNode eventNode = event.getNode();
switch (event.getType()) {
case NODE_UNUSABLE:
LOG.debug(eventNode + " reported unusable");
- for(RMApp app: rmContext.getRMApps().values()) {
- if (!app.isAppFinalStateStored()) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
- RMAppNodeUpdateType.NODE_UNUSABLE));
- }
- }
+ sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+ RMAppNodeUpdateType.NODE_UNUSABLE);
break;
case NODE_USABLE:
LOG.debug(eventNode + " reported usable");
- for (RMApp app : rmContext.getRMApps().values()) {
- if (!app.isAppFinalStateStored()) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
- RMAppNodeUpdateType.NODE_USABLE));
- }
- }
+ sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+ RMAppNodeUpdateType.NODE_USABLE);
break;
case NODE_DECOMMISSIONING:
LOG.debug(eventNode + " reported decommissioning");
sendRMAppNodeUpdateEventToNonFinalizedApps(
eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
break;
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
@ -611,6 +625,28 @@ public class NodesListManager extends CompositeService implements
}
}
// Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
// This enables NodesListManager to pick up new value without
// ResourceManager restart.
private int readDecommissioningTimeout(Configuration pConf) {
try {
if (pConf == null) {
pConf = new YarnConfiguration();
}
int configuredDefaultDecTimeoutSecs =
pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
LOG.info("Use new decommissioningTimeoutSecs: "
+ defaultDecTimeoutSecs);
}
} catch (Exception e) {
LOG.warn("Error readDecommissioningTimeout " + e.getMessage());
}
return defaultDecTimeoutSecs;
}
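Since refreshHostsReader resolves a null timeout through this method on every refresh, operators can change the cluster-wide default without restarting the ResourceManager. A sketch of that flow, reusing only calls that appear in this patch (nodesListManager stands for the RM's NodesListManager instance, and 7200 is an arbitrary example value):

// Raise the default graceful-decommission timeout, then refresh gracefully.
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, 7200);
// On this graceful refresh, excluded hosts with no per-host timeout
// are decommissioned with the new 7200-second default.
nodesListManager.refreshNodes(conf, true);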
/**
* A NodeId instance needed upon startup for populating inactive nodes Map.
* It only knows the hostname/ip and marks the port to -1 or invalid.

View File

@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
public enum NodesListManagerEventType {
NODE_USABLE,
- NODE_UNUSABLE
+ NODE_UNUSABLE,
+ NODE_DECOMMISSIONING
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
- import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -154,10 +154,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* received by the RMApp. Updates can be node becoming lost or becoming
* healthy etc. The method clears the information from the {@link RMApp}. So
* each call to this method gives the delta from the previous call.
- * @param updatedNodes Collection into which the updates are transferred
- * @return the number of nodes added to the {@link Collection}
+ * @param updatedNodes Map into which the updates are transferred, with each
+ * updated node as the key and the {@link NodeUpdateType} for that update
+ * as the corresponding value.
+ * @return the number of nodes added to the {@link Map}
*/
- int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
+ int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes);
/**
* The finish time of the {@link RMApp}

View File

@ -20,11 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.net.InetAddress;
import java.util.ArrayList;
- import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
- import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -149,7 +148,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
- private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
+ private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
private final String applicationType;
private final Set<String> applicationTags;
@ -677,11 +676,11 @@ public class RMAppImpl implements RMApp, Recoverable {
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
this.writeLock.lock();
try {
int updatedNodeCount = this.updatedNodes.size();
- updatedNodes.addAll(this.updatedNodes);
+ upNodes.putAll(this.updatedNodes);
this.updatedNodes.clear();
return updatedNodeCount;
} finally {
@ -987,7 +986,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
NodeState nodeState = node.getState();
- updatedNodes.add(node);
+ updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
LOG.debug("Received node update event:" + type + " for node:" + node
+ " with state:" + nodeState);
}
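Because the buffer is now a Map keyed by RMNode, only the latest update type per node survives until the AM pulls, and pullRMNodeUpdates drains the buffer, so consecutive pulls see disjoint deltas. A small sketch of that contract (assumes app is any live RMApp, e.g. obtained from rmContext.getRMApps()):

Map<RMNode, NodeUpdateType> delta = new HashMap<>();
int n = app.pullRMNodeUpdates(delta);       // transfers and clears buffered updates
assert n == delta.size();
Map<RMNode, NodeUpdateType> second = new HashMap<>();
assert app.pullRMNodeUpdates(second) == 0;  // nothing new since the last pull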

View File

@ -19,13 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class RMAppNodeUpdateEvent extends RMAppEvent {
public enum RMAppNodeUpdateType {
NODE_USABLE,
- NODE_UNUSABLE
+ NODE_UNUSABLE,
+ NODE_DECOMMISSIONING;
public static NodeUpdateType convertToNodeUpdateType(
RMAppNodeUpdateType rmAppNodeUpdateType) {
return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
}
}
private final RMNode node;

View File

@ -183,7 +183,7 @@ public interface RMNode {
void setUntrackedTimeStamp(long timeStamp);
/*
* Optional decommissioning timeout in seconds
- * (null indicates default timeout).
+ * (null indicates absent timeout).
* @return the decommissioning timeout in seconds.
*/
Integer getDecommissioningTimeout();

View File

@ -1160,6 +1160,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
rmNode.decommissioningTimeout = timeout;
// Notify NodesListManager to notify all RMApp so that each
// Application Master could take any required actions.
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
if (rmNode.originalTotalCapability == null){
rmNode.originalTotalCapability =
Resources.clone(rmNode.totalCapability);

View File

@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@ -917,12 +918,26 @@ public class MockRM extends ResourceManager {
node.getState());
}
public void sendNodeGracefulDecommission(
MockNM nm, int timeout) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
Assert.assertNotNull("node shouldn't be null", node);
node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
}
public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
Assert.assertNotNull("node shouldn't be null", node);
node.handle(new RMNodeEvent(nm.getNodeId(), event));
}
public Integer getDecommissioningTimeout(NodeId nodeid) {
return this.getRMContext().getRMNodes()
.get(nodeid).getDecommissioningTimeout();
}
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);

View File

@ -180,6 +180,50 @@ public class TestClientRMService {
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
@Test
public void testGetDecommissioningClusterNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
rm.start();
int nodeMemory = 1024;
MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
rm.sendNodeStarted(nm1);
nm1.nodeHeartbeat(true);
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
Integer decommissioningTimeout = 600;
rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client =
(ApplicationClientProtocol) rpc
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call
List<NodeReport> nodeReports = client.getClusterNodes(
GetClusterNodesRequest.newInstance(
EnumSet.of(NodeState.DECOMMISSIONING)))
.getNodeReports();
Assert.assertEquals(1, nodeReports.size());
NodeReport nr = nodeReports.iterator().next();
Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertNull(nr.getNodeUpdateType());
rpc.stopProxy(client, conf);
rm.close();
}
@Test
public void testGetClusterNodes() throws Exception {
MockRM rm = new MockRM() {
@ -228,6 +272,8 @@ public class TestClientRMService {
// Check node's label = x
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Now make the node unhealthy.
node.nodeHeartbeat(false);
@ -251,6 +297,8 @@ public class TestClientRMService {
nodeReports.get(0).getNodeState());
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Remove labels of host1
map = new HashMap<NodeId, Set<String>>();
@ -267,6 +315,8 @@ public class TestClientRMService {
for (NodeReport report : nodeReports) {
Assert.assertTrue(report.getNodeLabels() != null
&& report.getNodeLabels().isEmpty());
Assert.assertNull(report.getDecommissioningTimeout());
Assert.assertNull(report.getNodeUpdateType());
}
rpc.stopProxy(client, conf);

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
- import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -69,7 +68,8 @@ public class TestDecommissioningNodesWatcher {
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
- rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
+ rm.sendNodeGracefulDecommission(nm1,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0.

View File

@ -27,7 +27,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
@ -98,13 +100,16 @@ public class TestRMNodeTransitions {
}
private NodesListManagerEvent nodesListManagerEvent = null;
private List<NodeState> nodesListManagerEventsNodeStateSequence =
new LinkedList<>();
private class TestNodeListManagerEventDispatcher implements
EventHandler<NodesListManagerEvent> {
@Override
public void handle(NodesListManagerEvent event) {
nodesListManagerEvent = event;
nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
}
}
@ -150,7 +155,7 @@ public class TestRMNodeTransitions {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
nodesListManagerEvent = null;
nodesListManagerEventsNodeStateSequence.clear();
}
@After
@ -721,6 +726,8 @@ public class TestRMNodeTransitions {
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
nodesListManagerEventsNodeStateSequence);
Assert
.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
@ -1008,7 +1015,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
- Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+ Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
nodesListManagerEvent.getType());
}

View File

@ -39,7 +39,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.OutputKeys;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsSystem;
@ -99,14 +106,19 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
public class TestResourceTrackerService extends NodeLabelTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
- private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
+ private final File hostFile =
+ new File(TEMP_DIR + File.separator + "hostFile.txt");
private final File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
private final File excludeHostXmlFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.xml");
private MockRM rm;
@ -291,6 +303,67 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}
@Test
public void testGracefulDecommissionDefaultTimeoutResolution()
throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
.getAbsolutePath());
writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null));
rm = new MockRM(conf);
rm.start();
int nodeMemory = 1024;
MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);
NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
// Graceful decommission both host1 and host2, with
// non default timeout for host1
final Integer nm1DecommissionTimeout = 20;
writeToHostsXmlFile(
excludeHostXmlFile,
Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
Pair.of(nm2.getNodeId().getHost(), null));
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
Assert.assertEquals(
nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
Integer defaultDecTimeout =
conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
Assert.assertEquals(
defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
// Graceful decommission host3 with a new default timeout
final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
writeToHostsXmlFile(
excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
newDefaultDecTimeout);
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
Assert.assertEquals(
newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
}
/**
* Graceful decommission node with running application.
*/
@ -1967,16 +2040,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.stop();
}
private void ensureFileExists(File file) throws IOException {
if (!file.exists()) {
TEMP_DIR.mkdirs();
file.createNewFile();
}
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}
private void writeToHostsFile(File file, String... hosts)
throws IOException {
- if (!file.exists()) {
- TEMP_DIR.mkdirs();
- file.createNewFile();
- }
+ ensureFileExists(file);
FileOutputStream fStream = null;
try {
fStream = new FileOutputStream(file);
@ -1992,6 +2069,33 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
}
private void writeToHostsXmlFile(
File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
ensureFileExists(file);
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
Document doc = dbFactory.newDocumentBuilder().newDocument();
Element hosts = doc.createElement("hosts");
doc.appendChild(hosts);
for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) {
Element host = doc.createElement("host");
hosts.appendChild(host);
Element name = doc.createElement("name");
host.appendChild(name);
name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft()));
if (hostsAndTimeout.getRight() != null) {
Element timeout = doc.createElement("timeout");
host.appendChild(timeout);
timeout.appendChild(
doc.createTextNode(hostsAndTimeout.getRight().toString())
);
}
}
TransformerFactory transformerFactory = TransformerFactory.newInstance();
Transformer transformer = transformerFactory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.transform(new DOMSource(doc), new StreamResult(file));
}
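For reference, the exclude file this helper emits looks roughly like the following (a host paired with a null timeout simply omits the timeout element and so falls back to the configured default):

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<hosts>
  <host>
    <name>host1</name>
    <timeout>20</timeout>
  </host>
  <host>
    <name>host2</name>
  </host>
</hosts>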
private void checkDecommissionedNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
- import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -154,7 +154,7 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -82,6 +83,13 @@ public class TestAMRMRPCNodeUpdates {
rm.drainEvents();
}
private void syncNodeGracefulDecommission(
MockNM nm, int timeout) throws Exception {
rm.sendNodeGracefulDecommission(nm, timeout);
rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
rm.drainEvents();
}
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
final AllocateRequest req) throws Exception {
UserGroupInformation ugi =
@ -98,6 +106,39 @@ public class TestAMRMRPCNodeUpdates {
});
}
@Test
public void testAMRMDecommissioningNodes() throws Exception {
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
rm.drainEvents();
RMApp app1 = rm.submitApp(2000);
// Trigger the scheduling so the AM gets 'launched' on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
// register AM returns no unusable node
am1.registerAppAttempt();
Integer decommissioningTimeout = 600;
syncNodeGracefulDecommission(nm2, decommissioningTimeout);
AllocateRequest allocateRequest1 =
AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response1 =
allocate(attempt1.getAppAttemptId(), allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(
decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertEquals(
NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
}
@Test
public void testAMRMUnusableNodes() throws Exception {
@ -138,6 +179,8 @@ public class TestAMRMRPCNodeUpdates {
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
// resending the allocate request returns the same result
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
@ -146,6 +189,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
syncNodeLost(nm3);
@ -159,6 +204,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
// registering another AM gives it the complete failed list
RMApp app2 = rm.submitApp(2000);
@ -190,6 +237,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
allocateRequest2 =
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
@ -200,6 +249,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
// subsequent allocate calls should return no updated nodes
allocateRequest2 =

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
- import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -235,7 +235,7 @@ public class MockRMApp implements RMApp {
}
@Override
- public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+ public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}