YARN-6483. Add nodes transitioning to DECOMMISSIONING state to the list of updated nodes returned to the AM. (Juan Rodriguez Hortala via asuresh)

(cherry picked from commit b46ca7e73b)
This commit is contained in:
Arun Suresh 2017-11-22 19:16:44 -08:00
parent b85eb9bb53
commit 5884b8a80a
26 changed files with 494 additions and 110 deletions

View File

@ -53,7 +53,8 @@ public abstract class NodeReport {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newInstance(nodeId, nodeState, httpAddress, rackName, used,
capability, numContainers, healthReport, lastHealthReportTime, null);
capability, numContainers, healthReport, lastHealthReportTime,
null, null, null);
}
@Private
@ -61,7 +62,8 @@ public abstract class NodeReport {
public static NodeReport newInstance(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
Set<String> nodeLabels) {
Set<String> nodeLabels, Integer decommissioningTimeout,
NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = Records.newRecord(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@ -73,6 +75,8 @@ public abstract class NodeReport {
nodeReport.setHealthReport(healthReport);
nodeReport.setLastHealthReportTime(lastHealthReportTime);
nodeReport.setNodeLabels(nodeLabels);
nodeReport.setDecommissioningTimeout(decommissioningTimeout);
nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}
@ -186,8 +190,8 @@ public abstract class NodeReport {
public abstract void setLastHealthReportTime(long lastHealthReport);
/**
* Get labels of this node
* @return labels of this node
* Get labels of this node.
* @return labels of this node.
*/
@Public
@Stable
@ -198,8 +202,8 @@ public abstract class NodeReport {
public abstract void setNodeLabels(Set<String> nodeLabels);
/**
* Get containers aggregated resource utilization in a node
* @return containers resource utilization
* Get containers aggregated resource utilization in a node.
* @return containers resource utilization.
*/
@Public
@Stable
@ -217,8 +221,8 @@ public abstract class NodeReport {
}
/**
* Get node resource utilization
* @return node resource utilization
* Get node resource utilization.
* @return node resource utilization.
*/
@Public
@Stable
@ -227,4 +231,31 @@ public abstract class NodeReport {
@Private
@Unstable
public abstract void setNodeUtilization(ResourceUtilization nodeUtilization);
/**
* Optional decommissioning timeout in seconds (null indicates absent
* timeout).
* @return the decommissioning timeout in second.
*/
public Integer getDecommissioningTimeout() {
return null;
}
/**
* Set the decommissioning timeout in seconds (null indicates absent timeout).
* */
public void setDecommissioningTimeout(Integer decommissioningTimeout) {}
/**
* Optional node update type (null indicates absent update type).
* @return the node update.
*/
public NodeUpdateType getNodeUpdateType() {
return NodeUpdateType.NODE_UNUSABLE;
}
/**
* Set the node update type (null indicates absent node update type).
* */
public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
/**
* <p>Taxonomy of the <code>NodeState</code> that a
* <code>Node</code> might transition into.</p>
* */
public enum NodeUpdateType {
NODE_USABLE,
NODE_UNUSABLE,
NODE_DECOMMISSIONING
}

View File

@ -323,6 +323,12 @@ message NodeIdProto {
optional int32 port = 2;
}
enum NodeUpdateTypeProto {
NODE_USABLE = 0;
NODE_UNUSABLE = 1;
NODE_DECOMMISSIONING = 2;
}
message NodeReportProto {
optional NodeIdProto nodeId = 1;
optional string httpAddress = 2;
@ -336,6 +342,8 @@ message NodeReportProto {
repeated string node_labels = 10;
optional ResourceUtilizationProto containers_utilization = 11;
optional ResourceUtilizationProto node_utilization = 12;
optional uint32 decommissioning_timeout = 13;
optional NodeUpdateTypeProto node_update_type = 14;
}
message NodeIdToLabelsProto {

View File

@ -618,7 +618,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationReport createFakeAppReport() {
ApplicationId appId = ApplicationId.newInstance(1000l, 1);
ApplicationId appId = ApplicationId.newInstance(1000L, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
// create a fake application report
@ -626,7 +626,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
ApplicationReport.newInstance(appId, attemptId, "fakeUser",
"fakeQueue", "fakeApplicationName", "localhost", 0, null,
YarnApplicationState.FINISHED, "fake an application report", "",
1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f,
"fakeApplicationType", null);
return report;
}
@ -638,7 +638,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
}
public ApplicationId createFakeAppId() {
return ApplicationId.newInstance(1000l, 1);
return ApplicationId.newInstance(1000L, 1);
}
public ApplicationAttemptId createFakeApplicationAttemptId() {
@ -657,7 +657,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
NodeId nodeId = NodeId.newInstance("localhost", 0);
NodeReport report =
NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
"rack1", null, null, 4, null, 1000l, null);
"rack1", null, null, 4, null, 1000L);
List<NodeReport> reports = new ArrayList<NodeReport>();
reports.add(report);
return reports;
@ -680,8 +680,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
return ApplicationAttemptReport.newInstance(
createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l,
1200l);
YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
1200L);
}
public List<ApplicationAttemptReport>
@ -694,7 +694,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public ContainerReport createFakeContainerReport() {
return ContainerReport.newInstance(createFakeContainerId(), null,
NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0,
ContainerState.COMPLETE,
"http://" + NodeId.newInstance("localhost", 0).toString());
}

View File

@ -1992,7 +1992,7 @@ public class TestYarnCLI {
NodeReport nodeReport = NodeReport.newInstance(NodeId
.newInstance("host" + i, 0), state, "host" + 1 + ":8888",
"rack1", Records.newRecord(Resource.class), Records
.newRecord(Resource.class), 0, "", 0, nodeLabels);
.newRecord(Resource.class), 0, "", 0, nodeLabels, null, null);
if (!emptyResourceUtilization) {
ResourceUtilization containersUtilization = ResourceUtilization
.newInstance(1024, 2048, 4);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -150,8 +151,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
if (nodeId == null) {
builder.clearNodeId();
}
this.nodeId = nodeId;
}
@ -177,8 +179,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setCapability(Resource capability) {
maybeInitBuilder();
if (capability == null)
if (capability == null) {
builder.clearCapability();
}
this.capability = capability;
}
@ -215,8 +218,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public void setUsed(Resource used) {
maybeInitBuilder();
if (used == null)
if (used == null) {
builder.clearUsed();
}
this.used = used;
}
@ -234,8 +238,9 @@ public class NodeReportPBImpl extends NodeReport {
@Override
public boolean equals(Object other) {
if (other == null)
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
@ -278,8 +283,9 @@ public class NodeReportPBImpl extends NodeReport {
}
private void mergeLocalToProto() {
if (viaProto)
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
@ -387,4 +393,38 @@ public class NodeReportPBImpl extends NodeReport {
}
this.nodeUtilization = nodeResourceUtilization;
}
@Override
public Integer getDecommissioningTimeout() {
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasDecommissioningTimeout())
? p.getDecommissioningTimeout() : null;
}
@Override
public void setDecommissioningTimeout(Integer decommissioningTimeout) {
maybeInitBuilder();
if (decommissioningTimeout == null || decommissioningTimeout < 0) {
builder.clearDecommissioningTimeout();
return;
}
builder.setDecommissioningTimeout(decommissioningTimeout);
}
@Override
public NodeUpdateType getNodeUpdateType() {
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNodeUpdateType()) ?
ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null;
}
@Override
public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
maybeInitBuilder();
if (nodeUpdateType == null) {
builder.clearNodeUpdateType();
return;
}
builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
import org.apache.hadoop.yarn.server.api.ContainerType;
@ -343,6 +345,16 @@ public class ProtoUtils {
return ContainerType.valueOf(e.name());
}
/*
* NodeUpdateType
*/
public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) {
return NodeUpdateTypeProto.valueOf(e.name());
}
public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) {
return NodeUpdateType.valueOf(e.name());
}
/*
* ExecutionType
*/

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -187,23 +188,26 @@ public class BuilderUtils {
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
capability, numContainers, healthReport, lastHealthReportTime, null);
capability, numContainers, healthReport, lastHealthReportTime,
null, null, null);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
Set<String> nodeLabels) {
Set<String> nodeLabels, Integer decommissioningTimeout,
NodeUpdateType nodeUpdateType) {
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
capability, numContainers, healthReport, lastHealthReportTime,
nodeLabels, null, null);
nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, String healthReport, long lastHealthReportTime,
Set<String> nodeLabels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
NodeUpdateType nodeUpdateType) {
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
@ -217,6 +221,8 @@ public class BuilderUtils {
nodeReport.setNodeLabels(nodeLabels);
nodeReport.setAggregatedContainersUtilization(containersUtilization);
nodeReport.setNodeUtilization(nodeUtilization);
nodeReport.setDecommissioningTimeout(decommissioningTimeout);
nodeReport.setNodeUpdateType(nodeUpdateType);
return nodeReport;
}

View File

@ -1032,7 +1032,8 @@ public class ClientRMService extends AbstractService implements
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
rmNode.getNodeUtilization());
rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
null);
return report;
}

