MAPREDUCE-3533. Add a channel between RM and AM to get information on nodes. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305230 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1d5e7dde95
commit
606114d602
|
@ -127,9 +127,12 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
|
||||
via acmurthy)
|
||||
|
||||
MAPREDUCE-3970 Add ServiceOperations class to aid working with Services
|
||||
MAPREDUCE-3970. Add ServiceOperations class to aid working with Services
|
||||
(stevel)
|
||||
|
||||
MAPREDUCE-3533. Add a channel between RM and AM to get information on
|
||||
nodes. (Bikas Saha via acmurthy)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -122,4 +122,17 @@ public interface AMResponse {
|
|||
@Private
|
||||
@Unstable
|
||||
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
|
||||
|
||||
/**
|
||||
* Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could be
|
||||
* changes in health, availability etc of the nodes.
|
||||
* @return The delta of updated nodes since the last response
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public List<NodeReport> getUpdatedNodes();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setUpdatedNodes(final List<NodeReport> updatedNodes);
|
||||
}
|
|
@ -56,6 +56,16 @@ public interface NodeReport {
|
|||
@Unstable
|
||||
void setNodeId(NodeId nodeId);
|
||||
|
||||
/**
|
||||
* Get the <code>NodeState</code> of the node.
|
||||
* @return <code>NodeState</code> of the node
|
||||
*/
|
||||
NodeState getNodeState();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNodeState(NodeState nodeState);
|
||||
|
||||
/**
|
||||
* Get the <em>http address</em> of the node.
|
||||
* @return <em>http address</em> of the node
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
// TODO NodeState is a clone of RMNodeState made for MAPREDUCE-3533. In a subsequent
|
||||
// patch RMNodeState should be replaced with NodeState
|
||||
/**
 * <p>State of a <code>Node</code>.</p>
 */
public enum NodeState {
  /** New node. */
  NEW,

  /** Running node. */
  RUNNING,

  /** Node is unhealthy. */
  UNHEALTHY,

  /** Node is out of service. */
  DECOMMISSIONED,

  /** Node has not sent a heartbeat for some configured time threshold. */
  LOST,

  /** Node has rebooted. */
  REBOOTED
}
|
|
@ -26,12 +26,14 @@ import java.util.List;
|
|||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
||||
|
||||
|
@ -47,6 +49,7 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
private List<ContainerStatus> completedContainersStatuses = null;
|
||||
// private boolean hasLocalContainerList = false;
|
||||
|
||||
private List<NodeReport> updatedNodes = null;
|
||||
|
||||
public AMResponsePBImpl() {
|
||||
builder = AMResponseProto.newBuilder();
|
||||
|
@ -77,6 +80,12 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
getContainerStatusProtoIterable(this.completedContainersStatuses);
|
||||
builder.addAllCompletedContainerStatuses(iterable);
|
||||
}
|
||||
if (this.updatedNodes != null) {
|
||||
builder.clearUpdatedNodes();
|
||||
Iterable<NodeReportProto> iterable =
|
||||
getNodeReportProtoIterable(this.updatedNodes);
|
||||
builder.addAllUpdatedNodes(iterable);
|
||||
}
|
||||
if (this.limit != null) {
|
||||
builder.setLimit(convertToProtoFormat(this.limit));
|
||||
}
|
||||
|
@ -141,6 +150,36 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
builder.clearLimit();
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<NodeReport> getUpdatedNodes() {
|
||||
initLocalNewNodeReportList();
|
||||
return this.updatedNodes;
|
||||
}
|
||||
|
||||
//Once this is called. updatedNodes will never be null - until a getProto is called.
|
||||
private synchronized void initLocalNewNodeReportList() {
|
||||
if (this.updatedNodes != null) {
|
||||
return;
|
||||
}
|
||||
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<NodeReportProto> list = p.getUpdatedNodesList();
|
||||
updatedNodes = new ArrayList<NodeReport>(list.size());
|
||||
|
||||
for (NodeReportProto n : list) {
|
||||
updatedNodes.add(convertFromProtoFormat(n));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setUpdatedNodes(final List<NodeReport> updatedNodes) {
|
||||
if (updatedNodes == null) {
|
||||
this.updatedNodes.clear();
|
||||
return;
|
||||
}
|
||||
this.updatedNodes = new ArrayList<NodeReport>(updatedNodes.size());
|
||||
this.updatedNodes.addAll(updatedNodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<Container> getAllocatedContainers() {
|
||||
|
@ -148,7 +187,7 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
return this.allocatedContainers;
|
||||
}
|
||||
|
||||
//Once this is called. containerList will never be null - untill a getProto is called.
|
||||
//Once this is called. containerList will never be null - until a getProto is called.
|
||||
private synchronized void initLocalNewContainerList() {
|
||||
if (this.allocatedContainers != null) {
|
||||
return;
|
||||
|
@ -166,6 +205,7 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
public synchronized void setAllocatedContainers(final List<Container> containers) {
|
||||
if (containers == null)
|
||||
return;
|
||||
// this looks like a bug because it results in append and not set
|
||||
initLocalNewContainerList();
|
||||
allocatedContainers.addAll(containers);
|
||||
}
|
||||
|
@ -232,6 +272,38 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private synchronized Iterable<NodeReportProto>
|
||||
getNodeReportProtoIterable(
|
||||
final List<NodeReport> newNodeReportsList) {
|
||||
maybeInitBuilder();
|
||||
return new Iterable<NodeReportProto>() {
|
||||
@Override
|
||||
public synchronized Iterator<NodeReportProto> iterator() {
|
||||
return new Iterator<NodeReportProto>() {
|
||||
|
||||
Iterator<NodeReport> iter = newNodeReportsList.iterator();
|
||||
|
||||
@Override
|
||||
public synchronized boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized NodeReportProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
//// Finished containers
|
||||
@Override
|
||||
|
@ -263,6 +335,15 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
|
|||
completedContainersStatuses.addAll(containers);
|
||||
}
|
||||
|
||||
private synchronized NodeReportPBImpl convertFromProtoFormat(
|
||||
NodeReportProto p) {
|
||||
return new NodeReportPBImpl(p);
|
||||
}
|
||||
|
||||
private synchronized NodeReportProto convertToProtoFormat(NodeReport t) {
|
||||
return ((NodeReportPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private synchronized ContainerPBImpl convertFromProtoFormat(
|
||||
ContainerProto p) {
|
||||
return new ContainerPBImpl(p);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
|
|||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
public class NodeReportPBImpl extends ProtoBase<NodeReportProto>
|
||||
implements NodeReport {
|
||||
|
@ -131,6 +133,25 @@ public class NodeReportPBImpl extends ProtoBase<NodeReportProto>
|
|||
this.nodeId = nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeState getNodeState() {
|
||||
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasNodeState()) {
|
||||
return null;
|
||||
}
|
||||
return ProtoUtils.convertFromProtoFormat(p.getNodeState());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeState(NodeState nodeState) {
|
||||
maybeInitBuilder();
|
||||
if (nodeState == null) {
|
||||
builder.clearNodeState();
|
||||
return;
|
||||
}
|
||||
builder.setNodeState(ProtoUtils.convertToProtoFormat(nodeState));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCapability(Resource capability) {
|
||||
maybeInitBuilder();
|
||||
|
|
|
@ -26,13 +26,18 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
|
||||
|
@ -56,6 +61,26 @@ public class ProtoUtils {
|
|||
return ContainerState.valueOf(e.name().replace(CONTAINER_STATE_PREFIX, ""));
|
||||
}
|
||||
|
||||
/*
|
||||
* NodeState
|
||||
*/
|
||||
private static String NODE_STATE_PREFIX = "NS_";
|
||||
public static NodeStateProto convertToProtoFormat(NodeState e) {
|
||||
return NodeStateProto.valueOf(NODE_STATE_PREFIX + e.name());
|
||||
}
|
||||
public static NodeState convertFromProtoFormat(NodeStateProto e) {
|
||||
return NodeState.valueOf(e.name().replace(NODE_STATE_PREFIX, ""));
|
||||
}
|
||||
|
||||
/*
|
||||
* NodeId
|
||||
*/
|
||||
public static NodeIdProto convertToProtoFormat(NodeId e) {
|
||||
return ((NodeIdPBImpl)e).getProto();
|
||||
}
|
||||
public static NodeId convertFromProtoFormat(NodeIdProto e) {
|
||||
return new NodeIdPBImpl(e);
|
||||
}
|
||||
|
||||
/*
|
||||
* YarnApplicationState
|
||||
|
|
|
@ -172,6 +172,15 @@ message ApplicationReportProto {
|
|||
optional string originalTrackingUrl = 17;
|
||||
}
|
||||
|
||||
// Wire-format node states. Each NS_* constant corresponds to the Java
// NodeState constant of the same name without the "NS_" prefix.
enum NodeStateProto {
|
||||
NS_NEW = 1;
|
||||
NS_RUNNING = 2;
|
||||
NS_UNHEALTHY = 3;
|
||||
NS_DECOMMISSIONED = 4;
|
||||
NS_LOST = 5;
|
||||
NS_REBOOTED = 6;
|
||||
}
|
||||
|
||||
message NodeIdProto {
|
||||
optional string host = 1;
|
||||
optional int32 port = 2;
|
||||
|
@ -191,6 +200,7 @@ message NodeReportProto {
|
|||
optional ResourceProto capability = 5;
|
||||
optional int32 numContainers = 6;
|
||||
optional NodeHealthStatusProto node_health_status = 8;
|
||||
optional NodeStateProto node_state = 9;
|
||||
}
|
||||
|
||||
|
||||
|
@ -210,6 +220,7 @@ message AMResponseProto {
|
|||
repeated ContainerProto allocated_containers = 3;
|
||||
repeated ContainerStatusProto completed_container_statuses = 4;
|
||||
optional ResourceProto limit = 5;
|
||||
repeated NodeReportProto updated_nodes = 6;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -25,6 +25,10 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
@ -42,7 +46,10 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -52,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
|
||||
/**
|
||||
* Builder utilities to construct various objects.
|
||||
|
@ -204,6 +210,21 @@ public class BuilderUtils {
|
|||
nodeId.setPort(port);
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
|
||||
String httpAddress, String rackName, Resource used, Resource capability,
|
||||
int numContainers, NodeHealthStatus nodeHealthStatus) {
|
||||
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
|
||||
nodeReport.setNodeId(nodeId);
|
||||
nodeReport.setNodeState(nodeState);
|
||||
nodeReport.setHttpAddress(httpAddress);
|
||||
nodeReport.setRackName(rackName);
|
||||
nodeReport.setUsed(used);
|
||||
nodeReport.setCapability(capability);
|
||||
nodeReport.setNumContainers(numContainers);
|
||||
nodeReport.setNodeHealthStatus(nodeHealthStatus);
|
||||
return nodeReport;
|
||||
}
|
||||
|
||||
public static ContainerStatus newContainerStatus(ContainerId containerId,
|
||||
ContainerState containerState, String diagnostics, int exitStatus) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.AMResponse;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
|
@ -54,6 +57,8 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
|
@ -61,9 +66,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
@Private
|
||||
public class ApplicationMasterService extends AbstractService implements
|
||||
|
@ -279,8 +286,33 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
|
||||
RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
||||
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
||||
|
||||
|
||||
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
|
||||
|
||||
// update the response with the deltas of node status changes
|
||||
List<RMNode> updatedNodes = new ArrayList<RMNode>();
|
||||
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
|
||||
List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
|
||||
for(RMNode rmNode: updatedNodes) {
|
||||
SchedulerNodeReport schedulerNodeReport =
|
||||
rScheduler.getNodeReport(rmNode.getNodeID());
|
||||
Resource used = BuilderUtils.newResource(0);
|
||||
int numContainers = 0;
|
||||
if (schedulerNodeReport != null) {
|
||||
used = schedulerNodeReport.getUsedResource();
|
||||
numContainers = schedulerNodeReport.getNumContainers();
|
||||
}
|
||||
NodeReport report = BuilderUtils.newNodeReport(rmNode.getNodeID(),
|
||||
RMNodeState.toNodeState(rmNode.getState()),
|
||||
rmNode.getHttpAddress(), rmNode.getRackName(), used,
|
||||
rmNode.getTotalCapability(), numContainers,
|
||||
rmNode.getNodeHealthStatus());
|
||||
|
||||
updatedNodeReports.add(report);
|
||||
}
|
||||
response.setUpdatedNodes(updatedNodeReports);
|
||||
}
|
||||
|
||||
response.setAllocatedContainers(allocation.getContainers());
|
||||
response.setCompletedContainersStatuses(appAttempt
|
||||
.pullJustFinishedContainers());
|
||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
@ -405,13 +406,7 @@ public class ClientRMService extends AbstractService implements
|
|||
return response;
|
||||
}
|
||||
|
||||
private NodeReport createNodeReports(RMNode rmNode) {
|
||||
NodeReport report = recordFactory.newRecordInstance(NodeReport.class);
|
||||
report.setNodeId(rmNode.getNodeID());
|
||||
report.setRackName(rmNode.getRackName());
|
||||
report.setCapability(rmNode.getTotalCapability());
|
||||
report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
|
||||
|
||||
private NodeReport createNodeReports(RMNode rmNode) {
|
||||
SchedulerNodeReport schedulerNodeReport =
|
||||
scheduler.getNodeReport(rmNode.getNodeID());
|
||||
Resource used = BuilderUtils.newResource(0);
|
||||
|
@ -420,8 +415,12 @@ public class ClientRMService extends AbstractService implements
|
|||
used = schedulerNodeReport.getUsedResource();
|
||||
numContainers = schedulerNodeReport.getNumContainers();
|
||||
}
|
||||
report.setUsed(used);
|
||||
report.setNumContainers(numContainers);
|
||||
|
||||
NodeReport report = BuilderUtils.newNodeReport(rmNode.getNodeID(),
|
||||
RMNodeState.toNodeState(rmNode.getState()),
|
||||
rmNode.getHttpAddress(), rmNode.getRackName(), used,
|
||||
rmNode.getTotalCapability(), numContainers,
|
||||
rmNode.getNodeHealthStatus());
|
||||
|
||||
return report;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,10 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -27,17 +30,28 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.util.HostsFileReader;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
public class NodesListManager extends AbstractService{
|
||||
public class NodesListManager extends AbstractService implements
|
||||
EventHandler<NodesListManagerEvent> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(NodesListManager.class);
|
||||
|
||||
private HostsFileReader hostsReader;
|
||||
private Configuration conf;
|
||||
private Set<RMNode> unusableRMNodesConcurrentSet = Collections
|
||||
.newSetFromMap(new ConcurrentHashMap<RMNode,Boolean>());
|
||||
|
||||
private final RMContext rmContext;
|
||||
|
||||
public NodesListManager() {
|
||||
public NodesListManager(RMContext rmContext) {
|
||||
super(NodesListManager.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,4 +116,53 @@ public class NodesListManager extends AbstractService{
|
|||
!excludeList.contains(hostName));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides the currently unusable nodes. Copies it into provided collection.
|
||||
* @param unUsableNodes
|
||||
* Collection to which the unusable nodes are added
|
||||
* @return number of unusable nodes added
|
||||
*/
|
||||
public int getUnusableNodes(Collection<RMNode> unUsableNodes) {
|
||||
unUsableNodes.addAll(unusableRMNodesConcurrentSet);
|
||||
return unusableRMNodesConcurrentSet.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(NodesListManagerEvent event) {
|
||||
RMNode eventNode = event.getNode();
|
||||
switch (event.getType()) {
|
||||
case NODE_UNUSABLE:
|
||||
LOG.debug(eventNode + " reported unusable");
|
||||
unusableRMNodesConcurrentSet.add(eventNode);
|
||||
for(RMApp app: rmContext.getRMApps().values()) {
|
||||
this.rmContext
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
|
||||
RMAppNodeUpdateType.NODE_UNUSABLE));
|
||||
}
|
||||
break;
|
||||
case NODE_USABLE:
|
||||
if (unusableRMNodesConcurrentSet.contains(eventNode)) {
|
||||
LOG.debug(eventNode + " reported usable");
|
||||
unusableRMNodesConcurrentSet.remove(eventNode);
|
||||
for (RMApp app : rmContext.getRMApps().values()) {
|
||||
this.rmContext
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
|
||||
RMAppNodeUpdateType.NODE_USABLE));
|
||||
}
|
||||
} else {
|
||||
LOG.warn(eventNode
|
||||
+ " reported usable without first reporting unusable");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.error("Ignoring invalid eventtype " + event.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
public class NodesListManagerEvent extends
|
||||
AbstractEvent<NodesListManagerEventType> {
|
||||
private final RMNode node;
|
||||
|
||||
public NodesListManagerEvent(NodesListManagerEventType type, RMNode node) {
|
||||
super(type);
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
public RMNode getNode() {
|
||||
return node;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
/**
 * Types of {@code NodesListManagerEvent}.
 */
public enum NodesListManagerEventType {
  /** A node is reported as usable. */
  NODE_USABLE,

  /** A node is reported as unusable. */
  NODE_UNUSABLE
}
|
|
@ -121,7 +121,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
public ResourceManager(Store store) {
|
||||
super("ResourceManager");
|
||||
this.store = store;
|
||||
this.nodesListManager = new NodesListManager();
|
||||
}
|
||||
|
||||
public RMContext getRMContext() {
|
||||
|
@ -151,6 +150,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer);
|
||||
|
||||
// Register event handler for NodesListManager
|
||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||
this.rmDispatcher.register(NodesListManagerEventType.class,
|
||||
this.nodesListManager);
|
||||
addService(nodesListManager);
|
||||
|
||||
// Initialize the scheduler
|
||||
|
@ -170,7 +173,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
// Register event handler for RmNodes
|
||||
this.rmDispatcher.register(RMNodeEventType.class,
|
||||
new NodeEventDispatcher(this.rmContext));
|
||||
new NodeEventDispatcher(this.rmContext));
|
||||
|
||||
//TODO change this to be random
|
||||
this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
|
@ -422,7 +425,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void startWepApp() {
|
||||
Builder<ApplicationMasterService> builder =
|
||||
WebApps.$for("cluster", ApplicationMasterService.class, masterService, "ws").at(
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -27,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
/**
|
||||
* The read interface to an Application in the ResourceManager. Take a
|
||||
|
@ -108,6 +111,16 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
|||
* @return the {@link ApplicationReport} detailing the status of the application.
|
||||
*/
|
||||
ApplicationReport createAndGetApplicationReport(boolean allowAccess);
|
||||
|
||||
/**
|
||||
* Retrieves all {@link RMNode}s for which updates have been received by
|
||||
* this {@link RMApp}. An update can be a node becoming lost, becoming
|
||||
* healthy, etc. The method clears the information from the {@link RMApp},
|
||||
* so each call to this method returns the delta since the previous call.
|
||||
* @param updatedNodes Collection into which the updates are transferred
|
||||
* @return the number of nodes added to the {@link Collection}
|
||||
*/
|
||||
int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
|
||||
|
||||
/**
|
||||
* Application level metadata is stored in {@link ApplicationStore} which
|
||||
|
|
|
@ -29,5 +29,6 @@ public enum RMAppEventType {
|
|||
ATTEMPT_REGISTERED,
|
||||
ATTEMPT_FINISHED, // Will send the final state
|
||||
ATTEMPT_FAILED,
|
||||
ATTEMPT_KILLED
|
||||
ATTEMPT_KILLED,
|
||||
NODE_UPDATE
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -47,12 +48,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
|
@ -87,6 +91,7 @@ public class RMAppImpl implements RMApp {
|
|||
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
|
||||
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
|
||||
private final long submitTime;
|
||||
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
|
||||
|
||||
// Mutable fields
|
||||
private long startTime;
|
||||
|
@ -107,6 +112,8 @@ public class RMAppImpl implements RMApp {
|
|||
|
||||
|
||||
// Transitions from NEW state
|
||||
.addTransition(RMAppState.NEW, RMAppState.NEW,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
|
||||
RMAppEventType.START, new StartAppAttemptTransition())
|
||||
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
|
||||
|
@ -115,6 +122,8 @@ public class RMAppImpl implements RMApp {
|
|||
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
|
||||
|
||||
// Transitions from SUBMITTED state
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
|
||||
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
|
||||
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
|
||||
|
@ -123,6 +132,8 @@ public class RMAppImpl implements RMApp {
|
|||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
|
||||
// Transitions from ACCEPTED state
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
||||
RMAppEventType.ATTEMPT_REGISTERED)
|
||||
.addTransition(RMAppState.ACCEPTED,
|
||||
|
@ -133,6 +144,8 @@ public class RMAppImpl implements RMApp {
|
|||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
|
||||
// Transitions from RUNNING state
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
||||
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
||||
.addTransition(RMAppState.RUNNING,
|
||||
|
@ -145,10 +158,16 @@ public class RMAppImpl implements RMApp {
|
|||
// Transitions from FINISHED state
|
||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||
RMAppEventType.KILL)
|
||||
// ignorable transitions
|
||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||
RMAppEventType.NODE_UPDATE)
|
||||
|
||||
// Transitions from FAILED state
|
||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||
RMAppEventType.KILL)
|
||||
// ignorable transitions
|
||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||
RMAppEventType.NODE_UPDATE)
|
||||
|
||||
// Transitions from KILLED state
|
||||
.addTransition(
|
||||
|
@ -158,6 +177,9 @@ public class RMAppImpl implements RMApp {
|
|||
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
||||
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
||||
RMAppEventType.ATTEMPT_KILLED))
|
||||
// ignorable transitions
|
||||
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
|
||||
RMAppEventType.NODE_UPDATE)
|
||||
|
||||
.installTopology();
|
||||
|
||||
|
@ -330,6 +352,18 @@ public class RMAppImpl implements RMApp {
|
|||
throw new YarnException("Unknown state passed!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
int updatedNodeCount = this.updatedNodes.size();
|
||||
updatedNodes.addAll(this.updatedNodes);
|
||||
this.updatedNodes.clear();
|
||||
return updatedNodeCount;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
|
||||
|
@ -462,6 +496,13 @@ public class RMAppImpl implements RMApp {
|
|||
handler.handle(
|
||||
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
|
||||
}
|
||||
|
||||
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
|
||||
RMNodeState nodeState = node.getState();
|
||||
updatedNodes.add(node);
|
||||
LOG.debug("Received node update event:" + type + " for node:" + node
|
||||
+ " with state:" + nodeState);
|
||||
}
|
||||
|
||||
private static class RMAppTransition implements
|
||||
SingleArcTransition<RMAppImpl, RMAppEvent> {
|
||||
|
@ -470,6 +511,14 @@ public class RMAppImpl implements RMApp {
|
|||
|
||||
}
|
||||
|
||||
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
|
||||
app.processNodeUpdate(nodeUpdateEvent.getUpdateType(),
|
||||
nodeUpdateEvent.getNode());
|
||||
};
|
||||
}
|
||||
|
||||
private static final class StartAppAttemptTransition extends RMAppTransition {
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
app.createNewAttempt();
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
public class RMAppNodeUpdateEvent extends RMAppEvent {
|
||||
|
||||
public enum RMAppNodeUpdateType {
|
||||
NODE_USABLE,
|
||||
NODE_UNUSABLE
|
||||
}
|
||||
|
||||
private final RMNode node;
|
||||
private final RMAppNodeUpdateType updateType;
|
||||
|
||||
public RMAppNodeUpdateEvent(ApplicationId appId, RMNode node,
|
||||
RMAppNodeUpdateType updateType) {
|
||||
super(appId, RMAppEventType.NODE_UPDATE);
|
||||
this.node = node;
|
||||
this.updateType = updateType;
|
||||
}
|
||||
|
||||
public RMNode getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public RMAppNodeUpdateType getUpdateType() {
|
||||
return updateType;
|
||||
}
|
||||
|
||||
}
|
|
@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
@ -140,6 +142,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
|
||||
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
|
||||
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
|
||||
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
|
||||
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
|
||||
.addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY,
|
||||
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
|
||||
|
||||
// create the topology tables
|
||||
.installTopology();
|
||||
|
@ -444,6 +450,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
// Inform the scheduler
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeRemovedSchedulerEvent(rmNode));
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
|
||||
|
||||
// Deactivate the node
|
||||
rmNode.context.getRMNodes().remove(rmNode.nodeId);
|
||||
|
@ -473,6 +482,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
// Inform the scheduler
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeRemovedSchedulerEvent(rmNode));
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
|
||||
// Update metrics
|
||||
rmNode.updateMetricsForDeactivatedNode(RMNodeState.UNHEALTHY);
|
||||
return RMNodeState.UNHEALTHY;
|
||||
|
@ -547,6 +559,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(rmNode));
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||
// ??? Should metrics be updated before notifying, so that listeners
|
||||
// see up-to-date metadata? They will very likely query it
|
||||
// upon notification.
|
||||
// Update metrics
|
||||
rmNode.updateMetricsForRejoinedNode(RMNodeState.UNHEALTHY);
|
||||
return RMNodeState.RUNNING;
|
||||
|
|
|
@ -18,6 +18,28 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
|
||||
//TODO yarn.api.records.NodeState is a clone of RMNodeState made for MR-3533.
|
||||
// In a subsequent patch RMNodeState should be replaced with NodeState
|
||||
public enum RMNodeState {
|
||||
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED
|
||||
}
|
||||
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST, REBOOTED;
|
||||
|
||||
public static NodeState toNodeState(RMNodeState state) {
|
||||
switch(state) {
|
||||
case NEW:
|
||||
return NodeState.NEW;
|
||||
case RUNNING:
|
||||
return NodeState.RUNNING;
|
||||
case UNHEALTHY:
|
||||
return NodeState.UNHEALTHY;
|
||||
case DECOMMISSIONED:
|
||||
return NodeState.DECOMMISSIONED;
|
||||
case LOST:
|
||||
return NodeState.LOST;
|
||||
case REBOOTED:
|
||||
return NodeState.REBOOTED;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -219,6 +221,10 @@ public abstract class MockAsm extends MockApps {
|
|||
public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
// Stub: this mock application does not implement node-update retrieval and
// always throws UnsupportedOperationException.
@Override
|
||||
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
||||
public static RMApp newApplication(int i) {
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import java.util.List;
|
||||
|
||||
public class TestAMRMRPCNodeUpdates {
|
||||
private MockRM rm;
|
||||
ApplicationMasterService amService = null;
|
||||
DrainDispatcher dispatcher = null;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
dispatcher = new DrainDispatcher();
|
||||
this.rm = new MockRM() {
|
||||
@Override
|
||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||
return new SchedulerEventDispatcher(this.scheduler) {
|
||||
@Override
|
||||
public void handle(SchedulerEvent event) {
|
||||
scheduler.handle(event);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
amService = rm.getApplicationMasterService();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (rm != null) {
|
||||
this.rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
|
||||
nm.nodeHeartbeat(health);
|
||||
dispatcher.await();
|
||||
}
|
||||
|
||||
private void syncNodeLost(MockNM nm) throws Exception {
|
||||
rm.sendNodeStarted(nm);
|
||||
rm.NMwaitForState(nm.getNodeId(), RMNodeState.RUNNING);
|
||||
rm.sendNodeLost(nm);
|
||||
dispatcher.await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMRMUnusableNodes() throws Exception {
|
||||
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 5000);
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 5000);
|
||||
MockNM nm3 = rm.registerNode("h3:1234", 5000);
|
||||
MockNM nm4 = rm.registerNode("h4:1234", 5000);
|
||||
|
||||
RMApp app1 = rm.submitApp(2000);
|
||||
|
||||
// Trigger the scheduling so the AM gets 'launched' on nm1
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
|
||||
// register AM returns no unusable node
|
||||
am1.registerAppAttempt();
|
||||
|
||||
// allocate request returns no updated node
|
||||
AllocateRequest allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
AMResponse response1 = amService.allocate(allocateRequest1).getAMResponse();
|
||||
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
||||
syncNodeHeartbeat(nm4, false);
|
||||
|
||||
// allocate request returns updated node
|
||||
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
response1 = amService.allocate(allocateRequest1).getAMResponse();
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
NodeReport nr = updatedNodes.iterator().next();
|
||||
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
||||
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
||||
|
||||
// resending the allocate request returns the same result
|
||||
response1 = amService.allocate(allocateRequest1).getAMResponse();
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
nr = updatedNodes.iterator().next();
|
||||
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
||||
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
||||
|
||||
syncNodeLost(nm3);
|
||||
|
||||
// subsequent allocate request returns delta
|
||||
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
response1 = amService.allocate(allocateRequest1).getAMResponse();
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
nr = updatedNodes.iterator().next();
|
||||
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
|
||||
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
|
||||
|
||||
// registering another AM gives it the complete failed list
|
||||
RMApp app2 = rm.submitApp(2000);
|
||||
// Trigger nm2 heartbeat so that AM gets launched on it
|
||||
nm2.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
||||
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
||||
|
||||
// register AM returns all unusable nodes
|
||||
am2.registerAppAttempt();
|
||||
|
||||
// allocate request returns no updated node
|
||||
AllocateRequest allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
|
||||
.getAppAttemptId(), 0, 0F, null, null);
|
||||
AMResponse response2 = amService.allocate(allocateRequest2).getAMResponse();
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
||||
syncNodeHeartbeat(nm4, true);
|
||||
|
||||
// both AM's should get delta updated nodes
|
||||
allocateRequest1 = BuilderUtils.newAllocateRequest(attempt1
|
||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null);
|
||||
response1 = amService.allocate(allocateRequest1).getAMResponse();
|
||||
updatedNodes = response1.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
nr = updatedNodes.iterator().next();
|
||||
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
||||
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
||||
|
||||
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
|
||||
response2 = amService.allocate(allocateRequest2).getAMResponse();
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(1, updatedNodes.size());
|
||||
nr = updatedNodes.iterator().next();
|
||||
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
||||
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
||||
|
||||
// subsequent allocate calls should return no updated nodes
|
||||
allocateRequest2 = BuilderUtils.newAllocateRequest(attempt2
|
||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null);
|
||||
response2 = amService.allocate(allocateRequest2).getAMResponse();
|
||||
updatedNodes = response2.getUpdatedNodes();
|
||||
Assert.assertEquals(0, updatedNodes.size());
|
||||
|
||||
// how to do the above for LOST node
|
||||
|
||||
}
|
||||
}
|
|
@ -82,7 +82,7 @@ public class TestNMExpiry {
|
|||
dispatcher);
|
||||
nmLivelinessMonitor.init(conf);
|
||||
nmLivelinessMonitor.start();
|
||||
NodesListManager nodesListManager = new NodesListManager();
|
||||
NodesListManager nodesListManager = new NodesListManager(context);
|
||||
nodesListManager.init(conf);
|
||||
resourceTrackerService = new ResourceTrackerService(context,
|
||||
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);
|
||||
|
|
|
@ -70,7 +70,7 @@ public class TestRMNMRPCResponseId {
|
|||
new RMContextImpl(new MemStore(), dispatcher, null, null, null);
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
new ResourceManager.NodeEventDispatcher(context));
|
||||
NodesListManager nodesListManager = new NodesListManager();
|
||||
NodesListManager nodesListManager = new NodesListManager(context);
|
||||
Configuration conf = new Configuration();
|
||||
nodesListManager.init(conf);
|
||||
resourceTrackerService = new ResourceTrackerService(context,
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -25,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
||||
public class MockRMApp implements RMApp {
|
||||
static final int DT = 1000000; // ms
|
||||
|
@ -176,6 +179,11 @@ public class MockRMApp implements RMApp {
|
|||
@Override
|
||||
public FinalApplicationStatus getFinalApplicationStatus() {
|
||||
return FinalApplicationStatus.UNDEFINED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue