diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index f4c89f68be0..c687de665f1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; @@ -121,7 +122,8 @@ protected void serviceStop() throws Exception { public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { try { - return TypeConverter.fromYarnNodes(client.getNodeReports()); + return TypeConverter.fromYarnNodes( + client.getNodeReports(NodeState.RUNNING)); } catch (YarnException e) { throw new IOException(e); } @@ -309,8 +311,9 @@ public YarnClusterMetrics getYarnClusterMetrics() throws YarnException, } @Override - public List getNodeReports() throws YarnException, IOException { - return client.getNodeReports(); + public List getNodeReports(NodeState... states) + throws YarnException, IOException { + return client.getNodeReports(states); } @Override diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 06019087806..2968a87c505 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -213,6 +213,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-869. Move ResourceManagerAdministrationProtocol out of main YARN api. (vinodkv via acmurthy) + YARN-791. Changed RM APIs and web-services related to nodes to ensure that + both are consistent with each other. (Sandy Ryza via vinodkv) + NEW FEATURES YARN-482. FS: Extend SchedulingMode to intermediate queues. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterNodesRequest.java index 5904554907e..bf642cc129e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterNodesRequest.java @@ -18,22 +18,34 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.EnumSet; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.util.Records; /** *

The request from clients to get a report of all nodes
 * in the cluster from the ResourceManager.
 *
- * Currently, this is empty.
+ * The request will ask for all nodes in the given {@link NodeState}s. * * @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest) */ @Public @Stable public abstract class GetClusterNodesRequest { + @Public + @Stable + public static GetClusterNodesRequest newInstance(EnumSet states) { + GetClusterNodesRequest request = + Records.newRecord(GetClusterNodesRequest.class); + request.setNodeStates(states); + return request; + } + @Public @Stable public static GetClusterNodesRequest newInstance() { @@ -41,4 +53,14 @@ public static GetClusterNodesRequest newInstance() { Records.newRecord(GetClusterNodesRequest.class); return request; } + + /** + * The state to filter the cluster nodes with. + */ + public abstract EnumSet getNodeStates(); + + /** + * The state to filter the cluster nodes with. + */ + public abstract void setNodeStates(EnumSet states); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index b417245dedf..c82f5cc62e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -131,6 +131,7 @@ message GetAllApplicationsResponseProto { } message GetClusterNodesRequestProto { + repeated NodeStateProto nodeStates = 1; } message GetClusterNodesResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 43507256e36..e49c09aa365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -324,7 +326,8 @@ public boolean run() throws IOException, YarnException { LOG.info("Got Cluster metric info from ASM" + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); - List clusterNodeReports = yarnClient.getNodeReports(); + List clusterNodeReports = yarnClient.getNodeReports( + NodeState.RUNNING); LOG.info("Got Cluster node info from ASM"); for (NodeReport node : clusterNodeReports) { LOG.info("Got node report from ASM for" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java 
index 869a52bccc5..f55c5f36915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Token; @@ -190,14 +191,17 @@ public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException, /** *

- * Get a report of all nodes ({@link NodeReport}) in the cluster. + * Get a report of nodes ({@link NodeReport}) in the cluster. *
* - * @return A list of report of all nodes + * @param states The {@link NodeState}s to filter on. If no filter states are + * given, nodes in all states will be returned. + * @return A list of node reports * @throws YarnException * @throws IOException */ - public abstract List<NodeReport> getNodeReports() throws YarnException, IOException; + public abstract List<NodeReport> getNodeReports(NodeState... states) + throws YarnException, IOException; /** *
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index cbe8120eac6..7a7affbeb34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import org.apache.commons.logging.Log; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Token; @@ -222,10 +224,15 @@ public YarnClusterMetrics getYarnClusterMetrics() throws YarnException, } @Override - public List getNodeReports() throws YarnException, + public List getNodeReports(NodeState... states) throws YarnException, IOException { - GetClusterNodesRequest request = - Records.newRecord(GetClusterNodesRequest.class); + EnumSet statesSet = (states.length == 0) ? + EnumSet.allOf(NodeState.class) : EnumSet.noneOf(NodeState.class); + for (NodeState state : states) { + statesSet.add(state); + } + GetClusterNodesRequest request = GetClusterNodesRequest + .newInstance(statesSet); GetClusterNodesResponse response = rmClient.getClusterNodes(request); return response.getNodeReports(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java index 1de45ed96ae..6ad27942c2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java @@ -33,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -56,7 +57,7 @@ public int run(String[] args) throws Exception { Options opts = new Options(); opts.addOption(STATUS_CMD, true, "Prints the status report of the node."); - opts.addOption(LIST_CMD, false, "Lists all the nodes."); + opts.addOption(LIST_CMD, false, "Lists all the nodes in the RUNNING state."); CommandLine cliParser = new GnuParser().parse(opts, args); int exitCode = -1; @@ -92,7 +93,7 @@ private void printUsage(Options opts) { */ private void listClusterNodes() throws YarnException, IOException { PrintWriter writer = new PrintWriter(sysout); - List nodesReport = client.getNodeReports(); + List nodesReport = client.getNodeReports(NodeState.RUNNING); writer.println("Total Nodes:" + nodesReport.size()); writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address", "Running-Containers"); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 4c034ae0218..34bc48b1362 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -101,7 +102,7 @@ public static void setup() throws Exception { yarnClient.start(); // get node info - nodeReports = yarnClient.getNodeReports(); + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); priority = Priority.newInstance(1); capability = Resource.newInstance(1024, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index dc6367b10b2..adc92aeb584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -95,7 +96,7 @@ public void setup() throws YarnException, IOException { assertEquals(STATE.STARTED, yarnClient.getServiceState()); // get node info - nodeReports = yarnClient.getNodeReports(); + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); // submit new app ApplicationSubmissionContext appContext = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 365bc8e0bb1..d1f52a4f897 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -152,12 +152,13 @@ public void testKillApplication() throws Exception { @Test public void testListClusterNodes() throws Exception { NodeCLI cli = new NodeCLI(); - when(client.getNodeReports()).thenReturn(getNodeReports(3)); + when(client.getNodeReports(NodeState.RUNNING)).thenReturn( + getNodeReports(3)); cli.setClient(client); cli.setSysOutPrintStream(sysOut); int result = cli.run(new String[] { "-list" }); assertEquals(0, result); - verify(client).getNodeReports(); + 
verify(client).getNodeReports(NodeState.RUNNING); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("Total Nodes:3"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesRequestPBImpl.java index 766db2240b4..4e51320bf17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterNodesRequestPBImpl.java @@ -20,8 +20,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProtoOrBuilder; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; @Private @Unstable @@ -31,6 +38,8 @@ public class GetClusterNodesRequestPBImpl extends GetClusterNodesRequest { GetClusterNodesRequestProto.Builder builder = null; boolean viaProto = false; + private EnumSet states = null; + public GetClusterNodesRequestPBImpl() { builder = GetClusterNodesRequestProto.newBuilder(); } @@ -41,11 +50,91 @@ public GetClusterNodesRequestPBImpl(GetClusterNodesRequestProto proto) { } public GetClusterNodesRequestProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } + + @Override + public EnumSet getNodeStates() { + initNodeStates(); + return this.states; + } + + @Override + public void setNodeStates(final EnumSet states) { + initNodeStates(); + this.states.clear(); + if (states == null) { + return; + } + this.states.addAll(states); + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetClusterNodesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.states != null) { + maybeInitBuilder(); + builder.clearNodeStates(); + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = states.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public NodeStateProto next() { + return ProtoUtils.convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllNodeStates(iterable); + } + } + + private void initNodeStates() { + if (this.states != null) { + return; + } + GetClusterNodesRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getNodeStatesList(); + this.states = EnumSet.noneOf(NodeState.class); + + for (NodeStateProto c : list) { + this.states.add(ProtoUtils.convertFromProtoFormat(c)); + } + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 512ba83c006..897d53459b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -419,7 +420,13 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnException { GetClusterNodesResponse response = recordFactory.newRecordInstance(GetClusterNodesResponse.class); - Collection nodes = this.rmContext.getRMNodes().values(); + EnumSet nodeStates = request.getNodeStates(); + if (nodeStates == null || nodeStates.isEmpty()) { + nodeStates = EnumSet.allOf(NodeState.class); + } + Collection nodes = RMServerUtils.queryRMNodes(rmContext, + nodeStates); + List nodeReports = new ArrayList(nodes.size()); for (RMNode nodeInfo : nodes) { nodeReports.add(createNodeReports(nodeInfo)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java new file mode 100644 index 00000000000..5618100b1aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +/** + * Utility methods to aid serving RM data through the REST and RPC APIs + */ +public class RMServerUtils { + public static List queryRMNodes(RMContext context, + EnumSet acceptedStates) { + // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY + ArrayList results = new ArrayList(); + if (acceptedStates.contains(NodeState.NEW) || + acceptedStates.contains(NodeState.RUNNING) || + acceptedStates.contains(NodeState.UNHEALTHY)) { + for (RMNode rmNode : context.getRMNodes().values()) { + if (acceptedStates.contains(rmNode.getState())) { + results.add(rmNode); + } + } + } + + // inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED + if (acceptedStates.contains(NodeState.DECOMMISSIONED) || + acceptedStates.contains(NodeState.LOST) || + acceptedStates.contains(NodeState.REBOOTED)) { + for (RMNode rmNode : context.getInactiveRMNodes().values()) { + if (acceptedStates.contains(rmNode.getState())) { + results.add(rmNode); + } + } + } + return results; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index abb3cfc9259..d17c8804356 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; import java.util.concurrent.ConcurrentMap; import javax.servlet.http.HttpServletRequest; @@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -149,57 +152,42 @@ public SchedulerTypeInfo getSchedulerInfo() { } /** - * If no params are given, returns all active nodes, which includes - * nodes in the NEW and RUNNING states. If state param is "all", returns all - * nodes in all states. Otherwise, if the state param is set to a state name, - * returns all nodes that are in that state. + * Returns all nodes in the cluster. If the states param is given, returns + * all nodes that are in the comma-separated list of states. 
*/ @GET @Path("/nodes") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) - public NodesInfo getNodes(@QueryParam("state") String state) { + public NodesInfo getNodes(@QueryParam("states") String states) { init(); ResourceScheduler sched = this.rm.getResourceScheduler(); if (sched == null) { throw new NotFoundException("Null ResourceScheduler instance"); } - NodeState acceptedState = null; - boolean all = false; - - if (state != null && !state.isEmpty()) { - if (state.equalsIgnoreCase("all")) { - all = true; - } else { - acceptedState = NodeState.valueOf(state.toUpperCase()); + EnumSet acceptedStates; + if (states == null) { + acceptedStates = EnumSet.allOf(NodeState.class); + } else { + acceptedStates = EnumSet.noneOf(NodeState.class); + for (String stateStr : states.split(",")) { + acceptedStates.add(NodeState.valueOf(stateStr.toUpperCase())); } } - // getRMNodes() contains nodes that are NEW, RUNNING OR UNHEALTHY - NodesInfo allNodes = new NodesInfo(); - for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) { - if (all || (acceptedState == null && ni.getState() != NodeState.UNHEALTHY) - || acceptedState == ni.getState()) { - NodeInfo nodeInfo = new NodeInfo(ni, sched); - allNodes.add(nodeInfo); + Collection rmNodes = RMServerUtils.queryRMNodes(this.rm.getRMContext(), + acceptedStates); + NodesInfo nodesInfo = new NodesInfo(); + for (RMNode rmNode : rmNodes) { + NodeInfo nodeInfo = new NodeInfo(rmNode, sched); + if (EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED) + .contains(rmNode.getState())) { + nodeInfo.setNodeHTTPAddress(EMPTY); } + nodesInfo.add(nodeInfo); } - // getInactiveNodes() contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED - if (all || (acceptedState != null && - (acceptedState == NodeState.DECOMMISSIONED || - acceptedState == NodeState.LOST || - acceptedState == NodeState.REBOOTED))) { - for (RMNode ni : this.rm.getRMContext().getInactiveRMNodes().values()) { - if (all || acceptedState == ni.getState()) { - NodeInfo nodeInfo = new NodeInfo(ni, sched); - nodeInfo.setNodeHTTPAddress(EMPTY); - allNodes.add(nodeInfo); - } - } - } - - return allNodes; + return nodesInfo; } @GET diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index b156d5fbebd..69107290669 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; @@ -116,8 +117,16 @@ protected ClientRMService createClientRMService() { rm.start(); // Add a healthy node - MockNM node = rm.registerNode("host:1234", 1024); + MockNM node = rm.registerNode("host1:1234", 1024); + rm.sendNodeStarted(node); node.nodeHeartbeat(true); + + // Add and lose a node + MockNM lostNode = 
rm.registerNode("host2:1235", 1024); + rm.sendNodeStarted(lostNode); + lostNode.nodeHeartbeat(true); + rm.NMwaitForState(lostNode.getNodeId(), NodeState.RUNNING); + rm.sendNodeLost(lostNode); // Create a client. Configuration conf = new Configuration(); @@ -130,7 +139,7 @@ protected ClientRMService createClientRMService() { // Make call GetClusterNodesRequest request = - Records.newRecord(GetClusterNodesRequest.class); + GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING)); List nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(1, nodeReports.size()); @@ -142,9 +151,21 @@ protected ClientRMService createClientRMService() { // Call again nodeReports = client.getClusterNodes(request).getNodeReports(); + Assert.assertEquals("Unhealthy nodes should not show up by default", 0, + nodeReports.size()); + + // Now query for UNHEALTHY nodes + request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); + nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(1, nodeReports.size()); Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); + + // Query all states should return all nodes + rm.registerNode("host3:1236", 1024); + request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); + nodeReports = client.getClusterNodes(request).getNodeReports(); + Assert.assertEquals(3, nodeReports.size()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index e3e896bd816..18bc91664d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -24,6 +24,7 @@ import java.io.StringReader; import java.util.ArrayList; +import java.util.EnumSet; import javax.ws.rs.core.MediaType; import javax.xml.parsers.DocumentBuilder; @@ -55,6 +56,7 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; +import com.google.common.base.Joiner; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceServletContextListener; @@ -136,8 +138,6 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); - // One unhealthy node which should not appear in the list after - // MAPREDUCE-3760. MockNM nm3 = rm.registerNode("h3:1236", 5122); rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW); rm.sendNodeStarted(nm3); @@ -160,8 +160,8 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, JSONObject nodes = json.getJSONObject("nodes"); assertEquals("incorrect number of elements", 1, nodes.length()); JSONArray nodeArray = nodes.getJSONArray("node"); - // Just 2 nodes, leaving behind the unhealthy node. - assertEquals("incorrect number of elements", 2, nodeArray.length()); + // 3 nodes, including the unhealthy node and the new node. 
+ assertEquals("incorrect number of elements", 3, nodeArray.length()); } @Test @@ -174,7 +174,7 @@ public void testNodesQueryNew() throws JSONException, Exception { rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("nodes").queryParam("state", NodeState.NEW.toString()) + .path("nodes").queryParam("states", NodeState.NEW.toString()) .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); @@ -197,7 +197,7 @@ public void testNodesQueryStateNone() throws JSONException, Exception { ClientResponse response = r.path("ws").path("v1").path("cluster") .path("nodes") - .queryParam("state", NodeState.DECOMMISSIONED.toString()) + .queryParam("states", NodeState.DECOMMISSIONED.toString()) .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); JSONObject json = response.getEntity(JSONObject.class); @@ -213,7 +213,7 @@ public void testNodesQueryStateInvalid() throws JSONException, Exception { try { r.path("ws").path("v1").path("cluster").path("nodes") - .queryParam("state", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON) + .queryParam("states", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON) .get(JSONObject.class); fail("should have thrown exception querying invalid state"); @@ -257,7 +257,7 @@ public void testNodesQueryStateLost() throws JSONException, Exception { rm.sendNodeLost(nm2); ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("nodes").queryParam("state", NodeState.LOST.toString()) + .path("nodes").queryParam("states", NodeState.LOST.toString()) .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); @@ -316,7 +316,7 @@ public void testNodesQueryRunning() throws JSONException, Exception { rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("nodes").queryParam("state", "running") + .path("nodes").queryParam("states", "running") .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); JSONObject json = response.getEntity(JSONObject.class); @@ -336,7 +336,7 @@ public void testNodesQueryHealthyFalse() throws JSONException, Exception { rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW); ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("nodes").queryParam("state", "UNHEALTHY") + .path("nodes").queryParam("states", "UNHEALTHY") .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); JSONObject json = response.getEntity(JSONObject.class); @@ -349,6 +349,11 @@ public void testNodesHelper(String path, String media) throws JSONException, WebResource r = resource(); MockNM nm1 = rm.registerNode("h1:1234", 5120); MockNM nm2 = rm.registerNode("h2:1235", 5121); + rm.sendNodeStarted(nm1); + rm.sendNodeStarted(nm2); + rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + ClientResponse response = r.path("ws").path("v1").path("cluster") .path(path).accept(media).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); @@ -622,7 +627,8 @@ public void 
testQueryAll() throws Exception { rm.sendNodeLost(nm3); ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("nodes").queryParam("state", "aLl") + .path("nodes") + .queryParam("states", Joiner.on(',').join(EnumSet.allOf(NodeState.class))) .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
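
Reviewer note: the sketch below is illustrative only and shows how the reworked client API behaves after this patch. The YarnClient lifecycle calls (createYarnClient, init, start, stop) and the YarnConfiguration setup are assumed from the existing client module rather than introduced here.

import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FilteredNodeReportsSketch {
  public static void main(String[] args) throws Exception {
    // Assumed lifecycle; not part of this patch.
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();
    try {
      // No filter argument: nodes in all states are returned.
      List<NodeReport> all = yarnClient.getNodeReports();

      // Varargs filter: only RUNNING nodes, which is what "yarn node -list"
      // and the MapReduce getActiveTrackers() path now ask for.
      List<NodeReport> running = yarnClient.getNodeReports(NodeState.RUNNING);
      for (NodeReport node : running) {
        System.out.println(node.getNodeId() + " is " + node.getNodeState());
      }

      // The protocol record can also be built directly with an explicit set.
      GetClusterNodesRequest request = GetClusterNodesRequest.newInstance(
          EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED));
      System.out.println("Filtering on: " + request.getNodeStates()
          + ", total nodes seen: " + all.size());
    } finally {
      yarnClient.stop();
    }
  }
}

The no-argument call relies on YarnClientImpl expanding the empty varargs to EnumSet.allOf(NodeState.class), which matches the server-side default in ClientRMService when the request carries no states.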
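
The renamed web-service parameter can be exercised the same way the updated TestRMWebServicesNodes does. A hypothetical standalone query is sketched below; the RM web address (rm-host:8088) is a placeholder, and only the /ws/v1/cluster/nodes path and the comma-separated states filter come from this patch.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class NodesRestQuerySketch {
  public static void main(String[] args) throws Exception {
    // Placeholder host/port; state names are upper-cased on the server side,
    // so "running,unhealthy" would work equally well.
    URL url = new URL(
        "http://rm-host:8088/ws/v1/cluster/nodes?states=RUNNING,UNHEALTHY");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"));
    try {
      String line;
      while ((line = in.readLine()) != null) {
        // JSON body: a "nodes" object holding a "node" array, one entry per match.
        System.out.println(line);
      }
    } finally {
      in.close();
      conn.disconnect();
    }
  }
}

Omitting the states parameter now returns nodes in every state, including UNHEALTHY and NEW nodes that the old default filtered out, which is what the adjusted assertions in testNodesDefaultWithUnHealthyNode check.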