View File

@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher {
private final RMContext rmContext;
// Default timeout value in mills.
// Negative value indicates no timeout. 0 means immediate.
private long defaultTimeoutMs =
1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
// Once a RMNode is observed in DECOMMISSIONING state,
// All its ContainerStatus update are tracked inside DecomNodeContext.
class DecommissioningNodeContext {
@ -105,16 +100,15 @@ public class DecommissioningNodesWatcher {
private long lastUpdateTime;
public DecommissioningNodeContext(NodeId nodeId) {
public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
this.nodeId = nodeId;
this.appIds = new HashSet<ApplicationId>();
this.decommissioningStartTime = mclock.getTime();
this.timeoutMs = defaultTimeoutMs;
this.timeoutMs = 1000L * timeoutSec;
}
void updateTimeout(Integer timeoutSec) {
this.timeoutMs = (timeoutSec == null)?
defaultTimeoutMs : (1000L * timeoutSec);
void updateTimeout(int timeoutSec) {
this.timeoutMs = 1000L * timeoutSec;
}
}
@ -132,7 +126,6 @@ public class DecommissioningNodesWatcher {
}
public void init(Configuration conf) {
readDecommissioningTimeout(conf);
int v = conf.getInt(
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
@ -162,7 +155,8 @@ public class DecommissioningNodesWatcher {
}
} else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
if (context == null) {
context = new DecommissioningNodeContext(rmNode.getNodeID());
context = new DecommissioningNodeContext(rmNode.getNodeID(),
rmNode.getDecommissioningTimeout());
decomNodes.put(rmNode.getNodeID(), context);
context.nodeState = rmNode.getState();
context.decommissionedTime = 0;
@ -416,24 +410,4 @@ public class DecommissioningNodesWatcher {
LOG.debug("Decommissioning node: " + sb.toString());
}
}
// Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
// This enables DecommissioningNodesWatcher to pick up new value
// without ResourceManager restart.
private void readDecommissioningTimeout(Configuration conf) {
try {
if (conf == null) {
conf = new YarnConfiguration();
}
int v = conf.getInt(
YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
if (defaultTimeoutMs != 1000L * v) {
defaultTimeoutMs = 1000L * v;
LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
}
} catch (Exception e) {
LOG.info("Error readDecommissioningTimeout ", e);
}
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@ -82,6 +83,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
@ -317,10 +320,12 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
List<RMNode> updatedNodes = new ArrayList<>();
Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<>();
for(RMNode rmNode: updatedNodes) {
for(Map.Entry<RMNode, NodeUpdateType> rmNodeEntry :
updatedNodes.entrySet()) {
RMNode rmNode = rmNodeEntry.getKey();
SchedulerNodeReport schedulerNodeReport =
getScheduler().getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
@ -335,7 +340,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels());
rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(),
rmNodeEntry.getValue());
updatedNodeReports.add(report);
}

View File

@ -72,6 +72,11 @@ public class NodesListManager extends CompositeService implements
private Configuration conf;
private final RMContext rmContext;
// Default decommissioning timeout value in seconds.
// Negative value indicates no timeout. 0 means immediate.
private int defaultDecTimeoutSecs =
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
private String includesFile;
private String excludesFile;
@ -214,6 +219,11 @@ public class NodesListManager extends CompositeService implements
private void refreshHostsReader(
Configuration yarnConf, boolean graceful, Integer timeout)
throws IOException, YarnException {
// resolve the default timeout to the decommission timeout that is
// configured at this moment
if (null == timeout) {
timeout = readDecommissioningTimeout(yarnConf);
}
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@ -252,7 +262,7 @@ public class NodesListManager extends CompositeService implements
// Gracefully decommission excluded nodes that are not already
// DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
// that are already DECOMMISSIONED or DECOMMISSIONING.
private void handleExcludeNodeList(boolean graceful, Integer timeout) {
private void handleExcludeNodeList(boolean graceful, int timeout) {
// DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
List<RMNode> nodesToRecom = new ArrayList<RMNode>();
@ -458,36 +468,40 @@ public class NodesListManager extends CompositeService implements
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
private void sendRMAppNodeUpdateEventToNonFinalizedApps(
RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
for(RMApp app : rmContext.getRMApps().values()) {
if (!app.isAppFinalStateStored()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
appNodeUpdateType));
}
}
}
@Override
public void handle(NodesListManagerEvent event) {
RMNode eventNode = event.getNode();
switch (event.getType()) {
case NODE_UNUSABLE:
LOG.debug(eventNode + " reported unusable");
for(RMApp app: rmContext.getRMApps().values()) {
if (!app.isAppFinalStateStored()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
RMAppNodeUpdateType.NODE_UNUSABLE));
}
}
sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
RMAppNodeUpdateType.NODE_UNUSABLE);
break;
case NODE_USABLE:
LOG.debug(eventNode + " reported usable");
for (RMApp app : rmContext.getRMApps().values()) {
if (!app.isAppFinalStateStored()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
RMAppNodeUpdateType.NODE_USABLE));
}
}
sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
RMAppNodeUpdateType.NODE_USABLE);
break;
case NODE_DECOMMISSIONING:
LOG.debug(eventNode + " reported decommissioning");
sendRMAppNodeUpdateEventToNonFinalizedApps(
eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
break;
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
@ -606,6 +620,28 @@ public class NodesListManager extends CompositeService implements
}
}
// Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
// This enables NodesListManager to pick up new value without
// ResourceManager restart.
private int readDecommissioningTimeout(Configuration pConf) {
try {
if (pConf == null) {
pConf = new YarnConfiguration();
}
int configuredDefaultDecTimeoutSecs =
pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
LOG.info("Use new decommissioningTimeoutSecs: "
+ defaultDecTimeoutSecs);
}
} catch (Exception e) {
LOG.warn("Error readDecommissioningTimeout " + e.getMessage());
}
return defaultDecTimeoutSecs;
}
/**
* A NodeId instance needed upon startup for populating inactive nodes Map.
* It only knows the hostname/ip and marks the port to -1 or invalid.

View File

@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
public enum NodesListManagerEventType {
NODE_USABLE,
NODE_UNUSABLE
NODE_UNUSABLE,
NODE_DECOMMISSIONING
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -154,10 +154,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* received by the RMApp. Updates can be node becoming lost or becoming
* healthy etc. The method clears the information from the {@link RMApp}. So
* each call to this method gives the delta from the previous call.
* @param updatedNodes Collection into which the updates are transferred
* @return the number of nodes added to the {@link Collection}
* @param updatedNodes Map into which the updates are transferred, with each
* node updates as the key, and the {@link NodeUpdateType} for that update
* as the corresponding value.
* @return the number of nodes added to the {@link Map}
*/
int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes);
/**
* The finish time of the {@link RMApp}

View File

@ -20,11 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -146,7 +145,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
private final String applicationType;
private final Set<String> applicationTags;
@ -672,11 +671,11 @@ public class RMAppImpl implements RMApp, Recoverable {
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
this.writeLock.lock();
try {
int updatedNodeCount = this.updatedNodes.size();
updatedNodes.addAll(this.updatedNodes);
upNodes.putAll(this.updatedNodes);
this.updatedNodes.clear();
return updatedNodeCount;
} finally {
@ -982,7 +981,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
NodeState nodeState = node.getState();
updatedNodes.add(node);
updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
LOG.debug("Received node update event:" + type + " for node:" + node
+ " with state:" + nodeState);
}

View File

@ -19,13 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class RMAppNodeUpdateEvent extends RMAppEvent {
public enum RMAppNodeUpdateType {
NODE_USABLE,
NODE_UNUSABLE
NODE_UNUSABLE,
NODE_DECOMMISSIONING;
public static NodeUpdateType convertToNodeUpdateType(
RMAppNodeUpdateType rmAppNodeUpdateType) {
return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
}
}
private final RMNode node;

View File

@ -183,7 +183,7 @@ public interface RMNode {
void setUntrackedTimeStamp(long timeStamp);
/*
* Optional decommissioning timeout in second
* (null indicates default timeout).
* (null indicates absent timeout).
* @return the decommissioning timeout in second.
*/
Integer getDecommissioningTimeout();

