YARN-791. Changed RM APIs and web-services related to nodes to ensure that both are consistent with each other. Contributed by Sandy Ryza.

svn merge --ignore-ancestry -c 1500994 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1500995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-07-08 22:33:43 +00:00
parent fd159ded9b
commit 16841d50c5
17 changed files with 282 additions and 66 deletions

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@ -121,7 +122,8 @@ protected void serviceStop() throws Exception {
public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException {
try {
return TypeConverter.fromYarnNodes(client.getNodeReports());
return TypeConverter.fromYarnNodes(
client.getNodeReports(NodeState.RUNNING));
} catch (YarnException e) {
throw new IOException(e);
}
@ -309,8 +311,9 @@ public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
}
@Override
public List<NodeReport> getNodeReports() throws YarnException, IOException {
return client.getNodeReports();
public List<NodeReport> getNodeReports(NodeState... states)
throws YarnException, IOException {
return client.getNodeReports(states);
}
@Override

View File

@ -213,6 +213,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-869. Move ResourceManagerAdministrationProtocol out of main YARN api.
(vinodkv via acmurthy)
YARN-791. Changed RM APIs and web-services related to nodes to ensure that
both are consistent with each other. (Sandy Ryza via vinodkv)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -18,22 +18,34 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The request from clients to get a report of all nodes
* in the cluster from the <code>ResourceManager</code>.</p>
*
* <p>Currently, this is empty.</p>
* The request will ask for all nodes in the given {@link NodeState}s.
*
* @see ApplicationClientProtocol#getClusterNodes(GetClusterNodesRequest)
*/
@Public
@Stable
public abstract class GetClusterNodesRequest {
@Public
@Stable
public static GetClusterNodesRequest newInstance(EnumSet<NodeState> states) {
GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class);
request.setNodeStates(states);
return request;
}
@Public
@Stable
public static GetClusterNodesRequest newInstance() {
@ -41,4 +53,14 @@ public static GetClusterNodesRequest newInstance() {
Records.newRecord(GetClusterNodesRequest.class);
return request;
}
/**
* The state to filter the cluster nodes with.
*/
public abstract EnumSet<NodeState> getNodeStates();
/**
* The state to filter the cluster nodes with.
*/
public abstract void setNodeStates(EnumSet<NodeState> states);
}

View File

@ -131,6 +131,7 @@ message GetAllApplicationsResponseProto {
}
message GetClusterNodesRequestProto {
repeated NodeStateProto nodeStates = 1;
}
message GetClusterNodesResponseProto {

View File

@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -53,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -324,7 +326,8 @@ public boolean run() throws IOException, YarnException {
LOG.info("Got Cluster metric info from ASM"
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
List<NodeReport> clusterNodeReports = yarnClient.getNodeReports();
List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
NodeState.RUNNING);
LOG.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) {
LOG.info("Got node report from ASM for"

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
@ -190,14 +191,17 @@ public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
/**
* <p>
* Get a report of all nodes ({@link NodeReport}) in the cluster.
* Get a report of nodes ({@link NodeReport}) in the cluster.
* </p>
*
* @return A list of report of all nodes
* @param states The {@link NodeState}s to filter on. If no filter states are
* given, nodes in all states will be returned.
* @return A list of node reports
* @throws YarnException
* @throws IOException
*/
public abstract List<NodeReport> getNodeReports() throws YarnException, IOException;
public abstract List<NodeReport> getNodeReports(NodeState... states)
throws YarnException, IOException;
/**
* <p>

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
@ -51,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
@ -222,10 +224,15 @@ public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
}
@Override
public List<NodeReport> getNodeReports() throws YarnException,
public List<NodeReport> getNodeReports(NodeState... states) throws YarnException,
IOException {
GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class);
EnumSet<NodeState> statesSet = (states.length == 0) ?
EnumSet.allOf(NodeState.class) : EnumSet.noneOf(NodeState.class);
for (NodeState state : states) {
statesSet.add(state);
}
GetClusterNodesRequest request = GetClusterNodesRequest
.newInstance(statesSet);
GetClusterNodesResponse response = rmClient.getClusterNodes(request);
return response.getNodeReports();
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -56,7 +57,7 @@ public int run(String[] args) throws Exception {
Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status report of the node.");
opts.addOption(LIST_CMD, false, "Lists all the nodes.");
opts.addOption(LIST_CMD, false, "Lists all the nodes in the RUNNING state.");
CommandLine cliParser = new GnuParser().parse(opts, args);
int exitCode = -1;
@ -92,7 +93,7 @@ private void printUsage(Options opts) {
*/
private void listClusterNodes() throws YarnException, IOException {
PrintWriter writer = new PrintWriter(sysout);
List<NodeReport> nodesReport = client.getNodeReports();
List<NodeReport> nodesReport = client.getNodeReports(NodeState.RUNNING);
writer.println("Total Nodes:" + nodesReport.size());
writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address",
"Running-Containers");

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -101,7 +102,7 @@ public static void setup() throws Exception {
yarnClient.start();
// get node info
nodeReports = yarnClient.getNodeReports();
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
priority = Priority.newInstance(1);
capability = Resource.newInstance(1024, 1);

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -95,7 +96,7 @@ public void setup() throws YarnException, IOException {
assertEquals(STATE.STARTED, yarnClient.getServiceState());
// get node info
nodeReports = yarnClient.getNodeReports();
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
// submit new app
ApplicationSubmissionContext appContext =

View File

@ -152,12 +152,13 @@ public void testKillApplication() throws Exception {
@Test
public void testListClusterNodes() throws Exception {
NodeCLI cli = new NodeCLI();
when(client.getNodeReports()).thenReturn(getNodeReports(3));
when(client.getNodeReports(NodeState.RUNNING)).thenReturn(
getNodeReports(3));
cli.setClient(client);
cli.setSysOutPrintStream(sysOut);
int result = cli.run(new String[] { "-list" });
assertEquals(0, result);
verify(client).getNodeReports();
verify(client).getNodeReports(NodeState.RUNNING);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Total Nodes:3");

View File

@ -20,8 +20,15 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProtoOrBuilder;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
@Private
@Unstable
@ -31,6 +38,8 @@ public class GetClusterNodesRequestPBImpl extends GetClusterNodesRequest {
GetClusterNodesRequestProto.Builder builder = null;
boolean viaProto = false;
private EnumSet<NodeState> states = null;
public GetClusterNodesRequestPBImpl() {
builder = GetClusterNodesRequestProto.newBuilder();
}
@ -41,11 +50,91 @@ public GetClusterNodesRequestPBImpl(GetClusterNodesRequestProto proto) {
}
public GetClusterNodesRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public EnumSet<NodeState> getNodeStates() {
initNodeStates();
return this.states;
}
@Override
public void setNodeStates(final EnumSet<NodeState> states) {
initNodeStates();
this.states.clear();
if (states == null) {
return;
}
this.states.addAll(states);
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetClusterNodesRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.states != null) {
maybeInitBuilder();
builder.clearNodeStates();
Iterable<NodeStateProto> iterable = new Iterable<NodeStateProto>() {
@Override
public Iterator<NodeStateProto> iterator() {
return new Iterator<NodeStateProto>() {
Iterator<NodeState> iter = states.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public NodeStateProto next() {
return ProtoUtils.convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllNodeStates(iterable);
}
}
private void initNodeStates() {
if (this.states != null) {
return;
}
GetClusterNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
List<NodeStateProto> list = p.getNodeStatesList();
this.states = EnumSet.noneOf(NodeState.class);
for (NodeStateProto c : list) {
this.states.add(ProtoUtils.convertFromProtoFormat(c));
}
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -419,7 +420,13 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException {
GetClusterNodesResponse response =
recordFactory.newRecordInstance(GetClusterNodesResponse.class);
Collection<RMNode> nodes = this.rmContext.getRMNodes().values();
EnumSet<NodeState> nodeStates = request.getNodeStates();
if (nodeStates == null || nodeStates.isEmpty()) {
nodeStates = EnumSet.allOf(NodeState.class);
}
Collection<RMNode> nodes = RMServerUtils.queryRMNodes(rmContext,
nodeStates);
List<NodeReport> nodeReports = new ArrayList<NodeReport>(nodes.size());
for (RMNode nodeInfo : nodes) {
nodeReports.add(createNodeReports(nodeInfo));

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
/**
* Utility methods to aid serving RM data through the REST and RPC APIs
*/
public class RMServerUtils {
public static List<RMNode> queryRMNodes(RMContext context,
EnumSet<NodeState> acceptedStates) {
// nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) ||
acceptedStates.contains(NodeState.UNHEALTHY)) {
for (RMNode rmNode : context.getRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
}
// inactiveNodes contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
if (acceptedStates.contains(NodeState.DECOMMISSIONED) ||
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
}
return results;
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest;
@ -39,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -149,57 +152,42 @@ public SchedulerTypeInfo getSchedulerInfo() {
}
/**
* If no params are given, returns all active nodes, which includes
* nodes in the NEW and RUNNING states. If state param is "all", returns all
* nodes in all states. Otherwise, if the state param is set to a state name,
* returns all nodes that are in that state.
* Returns all nodes in the cluster. If the states param is given, returns
* all nodes that are in the comma-separated list of states.
*/
@GET
@Path("/nodes")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public NodesInfo getNodes(@QueryParam("state") String state) {
public NodesInfo getNodes(@QueryParam("states") String states) {
init();
ResourceScheduler sched = this.rm.getResourceScheduler();
if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance");
}
NodeState acceptedState = null;
boolean all = false;
if (state != null && !state.isEmpty()) {
if (state.equalsIgnoreCase("all")) {
all = true;
} else {
acceptedState = NodeState.valueOf(state.toUpperCase());
EnumSet<NodeState> acceptedStates;
if (states == null) {
acceptedStates = EnumSet.allOf(NodeState.class);
} else {
acceptedStates = EnumSet.noneOf(NodeState.class);
for (String stateStr : states.split(",")) {
acceptedStates.add(NodeState.valueOf(stateStr.toUpperCase()));
}
}
// getRMNodes() contains nodes that are NEW, RUNNING OR UNHEALTHY
NodesInfo allNodes = new NodesInfo();
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) {
if (all || (acceptedState == null && ni.getState() != NodeState.UNHEALTHY)
|| acceptedState == ni.getState()) {
NodeInfo nodeInfo = new NodeInfo(ni, sched);
allNodes.add(nodeInfo);
Collection<RMNode> rmNodes = RMServerUtils.queryRMNodes(this.rm.getRMContext(),
acceptedStates);
NodesInfo nodesInfo = new NodesInfo();
for (RMNode rmNode : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(rmNode, sched);
if (EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED)
.contains(rmNode.getState())) {
nodeInfo.setNodeHTTPAddress(EMPTY);
}
nodesInfo.add(nodeInfo);
}
// getInactiveNodes() contains nodes that are DECOMMISSIONED, LOST, OR REBOOTED
if (all || (acceptedState != null &&
(acceptedState == NodeState.DECOMMISSIONED ||
acceptedState == NodeState.LOST ||
acceptedState == NodeState.REBOOTED))) {
for (RMNode ni : this.rm.getRMContext().getInactiveRMNodes().values()) {
if (all || acceptedState == ni.getState()) {
NodeInfo nodeInfo = new NodeInfo(ni, sched);
nodeInfo.setNodeHTTPAddress(EMPTY);
allNodes.add(nodeInfo);
}
}
}
return allNodes;
return nodesInfo;
}
@GET

View File

@ -26,6 +26,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
@ -116,9 +117,17 @@ protected ClientRMService createClientRMService() {
rm.start();
// Add a healthy node
MockNM node = rm.registerNode("host:1234", 1024);
MockNM node = rm.registerNode("host1:1234", 1024);
rm.sendNodeStarted(node);
node.nodeHeartbeat(true);
// Add and lose a node
MockNM lostNode = rm.registerNode("host2:1235", 1024);
rm.sendNodeStarted(lostNode);
lostNode.nodeHeartbeat(true);
rm.NMwaitForState(lostNode.getNodeId(), NodeState.RUNNING);
rm.sendNodeLost(lostNode);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
@ -130,7 +139,7 @@ protected ClientRMService createClientRMService() {
// Make call
GetClusterNodesRequest request =
Records.newRecord(GetClusterNodesRequest.class);
GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING));
List<NodeReport> nodeReports =
client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
@ -142,9 +151,21 @@ protected ClientRMService createClientRMService() {
// Call again
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals("Unhealthy nodes should not show up by default", 0,
nodeReports.size());
// Now query for UNHEALTHY nodes
request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size());
Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY,
nodeReports.get(0).getNodeState());
// Query all states should return all nodes
rm.registerNode("host3:1236", 1024);
request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(3, nodeReports.size());
}
@Test

View File

@ -24,6 +24,7 @@
import java.io.StringReader;
import java.util.ArrayList;
import java.util.EnumSet;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
@ -55,6 +56,7 @@
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import com.google.common.base.Joiner;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
@ -136,8 +138,6 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException,
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
// One unhealthy node which should not appear in the list after
// MAPREDUCE-3760.
MockNM nm3 = rm.registerNode("h3:1236", 5122);
rm.NMwaitForState(nm3.getNodeId(), NodeState.NEW);
rm.sendNodeStarted(nm3);
@ -160,8 +160,8 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException,
JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node");
// Just 2 nodes, leaving behind the unhealthy node.
assertEquals("incorrect number of elements", 2, nodeArray.length());
// 3 nodes, including the unhealthy node and the new node.
assertEquals("incorrect number of elements", 3, nodeArray.length());
}
@Test
@ -174,7 +174,7 @@ public void testNodesQueryNew() throws JSONException, Exception {
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", NodeState.NEW.toString())
.path("nodes").queryParam("states", NodeState.NEW.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -197,7 +197,7 @@ public void testNodesQueryStateNone() throws JSONException, Exception {
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes")
.queryParam("state", NodeState.DECOMMISSIONED.toString())
.queryParam("states", NodeState.DECOMMISSIONED.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
@ -213,7 +213,7 @@ public void testNodesQueryStateInvalid() throws JSONException, Exception {
try {
r.path("ws").path("v1").path("cluster").path("nodes")
.queryParam("state", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON)
.queryParam("states", "BOGUSSTATE").accept(MediaType.APPLICATION_JSON)
.get(JSONObject.class);
fail("should have thrown exception querying invalid state");
@ -257,7 +257,7 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
rm.sendNodeLost(nm2);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", NodeState.LOST.toString())
.path("nodes").queryParam("states", NodeState.LOST.toString())
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -316,7 +316,7 @@ public void testNodesQueryRunning() throws JSONException, Exception {
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "running")
.path("nodes").queryParam("states", "running")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
@ -336,7 +336,7 @@ public void testNodesQueryHealthyFalse() throws JSONException, Exception {
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.NEW);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "UNHEALTHY")
.path("nodes").queryParam("states", "UNHEALTHY")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
@ -349,6 +349,11 @@ public void testNodesHelper(String path, String media) throws JSONException,
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1235", 5121);
rm.sendNodeStarted(nm1);
rm.sendNodeStarted(nm2);
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path(path).accept(media).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
@ -622,7 +627,8 @@ public void testQueryAll() throws Exception {
rm.sendNodeLost(nm3);
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("nodes").queryParam("state", "aLl")
.path("nodes")
.queryParam("states", Joiner.on(',').join(EnumSet.allOf(NodeState.class)))
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());