YARN-791. Changed RM APIs and web-services related to nodes to ensure that both are consistent with each other. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1500994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-07-08 22:30:52 +00:00
parent e4e0499fc9
commit 5e4f6ad1d9
17 changed files with 282 additions and 66 deletions

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@ -121,7 +122,8 @@ public class ResourceMgrDelegate extends YarnClient {
public TaskTrackerInfo[] getActiveTrackers() throws IOException, public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException { InterruptedException {
try { try {
return TypeConverter.fromYarnNodes(client.getNodeReports()); return TypeConverter.fromYarnNodes(
client.getNodeReports(NodeState.RUNNING));
} catch (YarnException e) { } catch (YarnException e) {
throw new IOException(e); throw new IOException(e);
} }
@ -309,8 +311,9 @@ public class ResourceMgrDelegate extends YarnClient {
} }
@Override @Override
public List<NodeReport> getNodeReports() throws YarnException, IOException { public List<NodeReport> getNodeReports(NodeState... states)
return client.getNodeReports(); throws YarnException, IOException {
return client.getNodeReports(states);
} }
@Override @Override

View File

@ -230,6 +230,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-869. Move ResourceManagerAdministrationProtocol out of main YARN api. YARN-869. Move ResourceManagerAdministrationProtocol out of main YARN api.
(vinodkv via acmurthy) (vinodkv via acmurthy)
YARN-791. Changed RM APIs and web-services related to nodes to ensure that
both are consistent with each other. (Sandy Ryza via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -18,22 +18,34 @@
package org.apache.hadoop.yarn.api.protocolrecords; package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
* <p>The request from clients to get a report of all nodes * <p>The request from clients to get a report of all nodes
* in the cluster from the <code>ResourceManager</code>.</p> * in the cluster from the <code>ResourceManager</code>.</p>
* *
* <p>Currently, this is empty.</p> * The request will ask for all nodes in the given {@link NodeState}s.
* *
* @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest) * @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest)
*/ */
@Public @Public
@Stable @Stable
public abstract class GetClusterNodesRequest { public abstract class GetClusterNodesRequest {
@Public
@Stable
public static GetClusterNodesRequest newInstance(EnumSet<NodeState> states) {
GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class);
request.setNodeStates(states);
return request;
}
@Public @Public
@Stable @Stable
public static GetClusterNodesRequest newInstance() { public static GetClusterNodesRequest newInstance() {
@ -41,4 +53,14 @@ public abstract class GetClusterNodesRequest {
Records.newRecord(GetClusterNodesRequest.class); Records.newRecord(GetClusterNodesRequest.class);
return request; return request;
} }
/**
* The state to filter the cluster nodes with.
*/
public abstract EnumSet<NodeState> getNodeStates();
/**
* The state to filter the cluster nodes with.
*/
public abstract void setNodeStates(EnumSet<NodeState> states);
} }

View File

@ -131,6 +131,7 @@ message GetAllApplicationsResponseProto {
} }
message GetClusterNodesRequestProto { message GetClusterNodesRequestProto {
repeated NodeStateProto nodeStates = 1;
} }
message GetClusterNodesResponseProto { message GetClusterNodesResponseProto {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -324,7 +326,8 @@ public class Client {
LOG.info("Got Cluster metric info from ASM" LOG.info("Got Cluster metric info from ASM"
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(); List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
NodeState.RUNNING);
LOG.info("Got Cluster node info from ASM"); LOG.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) { for (NodeReport node : clusterNodeReports) {
LOG.info("Got node report from ASM for" LOG.info("Got node report from ASM for"

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
@ -190,14 +191,17 @@ public abstract class YarnClient extends AbstractService {
/** /**
* <p> * <p>
* Get a report of all nodes ({@link NodeReport}) in the cluster. * Get a report of nodes ({@link NodeReport}) in the cluster.
* </p> * </p>
* *
* @return A list of report of all nodes * @param states The {@link NodeState}s to filter on. If no filter states are
* given, nodes in all states will be returned.
* @return A list of node reports
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
public abstract List<NodeReport> getNodeReports() throws YarnException, IOException; public abstract List<NodeReport> getNodeReports(NodeState... states)
throws YarnException, IOException;
/** /**
* <p> * <p>

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
@ -222,10 +224,15 @@ public class YarnClientImpl extends YarnClient {
} }
@Override @Override
public List<NodeReport> getNodeReports() throws YarnException, public List<NodeReport> getNodeReports(NodeState... states) throws YarnException,
IOException { IOException {
GetClusterNodesRequest request = EnumSet<NodeState> statesSet = (states.length == 0) ?
Records.newRecord(GetClusterNodesRequest.class); EnumSet.allOf(NodeState.class) : EnumSet.noneOf(NodeState.class);
for (NodeState state : states) {
statesSet.add(state);
}
GetClusterNodesRequest request = GetClusterNodesRequest
.newInstance(statesSet);
GetClusterNodesResponse response = rmClient.getClusterNodes(request); GetClusterNodesResponse response = rmClient.getClusterNodes(request);
return response.getNodeReports(); return response.getNodeReports();
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -56,7 +57,7 @@ public class NodeCLI extends YarnCLI {
Options opts = new Options(); Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status report of the node."); opts.addOption(STATUS_CMD, true, "Prints the status report of the node.");
opts.addOption(LIST_CMD, false, "Lists all the nodes."); opts.addOption(LIST_CMD, false, "Lists all the nodes in the RUNNING state.");
CommandLine cliParser = new GnuParser().parse(opts, args); CommandLine cliParser = new GnuParser().parse(opts, args);
int exitCode = -1; int exitCode = -1;
@ -92,7 +93,7 @@ public class NodeCLI extends YarnCLI {
*/ */
private void listClusterNodes() throws YarnException, IOException { private void listClusterNodes() throws YarnException, IOException {
PrintWriter writer = new PrintWriter(sysout); PrintWriter writer = new PrintWriter(sysout);
List<NodeReport> nodesReport = client.getNodeReports(); List<NodeReport> nodesReport = client.getNodeReports(NodeState.RUNNING);
writer.println("Total Nodes:" + nodesReport.size()); writer.println("Total Nodes:" + nodesReport.size());
writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address", writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address",
"Running-Containers"); "Running-Containers");

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -101,7 +102,7 @@ public class TestAMRMClient {
yarnClient.start(); yarnClient.start();
// get node info // get node info
nodeReports = yarnClient.getNodeReports(); nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
priority = Priority.newInstance(1); priority = Priority.newInstance(1);
capability = Resource.newInstance(1024, 1); capability = Resource.newInstance(1024, 1);

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -95,7 +96,7 @@ public class TestNMClient {
assertEquals(STATE.STARTED, yarnClient.getServiceState()); assertEquals(STATE.STARTED, yarnClient.getServiceState());
// get node info // get node info
nodeReports = yarnClient.getNodeReports(); nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
// submit new app // submit new app
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =

View File

@ -152,12 +152,13 @@ public class TestYarnCLI {
@Test @Test
public void testListClusterNodes() throws Exception { public void testListClusterNodes() throws Exception {
NodeCLI cli = new NodeCLI(); NodeCLI cli = new NodeCLI();
when(client.getNodeReports()).thenReturn(getNodeReports(3)); when(client.getNodeReports(NodeState.RUNNING)).thenReturn(
getNodeReports(3));
cli.setClient(client); cli.setClient(client);
cli.setSysOutPrintStream(sysOut); cli.setSysOutPrintStream(sysOut);
int result = cli.run(new String[] { "-list" }); int result = cli.run(new String[] { "-list" });
assertEquals(0, result); assertEquals(0, result);
verify(client).getNodeReports(); verify(client).getNodeReports(NodeState.RUNNING);
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos); PrintWriter pw = new PrintWriter(baos);
pw.println("Total Nodes:3"); pw.println("Total Nodes:3");

View File

@ -20,8 +20,15 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
@Private @Private
@Unstable @Unstable
@ -31,6 +38,8 @@ public class GetClusterNodesRequestPBImpl extends GetClusterNodesRequest {
GetClusterNodesRequestProto.Builder builder = null; GetClusterNodesRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private EnumSet<NodeState> states = null;
public GetClusterNodesRequestPBImpl() { public GetClusterNodesRequestPBImpl() {
builder = GetClusterNodesRequestProto.newBuilder(); builder = GetClusterNodesRequestProto.newBuilder();
} }
@ -41,11 +50,91 @@ public class GetClusterNodesRequestPBImpl extends GetClusterNodesRequest {
} }
public GetClusterNodesRequestProto getProto() { public GetClusterNodesRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
} }
@Override
public EnumSet<NodeState> getNodeStates() {
initNodeStates();
return this.states;
}
@Override
public void setNodeStates(final EnumSet<NodeState> states) {
initNodeStates();
this.states.clear();
if (states == null) {
return;
}
this.states.addAll(states);
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetClusterNodesRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.states != null) {
maybeInitBuilder();
builder.clearNodeStates();
Iterable<NodeStateProto> iterable = new Iterable<NodeStateProto>() {
@Override
public Iterator<NodeStateProto> iterator() {
return new Iterator<NodeStateProto>() {
Iterator<NodeState> iter = states.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public NodeStateProto next() {
return ProtoUtils.convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllNodeStates(iterable);
}
}
private void initNodeStates() {
if (this.states != null) {
return;
}
GetClusterNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
List<NodeStateProto> list = p.getNodeStatesList();
this.states = EnumSet.noneOf(NodeState.class);
for (NodeStateProto c : list) {
this.states.add(ProtoUtils.convertFromProtoFormat(c));
}
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -419,7 +420,13 @@ public class ClientRMService extends AbstractService implements
throws YarnException { throws YarnException {
GetClusterNodesResponse response = GetClusterNodesResponse response =
recordFactory.newRecordInstance(GetClusterNodesResponse.class); recordFactory.newRecordInstance(GetClusterNodesResponse.class);
Collection<RMNode> nodes = this.rmContext.getRMNodes().values(); EnumSet<NodeState> nodeStates = request.getNodeStates();
if (nodeStates == null || nodeStates.isEmpty()) {
nodeStates = EnumSet.allOf(NodeState.class);
}
Collection<RMNode> nodes = RMServerUtils.queryRMNodes(rmContext,
nodeStates);
List<NodeReport> nodeReports = new ArrayList<NodeReport>(nodes.size()); List<NodeReport> nodeReports = new ArrayList<NodeReport>(nodes.size());
for (RMNode nodeInfo : nodes) { for (RMNode nodeInfo : nodes) {
nodeReports.add(createNodeReports(nodeInfo)); nodeReports.add(createNodeReports(nodeInfo));

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
/**
* Utility methods to aid serving RM data through the REST and RPC APIs
*/
public class RMServerUtils {
public static List<RMNode> queryRMNodes(RMContext context,
EnumSet<NodeState> acceptedStates) {
// nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) ||
acceptedStates.contains(NodeState.UNHEALTHY)) {
for (RMNode rmNode : context.getRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
}
// inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
if (acceptedStates.contains(NodeState.DECOMMISSIONED) ||
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
}
return results;
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp; package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -149,57 +152,42 @@ public class RMWebServices {
} }
/** /**
* If no params are given, returns all active nodes, which includes * Returns all nodes in the cluster. If the states param is given, returns
* nodes in the NEW and RUNNING states. If state param is "all", returns all * all nodes that are in the comma-separated list of states.
* nodes in all states. Otherwise, if the state param is set to a state name,
* returns all nodes that are in that state.
*/ */
@GET @GET
@Path("/nodes") @Path("/nodes")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public NodesInfo getNodes(@QueryParam("state") String state) { public NodesInfo getNodes(@QueryParam("states") String states) {
init(); init();
ResourceScheduler sched = this.rm.getResourceScheduler(); ResourceScheduler sched = this.rm.getResourceScheduler();
if (sched == null) { if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance"); throw new NotFoundException("Null ResourceScheduler instance");
} }
NodeState acceptedState = null; EnumSet<NodeState> acceptedStates;
boolean all = false; if (states == null) {
acceptedStates = EnumSet.allOf(NodeState.class);
if (state != null && !state.isEmpty()) {
if (state.equalsIgnoreCase("all")) {
all = true;
} else { } else {
acceptedState = NodeState.valueOf(state.toUpperCase()); acceptedStates = EnumSet.noneOf(NodeState.class);
for (String stateStr : states.split(",")) {
acceptedStates.add(NodeState.valueOf(stateStr.toUpperCase()));
} }
} }
// getRMNodes() contains nodes that are NEW, RUNNING OR UNHEALTHY Collection<RMNode> rmNodes = RMServerUtils.queryRMNodes(this.rm.getRMContext(),
NodesInfo allNodes = new NodesInfo(); acceptedStates);
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) { NodesInfo nodesInfo = new NodesInfo();
if (all || (acceptedState == null && ni.getState() != NodeState.UNHEALTHY) for (RMNode rmNode : rmNodes) {
|| acceptedState == ni.getState()) { NodeInfo nodeInfo = new NodeInfo(rmNode, sched);
NodeInfo nodeInfo = new NodeInfo(ni, sched); if (EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED)
allNodes.add(nodeInfo); .contains(rmNode.getState())) {
}
}
// getInactiveNodes() contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
if (all || (acceptedState != null &&
(acceptedState == NodeState.DECOMMISSIONED ||
acceptedState == NodeState.LOST ||
acceptedState == NodeState.REBOOTED))) {
for (RMNode ni : this.rm.getRMContext().getInactiveRMNodes().values()) {
if (all || acceptedState == ni.getState()) {
NodeInfo nodeInfo = new NodeInfo(ni, sched);
nodeInfo.setNodeHTTPAddress(EMPTY); nodeInfo.setNodeHTTPAddress(EMPTY);
allNodes.add(nodeInfo);
}
} }
nodesInfo.add(nodeInfo);
} }
return allNodes; return nodesInfo;
} }
@GET @GET

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -116,9 +117,17 @@ public class TestClientRMService {
rm.start(); rm.start();
// Add a healthy node // Add a healthy node
MockNM node = rm.registerNode("host:1234", 1024); MockNM node = rm.registerNode("host1:1234", 1024);
rm.sendNodeStarted(node);
node.nodeHeartbeat(true); node.nodeHeartbeat(true);
// Add and lose a node
MockNM lostNode = rm.registerNode("host2:1235", 1024);
rm.sendNodeStarted(lostNode);
lostNode.nodeHeartbeat(true);
rm.NMwaitForState(lostNode.getNodeId(), NodeState.RUNNING);
rm.sendNodeLost(lostNode);
// Create a client. // Create a client.
Configuration conf = new Configuration(); Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
@ -130,7 +139,7 @@ public class TestClientRMService {
// Make call // Make call
GetClusterNodesRequest request = GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class); GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING));
List<NodeReport> nodeReports = List<NodeReport> nodeReports =
client.getClusterNodes(request).getNodeReports(); client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size()); Assert.assertEquals(1, nodeReports.size());
@ -142,9 +151,21 @@ public class TestClientRMService {
// Call again // Call again
nodeReports = client.getClusterNodes(request).getNodeReports(); nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals("Unhealthy nodes should not show up by default", 0,
nodeReports.size());
// Now query for UNHEALTHY nodes
request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size()); Assert.assertEquals(1, nodeReports.size());
Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY,
nodeReports.get(0).getNodeState()); nodeReports.get(0).getNodeState());
// Query all states should return all nodes
rm.registerNode("host3:1236", 1024);
request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(3, nodeReports.size());
} }
@Test @Test

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.StringReader; import java.io.StringReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
@ -55,6 +56,7 @@ import org.w3c.dom.Element;
import org.w3c.dom.NodeList; import org.w3c.dom.NodeList;
import org.xml.sax.InputSource; import org.xml.sax.InputSource;
import com.google.common.base.Joiner;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener; import com.google.inject.servlet.GuiceServletContextListener;
@ -136,8 +138,6 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
// One unhealthy node which should not appear in the list after
// MAPREDUCE-3760.
MockNM nm3 = rm.registerNode("h3:1236", 5122); MockNM nm3 = rm.registerNode("h3:1236", 5122);
rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW); rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW);
rm.sendNodeStarted(nm3); rm.sendNodeStarted(nm3);
@ -160,8 +160,8 @@ public class TestRMWebServicesNodes extends JerseyTest {
JSONObject nodes = json.getJSONObject("nodes"); JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length()); assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node"); JSONArray nodeArray = nodes.getJSONArray("node");
// Just 2 nodes, leaving behind the unhealthy node. // 3 nodes, including the unhealthy node and the new node.
assertEquals("incorrect number of elements", 2, nodeArray.length()); assertEquals("incorrect number of elements", 3, nodeArray.length());
} }
@Test @Test
@ -174,7 +174,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", NodeState.NEW.toString()) .path("nodes").queryParam("states", NodeState.NEW.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -197,7 +197,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes") .path("nodes")
.queryParam("state", NodeState.DECOMMISSIONED.toString()) .queryParam("states", NodeState.DECOMMISSIONED.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class); JSONObject json = response.getEntity(JSONObject.class);
@ -213,7 +213,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
try { try {
r.path("ws").path("v1").path("cluster").path("nodes") r.path("ws").path("v1").path("cluster").path("nodes")
.queryParam("state", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON) .queryParam("states", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON)
.get(JSONObject.class); .get(JSONObject.class);
fail("should have thrown exception querying invalid state"); fail("should have thrown exception querying invalid state");
@ -257,7 +257,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.sendNodeLost(nm2); rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", NodeState.LOST.toString()) .path("nodes").queryParam("states", NodeState.LOST.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -316,7 +316,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "running") .path("nodes").queryParam("states", "running")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class); JSONObject json = response.getEntity(JSONObject.class);
@ -336,7 +336,7 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "UNHEALTHY") .path("nodes").queryParam("states", "UNHEALTHY")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class); JSONObject json = response.getEntity(JSONObject.class);
@ -349,6 +349,11 @@ public class TestRMWebServicesNodes extends JerseyTest {
WebResource r = resource(); WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120); MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1235", 5121); MockNM nm2 = rm.registerNode("h2:1235", 5121);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path(path).accept(media).get(ClientResponse.class); .path(path).accept(media).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -622,7 +627,8 @@ public class TestRMWebServicesNodes extends JerseyTest {
rm.sendNodeLost(nm3); rm.sendNodeLost(nm3);
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "aLl") .path("nodes")
.queryParam("states", Joiner.on(',').join(EnumSet.allOf(NodeState.class)))
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());