Merge -c 1305230 from trunk to branch-0.23 to fix MAPREDUCE-3533. Add a channel between RM and AM to get information on nodes. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1305231 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-03-26 05:46:55 +00:00
parent bf39110389
commit efe3382159
27 changed files with 773 additions and 23 deletions

View File

@ -34,9 +34,12 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
via acmurthy)
MAPREDUCE-3970 Add ServiceOperations class to aid working with Services
MAPREDUCE-3970. Add ServiceOperations class to aid working with Services
(stevel)
MAPREDUCE-3533. Add a channel between RM and AM to get information on
nodes. (Bikas Saha via acmurthy)
OPTIMIZATIONS
BUG FIXES

View File

@ -122,4 +122,17 @@ public interface AMResponse {
@Private
@Unstable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
/**
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could be
* changes in health, availability etc of the nodes.
* @return The delta of updated nodes since the last response
*/
@Public
@Unstable
public List<NodeReport> getUpdatedNodes();
@Private
@Unstable
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
}

View File

@ -56,6 +56,16 @@ public interface NodeReport {
@Unstable
void setNodeId(NodeId nodeId);
/**
* Get the <code>NodeState</code> of the node.
* @return <code>NodeState</code> of the node
*/
NodeState getNodeState();
@Private
@Unstable
void setNodeState(NodeState nodeState);
/**
* Get the <em>http address</em> of the node.
* @return <em>http address</em> of the node

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
// TODO NodeState is a clone of RMNodeState made for MR-3353. In a subsequent
// patch RMNodeState should be replaced with NodeState
/**
* <p>State of a <code>Node</code>.</p>
*/
public enum NodeState {
/** New node */
NEW,
/** Running node */
RUNNING,
/** Node is unhealthy */
UNHEALTHY,
/** Node is out of service */
DECOMMISSIONED,
/** Node has not sent a heartbeat for some configured time threshold*/
LOST,
/** Node has rebooted */
REBOOTED
}

View File

@ -26,12 +26,14 @@
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@ -47,6 +49,7 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
private List<ContainerStatus> completedContainersStatuses = null;
// private boolean hasLocalContainerList = false;
private List<NodeReport> updatedNodes = null;
public AMResponsePBImpl() {
builder = AMResponseProto.newBuilder();
@ -77,6 +80,12 @@ private synchronized void mergeLocalToBuilder() {
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.updatedNodes != null) {
builder.clearUpdatedNodes();
Iterable<NodeReportProto> iterable =
getNodeReportProtoIterable(this.updatedNodes);
builder.addAllUpdatedNodes(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
@ -141,6 +150,36 @@ public synchronized void setAvailableResources(Resource limit) {
builder.clearLimit();
this.limit = limit;
}
@Override
public synchronized List<NodeReport> getUpdatedNodes() {
initLocalNewNodeReportList();
return this.updatedNodes;
}
//Once this is called. updatedNodes will never be null - until a getProto is called.
private synchronized void initLocalNewNodeReportList() {
if (this.updatedNodes != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NodeReportProto> list = p.getUpdatedNodesList();
updatedNodes = new ArrayList<NodeReport>(list.size());
for (NodeReportProto n : list) {
updatedNodes.add(convertFromProtoFormat(n));
}
}
@Override
public synchronized void setUpdatedNodes(final List<NodeReport> updatedNodes) {
if (updatedNodes == null) {
this.updatedNodes.clear();
return;
}
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
this.updatedNodes.addAll(updatedNodes);
}
@Override
public synchronized List<Container> getAllocatedContainers() {
@ -148,7 +187,7 @@ public synchronized List<Container> getAllocatedContainers() {
return this.allocatedContainers;
}
//Once this is called. containerList will never be null - untill a getProto is called.
//Once this is called. containerList will never be null - until a getProto is called.
private synchronized void initLocalNewContainerList() {
if (this.allocatedContainers != null) {
return;
@ -166,6 +205,7 @@ private synchronized void initLocalNewContainerList() {
public synchronized void setAllocatedContainers(final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.addAll(containers);
}
@ -232,6 +272,38 @@ public synchronized void remove() {
}
};
}
private synchronized Iterable<NodeReportProto>
getNodeReportProtoIterable(
final List<NodeReport> newNodeReportsList) {
maybeInitBuilder();
return new Iterable<NodeReportProto>() {
@Override
public synchronized Iterator<NodeReportProto> iterator() {
return new Iterator<NodeReportProto>() {
Iterator<NodeReport> iter = newNodeReportsList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized NodeReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
//// Finished containers
@Override
@ -263,6 +335,15 @@ public synchronized void setCompletedContainersStatuses(
completedContainersStatuses.addAll(containers);
}
private synchronized NodeReportPBImpl convertFromProtoFormat(
NodeReportProto p) {
return new NodeReportPBImpl(p);
}
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
return ((NodeReportPBImpl)t).getProto();
}
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
@ -28,6 +29,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.util.ProtoUtils;
public class NodeReportPBImpl extends ProtoBase<NodeReportProto>
implements NodeReport {
@ -131,6 +133,25 @@ public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
@Override
public NodeState getNodeState() {
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeState()) {
return null;
}
return ProtoUtils.convertFromProtoFormat(p.getNodeState());
}
@Override
public void setNodeState(NodeState nodeState) {
maybeInitBuilder();
if (nodeState == null) {
builder.clearNodeState();
return;
}
builder.setNodeState(ProtoUtils.convertToProtoFormat(nodeState));
}
@Override
public void setCapability(Resource capability) {
maybeInitBuilder();

View File

@ -26,13 +26,18 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
@ -56,6 +61,26 @@ public static ContainerState convertFromProtoFormat(ContainerStateProto e) {
return ContainerState.valueOf(e.name().replace(CONTAINER_STATE_PREFIX, ""));
}
/*
* NodeState
*/
private static String NODE_STATE_PREFIX = "NS_";
public static NodeStateProto convertToProtoFormat(NodeState e) {
return NodeStateProto.valueOf(NODE_STATE_PREFIX + e.name());
}
public static NodeState convertFromProtoFormat(NodeStateProto e) {
return NodeState.valueOf(e.name().replace(NODE_STATE_PREFIX, ""));
}
/*
* NodeId
*/
public static NodeIdProto convertToProtoFormat(NodeId e) {
return ((NodeIdPBImpl)e).getProto();
}
public static NodeId convertFromProtoFormat(NodeIdProto e) {
return new NodeIdPBImpl(e);
}
/*
* YarnApplicationState

View File

@ -172,6 +172,15 @@ message ApplicationReportProto {
optional string originalTrackingUrl = 17;
}
enum NodeStateProto {
NS_NEW = 1;
NS_RUNNING = 2;
NS_UNHEALTHY = 3;
NS_DECOMMISSIONED = 4;
NS_LOST = 5;
NS_REBOOTED = 6;
}
message NodeIdProto {
optional string host = 1;
optional int32 port = 2;
@ -191,6 +200,7 @@ message NodeReportProto {
optional ResourceProto capability = 5;
optional int32 numContainers = 6;
optional NodeHealthStatusProto node_health_status = 8;
optional NodeStateProto node_state = 9;
}
@ -210,6 +220,7 @@ message AMResponseProto {
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
}

View File

@ -25,6 +25,10 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -42,7 +46,10 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.Resource;
@ -52,7 +59,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
/**
* Builder utilities to construct various objects.
@ -204,6 +210,21 @@ public static NodeId newNodeId(String host, int port) {
nodeId.setPort(port);
return nodeId;
}
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
String httpAddress, String rackName, Resource used, Resource capability,
int numContainers, NodeHealthStatus nodeHealthStatus) {
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
nodeReport.setNodeId(nodeId);
nodeReport.setNodeState(nodeState);
nodeReport.setHttpAddress(httpAddress);
nodeReport.setRackName(rackName);
nodeReport.setUsed(used);
nodeReport.setCapability(capability);
nodeReport.setNumContainers(numContainers);
nodeReport.setNodeHealthStatus(nodeHealthStatus);
return nodeReport;
}
public static ContainerStatus newContainerStatus(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus) {

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -45,6 +46,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -54,6 +57,8 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -61,9 +66,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
public class ApplicationMasterService extends AbstractService implements
@ -279,8 +286,33 @@ public AllocateResponse allocate(AllocateRequest request)
RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
for(RMNode rmNode: updatedNodes) {
SchedulerNodeReport schedulerNodeReport =
rScheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeReport report = BuilderUtils.newNodeReport(rmNode.getNodeID(),
RMNodeState.toNodeState(rmNode.getState()),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getNodeHealthStatus());
updatedNodeReports.add(report);
}
response.setUpdatedNodes(updatedNodeReports);
}
response.setAllocatedContainers(allocation.getContainers());
response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -405,13 +406,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
return response;
}
private NodeReport createNodeReports(RMNode rmNode) {
NodeReport report = recordFactory.newRecordInstance(NodeReport.class);
report.setNodeId(rmNode.getNodeID());
report.setRackName(rmNode.getRackName());
report.setCapability(rmNode.getTotalCapability());
report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
private NodeReport createNodeReports(RMNode rmNode) {
SchedulerNodeReport schedulerNodeReport =
scheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0);
@ -420,8 +415,12 @@ private NodeReport createNodeReports(RMNode rmNode) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
report.setUsed(used);
report.setNumContainers(numContainers);
NodeReport report = BuilderUtils.newNodeReport(rmNode.getNodeID(),
RMNodeState.toNodeState(rmNode.getState()),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getNodeHealthStatus());
return report;
}

View File

@ -19,7 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -27,17 +30,28 @@
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.service.AbstractService;
public class NodesListManager extends AbstractService{
public class NodesListManager extends AbstractService implements
EventHandler<NodesListManagerEvent> {
private static final Log LOG = LogFactory.getLog(NodesListManager.class);
private HostsFileReader hostsReader;
private Configuration conf;
private Set<RMNode> unusableRMNodesConcurrentSet = Collections
.newSetFromMap(new ConcurrentHashMap<RMNode,Boolean>());
private final RMContext rmContext;
public NodesListManager() {
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
this.rmContext = rmContext;
}
@Override
@ -102,4 +116,53 @@ public boolean isValidNode(String hostName) {
!excludeList.contains(hostName));
}
}
/**
* Provides the currently unusable nodes. Copies it into provided collection.
* @param unUsableNodes
* Collection to which the unusable nodes are added
* @return number of unusable nodes added
*/
public int getUnusableNodes(Collection<RMNode> unUsableNodes) {
unUsableNodes.addAll(unusableRMNodesConcurrentSet);
return unusableRMNodesConcurrentSet.size();
}
@Override
public void handle(NodesListManagerEvent event) {
RMNode eventNode = event.getNode();
switch (event.getType()) {
case NODE_UNUSABLE:
LOG.debug(eventNode + " reported unusable");
unusableRMNodesConcurrentSet.add(eventNode);
for(RMApp app: rmContext.getRMApps().values()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
RMAppNodeUpdateType.NODE_UNUSABLE));
}
break;
case NODE_USABLE:
if (unusableRMNodesConcurrentSet.contains(eventNode)) {
LOG.debug(eventNode + " reported usable");
unusableRMNodesConcurrentSet.remove(eventNode);
for (RMApp app : rmContext.getRMApps().values()) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
RMAppNodeUpdateType.NODE_USABLE));
}
} else {
LOG.warn(eventNode
+ " reported usable without first reporting unusable");
}
break;
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodesListManagerEvent extends
AbstractEvent<NodesListManagerEventType> {
private final RMNode node;
public NodesListManagerEvent(NodesListManagerEventType type, RMNode node) {
super(type);
this.node = node;
}
public RMNode getNode() {
return node;
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
public enum NodesListManagerEventType {
NODE_USABLE,
NODE_UNUSABLE
}

View File

@ -121,7 +121,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
public ResourceManager(Store store) {
super("ResourceManager");
this.store = store;
this.nodesListManager = new NodesListManager();
}
public RMContext getRMContext() {
@ -151,6 +150,10 @@ public synchronized void init(Configuration conf) {
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer);
// Register event handler for NodesListManager
this.nodesListManager = new NodesListManager(this.rmContext);
this.rmDispatcher.register(NodesListManagerEventType.class,
this.nodesListManager);
addService(nodesListManager);
// Initialize the scheduler
@ -170,7 +173,7 @@ public synchronized void init(Configuration conf) {
// Register event handler for RmNodes
this.rmDispatcher.register(RMNodeEventType.class,
new NodeEventDispatcher(this.rmContext));
new NodeEventDispatcher(this.rmContext));
//TODO change this to be random
this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
@ -422,7 +425,7 @@ public void handle(RMNodeEvent event) {
}
}
}
protected void startWepApp() {
Builder<ApplicationMasterService> builder =
WebApps.$for("cluster", ApplicationMasterService.class, masterService, "ws").at(

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -27,6 +29,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
/**
* The read interface to an Application in the ResourceManager. Take a
@ -108,6 +111,16 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the {@link ApplicationReport} detailing the status of the application.
*/
ApplicationReport createAndGetApplicationReport(boolean allowAccess);
/**
* To receive the collection of all {@link RMNode}s whose updates have been
* received by the RMApp. Updates can be node becoming lost or becoming
* healthy etc. The method clears the information from the {@link RMApp}. So
* each call to this method gives the delta from the previous call.
* @param updatedNodes Collection into which the updates are transferred
* @return the number of nodes added to the {@link Collection}
*/
int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
/**
* Application level metadata is stored in {@link ApplicationStore} which

View File

@ -29,5 +29,6 @@ public enum RMAppEventType {
ATTEMPT_REGISTERED,
ATTEMPT_FINISHED, // Will send the final state
ATTEMPT_FAILED,
ATTEMPT_KILLED
ATTEMPT_KILLED,
NODE_UPDATE
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -47,12 +48,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -87,6 +91,7 @@ public class RMAppImpl implements RMApp {
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
// Mutable fields
private long startTime;
@ -107,6 +112,8 @@ public class RMAppImpl implements RMApp {
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
RMAppEventType.START, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
@ -115,6 +122,8 @@ RMAppEventType.START, new StartAppAttemptTransition())
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
@ -123,6 +132,8 @@ RMAppEventType.APP_REJECTED, new AppRejectedTransition())
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
@ -133,6 +144,8 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
.addTransition(RMAppState.RUNNING,
@ -145,10 +158,16 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from FINISHED state
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
RMAppEventType.KILL)
// ignorable transitions
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
RMAppEventType.NODE_UPDATE)
// Transitions from FAILED state
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.KILL)
// ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.NODE_UPDATE)
// Transitions from KILLED state
.addTransition(
@ -158,6 +177,9 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED))
// ignorable transitions
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
RMAppEventType.NODE_UPDATE)
.installTopology();
@ -330,6 +352,18 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
throw new YarnException("Unknown state passed!");
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
this.writeLock.lock();
try {
int updatedNodeCount = this.updatedNodes.size();
updatedNodes.addAll(this.updatedNodes);
this.updatedNodes.clear();
return updatedNodeCount;
} finally {
this.writeLock.unlock();
}
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
@ -462,6 +496,13 @@ private void createNewAttempt() {
handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
RMNodeState nodeState = node.getState();
updatedNodes.add(node);
LOG.debug("Received node update event:" + type + " for node:" + node
+ " with state:" + nodeState);
}
private static class RMAppTransition implements
SingleArcTransition<RMAppImpl, RMAppEvent> {
@ -470,6 +511,14 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
app.processNodeUpdate(nodeUpdateEvent.getUpdateType(),
nodeUpdateEvent.getNode());
};
}
private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
app.createNewAttempt();

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class RMAppNodeUpdateEvent extends RMAppEvent {
public enum RMAppNodeUpdateType {
NODE_USABLE,
NODE_UNUSABLE
}
private final RMNode node;
private final RMAppNodeUpdateType updateType;
public RMAppNodeUpdateEvent(ApplicationId appId, RMNode node,
RMAppNodeUpdateType updateType) {
super(appId, RMAppEventType.NODE_UPDATE);
this.node = node;
this.updateType = updateType;
}
public RMNode getNode() {
return node;
}
public RMAppNodeUpdateType getUpdateType() {
return updateType;
}
}

View File

@ -46,6 +46,8 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@ -140,6 +142,10 @@ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
// create the topology tables
.installTopology();
@ -444,6 +450,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
@ -473,6 +482,9 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Update metrics
rmNode.updateMetricsForDeactivatedNode(RMNodeState.UNHEALTHY);
return RMNodeState.UNHEALTHY;
@ -547,6 +559,12 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
// ??? how about updating metrics before notifying to ensure that
// notifiers get update metadata because they will very likely query it
// upon notification
// Update metrics
rmNode.updateMetricsForRejoinedNode(RMNodeState.UNHEALTHY);
return RMNodeState.RUNNING;

View File

@ -18,6 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import org.apache.hadoop.yarn.api.records.NodeState;
//TODO yarn.api.records.NodeState is a clone of RMNodeState made for MR-3353.
// In a subsequent patch RMNodeState should be replaced with NodeState
public enum RMNodeState {
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED
}
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED;
public static NodeState toNodeState(RMNodeState state) {
switch(state) {
case NEW:
return NodeState.NEW;
case RUNNING:
return NodeState.RUNNING;
case UNHEALTHY:
return NodeState.UNHEALTHY;
case DECOMMISSIONED:
return NodeState.DECOMMISSIONED;
case LOST:
return NodeState.LOST;
case REBOOTED:
return NodeState.REBOOTED;
}
return null;
}
};

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -35,6 +36,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Lists;
@ -219,6 +221,10 @@ public void handle(RMAppEvent event) {
public FinalApplicationStatus getFinalApplicationStatus() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
public static RMApp newApplication(int i) {

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class TestAMRMRPCNodeUpdates {
private MockRM rm;
ApplicationMasterService amService = null;
DrainDispatcher dispatcher = null;
@Before
public void setUp() {
dispatcher = new DrainDispatcher();
this.rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
}
};
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
amService = rm.getApplicationMasterService();
}
@After
public void tearDown() {
if (rm != null) {
this.rm.stop();
}
}
private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
nm.nodeHeartbeat(health);
dispatcher.await();
}
private void syncNodeLost(MockNM nm) throws Exception {
rm.sendNodeStarted(nm);
rm.NMwaitForState(nm.getNodeId(), RMNodeState.RUNNING);
rm.sendNodeLost(nm);
dispatcher.await();
}
@Test
public void testAMRMUnusableNodes() throws Exception {
MockNM nm1 = rm.registerNode("h1:1234", 5000);
MockNM nm2 = rm.registerNode("h2:1234", 5000);
MockNM nm3 = rm.registerNode("h3:1234", 5000);
MockNM nm4 = rm.registerNode("h4:1234", 5000);
RMApp app1 = rm.submitApp(2000);
// Trigger the scheduling so the AM gets 'launched' on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
// register AM returns no unusable node
am1.registerAppAttempt();
// allocate request returns no updated node
AllocateRequest allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response1 = amService.allocate(allocateRequest1).getAMResponse();
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
syncNodeHeartbeat(nm4, false);
// allocate request returns updated node
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
// resending the allocate request returns the same result
response1 = amService.allocate(allocateRequest1).getAMResponse();
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
syncNodeLost(nm3);
// subsequent allocate request returns delta
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
// registering another AM gives it the complete failed list
RMApp app2 = rm.submitApp(2000);
// Trigger nm2 heartbeat so that AM gets launched on it
nm2.nodeHeartbeat(true);
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
// register AM returns all unusable nodes
am2.registerAppAttempt();
// allocate request returns no updated node
AllocateRequest allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), 0, 0F, null, null);
AMResponse response2 = amService.allocate(allocateRequest2).getAMResponse();
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
syncNodeHeartbeat(nm4, true);
// both AM's should get delta updated nodes
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
response1 = amService.allocate(allocateRequest1).getAMResponse();
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
// subsequent allocate calls should return no updated nodes
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
response2 = amService.allocate(allocateRequest2).getAMResponse();
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
// how to do the above for LOST node
}
}

View File

@ -82,7 +82,7 @@ public void setUp() {
dispatcher);
nmLivelinessMonitor.init(conf);
nmLivelinessMonitor.start();
NodesListManager nodesListManager = new NodesListManager();
NodesListManager nodesListManager = new NodesListManager(context);
nodesListManager.init(conf);
resourceTrackerService = new ResourceTrackerService(context,
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);

View File

@ -70,7 +70,7 @@ public void handle(Event event) {
new RMContextImpl(new MemStore(), dispatcher, null, null, null);
dispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(context));
NodesListManager nodesListManager = new NodesListManager();
NodesListManager nodesListManager = new NodesListManager(context);
Configuration conf = new Configuration();
nodesListManager.init(conf);
resourceTrackerService = new ResourceTrackerService(context,

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -25,6 +27,7 @@
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class MockRMApp implements RMApp {
static final int DT = 1000000; // ms
@ -176,6 +179,11 @@ public void handle(RMAppEvent event) {
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
return FinalApplicationStatus.UNDEFINED;
}
@Override
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
throw new UnsupportedOperationException("Not supported yet.");
};
}