View File

@ -1160,6 +1160,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
rmNode.decommissioningTimeout = timeout;
// Notify NodesListManager to notify all RMApp so that each
// Application Master could take any required actions.
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
if (rmNode.originalTotalCapability == null){
rmNode.originalTotalCapability =
Resources.clone(rmNode.totalCapability);

View File

@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@ -908,12 +909,26 @@ public class MockRM extends ResourceManager {
node.getState());
}
public void sendNodeGracefulDecommission(
MockNM nm, int timeout) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
Assert.assertNotNull("node shouldn't be null", node);
node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
}
public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
RMNodeImpl node = (RMNodeImpl)
getRMContext().getRMNodes().get(nm.getNodeId());
Assert.assertNotNull("node shouldn't be null", node);
node.handle(new RMNodeEvent(nm.getNodeId(), event));
}
public Integer getDecommissioningTimeout(NodeId nodeid) {
return this.getRMContext().getRMNodes()
.get(nodeid).getDecommissioningTimeout();
}
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);

View File

@ -180,6 +180,50 @@ public class TestClientRMService {
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
@Test
public void testGetDecommissioningClusterNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
rm.start();
int nodeMemory = 1024;
MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
rm.sendNodeStarted(nm1);
nm1.nodeHeartbeat(true);
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
Integer decommissioningTimeout = 600;
rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client =
(ApplicationClientProtocol) rpc
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call
List<NodeReport> nodeReports = client.getClusterNodes(
GetClusterNodesRequest.newInstance(
EnumSet.of(NodeState.DECOMMISSIONING)))
.getNodeReports();
Assert.assertEquals(1, nodeReports.size());
NodeReport nr = nodeReports.iterator().next();
Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertNull(nr.getNodeUpdateType());
rpc.stopProxy(client, conf);
rm.close();
}
@Test
public void testGetClusterNodes() throws Exception {
MockRM rm = new MockRM() {
@ -228,6 +272,8 @@ public class TestClientRMService {
// Check node's label = x
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Now make the node unhealthy.
node.nodeHeartbeat(false);
@ -251,6 +297,8 @@ public class TestClientRMService {
nodeReports.get(0).getNodeState());
Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
// Remove labels of host1
map = new HashMap<NodeId, Set<String>>();
@ -267,6 +315,8 @@ public class TestClientRMService {
for (NodeReport report : nodeReports) {
Assert.assertTrue(report.getNodeLabels() != null
&& report.getNodeLabels().isEmpty());
Assert.assertNull(report.getDecommissioningTimeout());
Assert.assertNull(report.getNodeUpdateType());
}
rpc.stopProxy(client, conf);

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -69,7 +68,8 @@ public class TestDecommissioningNodesWatcher {
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
rm.sendNodeGracefulDecommission(nm1,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0.

View File

@ -27,7 +27,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
@ -98,6 +100,8 @@ public class TestRMNodeTransitions {
}
private NodesListManagerEvent nodesListManagerEvent = null;
private List<NodeState> nodesListManagerEventsNodeStateSequence =
new LinkedList<>();
private class TestNodeListManagerEventDispatcher implements
EventHandler<NodesListManagerEvent> {
@ -105,6 +109,7 @@ public class TestRMNodeTransitions {
@Override
public void handle(NodesListManagerEvent event) {
nodesListManagerEvent = event;
nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
}
}
@ -150,7 +155,7 @@ public class TestRMNodeTransitions {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
nodesListManagerEvent = null;
nodesListManagerEventsNodeStateSequence.clear();
}
@After
@ -721,6 +726,8 @@ public class TestRMNodeTransitions {
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
nodesListManagerEventsNodeStateSequence);
Assert
.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
@ -1008,7 +1015,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
nodesListManagerEvent.getType());
}

View File

@ -39,7 +39,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.OutputKeys;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsSystem;
@ -99,14 +106,19 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
public class TestResourceTrackerService extends NodeLabelTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
private final File hostFile =
new File(TEMP_DIR + File.separator + "hostFile.txt");
private final File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
private final File excludeHostXmlFile =
new File(TEMP_DIR + File.separator + "excludeHostFile.xml");
private MockRM rm;
@ -291,6 +303,67 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}
@Test
public void testGracefulDecommissionDefaultTimeoutResolution()
throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
.getAbsolutePath());
writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null));
rm = new MockRM(conf);
rm.start();
int nodeMemory = 1024;
MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);
NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
Assert.assertTrue(
NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
// Graceful decommission both host1 and host2, with
// non default timeout for host1
final Integer nm1DecommissionTimeout = 20;
writeToHostsXmlFile(
excludeHostXmlFile,
Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
Pair.of(nm2.getNodeId().getHost(), null));
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
Assert.assertEquals(
nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
Integer defaultDecTimeout =
conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
Assert.assertEquals(
defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
// Graceful decommission host3 with a new default timeout
final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
writeToHostsXmlFile(
excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
newDefaultDecTimeout);
rm.getNodesListManager().refreshNodes(conf, true);
rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
Assert.assertEquals(
newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
}
/**
* Graceful decommission node with running application.
*/
@ -1967,16 +2040,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.stop();
}
private void ensureFileExists(File file) throws IOException {
if (!file.exists()) {
TEMP_DIR.mkdirs();
file.createNewFile();
}
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}
private void writeToHostsFile(File file, String... hosts)
throws IOException {
if (!file.exists()) {
TEMP_DIR.mkdirs();
file.createNewFile();
}
ensureFileExists(file);
FileOutputStream fStream = null;
try {
fStream = new FileOutputStream(file);
@ -1992,6 +2069,33 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
}
private void writeToHostsXmlFile(
File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
ensureFileExists(file);
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
Document doc = dbFactory.newDocumentBuilder().newDocument();
Element hosts = doc.createElement("hosts");
doc.appendChild(hosts);
for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) {
Element host = doc.createElement("host");
hosts.appendChild(host);
Element name = doc.createElement("name");
host.appendChild(name);
name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft()));
if (hostsAndTimeout.getRight() != null) {
Element timeout = doc.createElement("timeout");
host.appendChild(timeout);
timeout.appendChild(
doc.createTextNode(hostsAndTimeout.getRight().toString())
);
}
}
TransformerFactory transformerFactory = TransformerFactory.newInstance();
Transformer transformer = transformerFactory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.transform(new DOMSource(doc), new StreamResult(file));
}
private void checkDecommissionedNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -154,7 +154,7 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -82,6 +83,13 @@ public class TestAMRMRPCNodeUpdates {
rm.drainEvents();
}
private void syncNodeGracefulDecommission(
MockNM nm, int timeout) throws Exception {
rm.sendNodeGracefulDecommission(nm, timeout);
rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
rm.drainEvents();
}
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
final AllocateRequest req) throws Exception {
UserGroupInformation ugi =
@ -98,6 +106,39 @@ public class TestAMRMRPCNodeUpdates {
});
}
@Test
public void testAMRMDecommissioningNodes() throws Exception {
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
rm.drainEvents();
RMApp app1 = rm.submitApp(2000);
// Trigger the scheduling so the AM gets 'launched' on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
// register AM returns no unusable node
am1.registerAppAttempt();
Integer decommissioningTimeout = 600;
syncNodeGracefulDecommission(nm2, decommissioningTimeout);
AllocateRequest allocateRequest1 =
AllocateRequest.newInstance(0, 0F, null, null, null);
AllocateResponse response1 =
allocate(attempt1.getAppAttemptId(), allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(
decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertEquals(
NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
}
@Test
public void testAMRMUnusableNodes() throws Exception {
@ -138,6 +179,8 @@ public class TestAMRMRPCNodeUpdates {
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
// resending the allocate request returns the same result
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
@ -146,6 +189,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
syncNodeLost(nm3);
@ -159,6 +204,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
// registering another AM gives it the complete failed list
RMApp app2 = rm.submitApp(2000);
@ -190,6 +237,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
allocateRequest2 =
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
@ -200,6 +249,8 @@ public class TestAMRMRPCNodeUpdates {
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
Assert.assertNull(nr.getDecommissioningTimeout());
Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
// subsequent allocate calls should return no updated nodes
allocateRequest2 =

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -235,7 +235,7 @@ public class MockRMApp implements RMApp {
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}