applicationStates) throws YarnException,
+ IOException;
+
/**
*
* Get metrics ({@link YarnClusterMetrics}) about the cluster.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 99e896e5f57..c433b55b6ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -80,6 +81,9 @@ public class AMRMClientImpl extends AMRMClient {
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ protected final Set blacklistAdditions = new HashSet();
+ protected final Set blacklistRemovals = new HashSet();
+
class ResourceRequestInfo {
ResourceRequest remoteRequest;
LinkedHashSet containerRequests;
@@ -199,9 +203,11 @@ public class AMRMClientImpl extends AMRMClient {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
- ArrayList askList = null;
- ArrayList releaseList = null;
+ List askList = null;
+ List releaseList = null;
AllocateRequest allocateRequest = null;
+ List blacklistToAdd = new ArrayList();
+ List blacklistToRemove = new ArrayList();
try {
synchronized (this) {
@@ -217,9 +223,22 @@ public class AMRMClientImpl extends AMRMClient {
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
+
+ blacklistToAdd.addAll(blacklistAdditions);
+ blacklistToRemove.addAll(blacklistRemovals);
+
+ ResourceBlacklistRequest blacklistRequest =
+ (blacklistToAdd != null) || (blacklistToRemove != null) ?
+ ResourceBlacklistRequest.newInstance(blacklistToAdd,
+ blacklistToRemove) : null;
+
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
- askList, releaseList, null);
+ askList, releaseList, blacklistRequest);
+ // clear blacklistAdditions and blacklistRemovals before
+ // unsynchronized part
+ blacklistAdditions.clear();
+ blacklistRemovals.clear();
}
allocateResponse = rmClient.allocate(allocateRequest);
@@ -253,6 +272,9 @@ public class AMRMClientImpl extends AMRMClient {
ask.add(oldAsk);
}
}
+
+ blacklistAdditions.addAll(blacklistToAdd);
+ blacklistRemovals.addAll(blacklistToRemove);
}
}
}
@@ -604,4 +626,31 @@ public class AMRMClientImpl extends AMRMClient {
+ " #asks=" + ask.size());
}
}
+
+ @Override
+ public synchronized void updateBlacklist(List blacklistAdditions,
+ List blacklistRemovals) {
+
+ if (blacklistAdditions != null) {
+ this.blacklistAdditions.addAll(blacklistAdditions);
+ // if some resources are also in blacklistRemovals updated before, we
+ // should remove them here.
+ this.blacklistRemovals.removeAll(blacklistAdditions);
+ }
+
+ if (blacklistRemovals != null) {
+ this.blacklistRemovals.addAll(blacklistRemovals);
+ // if some resources are in blacklistAdditions before, we should remove
+ // them here.
+ this.blacklistAdditions.removeAll(blacklistRemovals);
+ }
+
+ if (blacklistAdditions != null && blacklistRemovals != null
+ && blacklistAdditions.removeAll(blacklistRemovals)) {
+ // we allow resources to appear in addition list and removal list in the
+ // same invocation of updateBlacklist(), but should get a warn here.
+ LOG.warn("The same resources appear in both blacklistAdditions and " +
+ "blacklistRemovals in updateBlacklist.");
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 4a1b83ca9e9..d35e1a4300d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -211,15 +211,29 @@ public class YarnClientImpl extends YarnClient {
@Override
public List getApplications() throws YarnException,
IOException {
- return getApplications(null);
+ return getApplications(null, null);
+ }
+
+ @Override
+ public List getApplications(Set applicationTypes)
+ throws YarnException,
+ IOException {
+ return getApplications(applicationTypes, null);
}
@Override
public List getApplications(
- Set applicationTypes) throws YarnException, IOException {
+ EnumSet applicationStates)
+ throws YarnException, IOException {
+ return getApplications(null, applicationStates);
+ }
+
+ @Override
+ public List getApplications(Set applicationTypes,
+ EnumSet applicationStates) throws YarnException,
+ IOException {
GetApplicationsRequest request =
- applicationTypes == null ? GetApplicationsRequest.newInstance()
- : GetApplicationsRequest.newInstance(applicationTypes);
+ GetApplicationsRequest.newInstance(applicationTypes, applicationStates);
GetApplicationsResponse response = rmClient.getApplications(request);
return response.getApplicationList();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index fa22b29ddb9..a7b7d654643 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.DecimalFormat;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -48,6 +51,10 @@ public class ApplicationCLI extends YarnCLI {
System.getProperty("line.separator");
private static final String APP_TYPE_CMD = "appTypes";
+ private static final String APP_STATE_CMD ="appStates";
+ private static final String ALLSTATES_OPTION = "ALL";
+
+ private boolean allAppStates;
public static void main(String[] args) throws Exception {
ApplicationCLI cli = new ApplicationCLI();
@@ -64,21 +71,38 @@ public class ApplicationCLI extends YarnCLI {
Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status of the application.");
opts.addOption(LIST_CMD, false, "List applications from the RM. " +
- "Supports optional use of --appTypes to filter applications " +
- "based on application type.");
+ "Supports optional use of -appTypes to filter applications " +
+ "based on application type, " +
+ "and -appStates to filter applications based on application state");
opts.addOption(KILL_CMD, true, "Kills the application.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
- Option appTypeOpt = new Option(APP_TYPE_CMD, true,
- "Works with --list to filter applications based on their type.");
+ Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to " +
+ "filter applications based on " +
+ "input comma-separated list of application types.");
appTypeOpt.setValueSeparator(',');
appTypeOpt.setArgs(Option.UNLIMITED_VALUES);
- appTypeOpt.setArgName("Comma-separated list of application types");
+ appTypeOpt.setArgName("Types");
opts.addOption(appTypeOpt);
+ Option appStateOpt = new Option(APP_STATE_CMD, true, "Works with -list " +
+ "to filter applications based on input comma-separated list of " +
+ "application states. " + getAllValidApplicationStates());
+ appStateOpt.setValueSeparator(',');
+ appStateOpt.setArgs(Option.UNLIMITED_VALUES);
+ appStateOpt.setArgName("States");
+ opts.addOption(appStateOpt);
opts.getOption(KILL_CMD).setArgName("Application ID");
opts.getOption(STATUS_CMD).setArgName("Application ID");
- CommandLine cliParser = new GnuParser().parse(opts, args);
int exitCode = -1;
+ CommandLine cliParser = null;
+ try {
+ cliParser = new GnuParser().parse(opts, args);
+ } catch (MissingArgumentException ex) {
+ sysout.println("Missing argument for options");
+ printUsage(opts);
+ return exitCode;
+ }
+
if (cliParser.hasOption(STATUS_CMD)) {
if (args.length != 2) {
printUsage(opts);
@@ -86,18 +110,44 @@ public class ApplicationCLI extends YarnCLI {
}
printApplicationReport(cliParser.getOptionValue(STATUS_CMD));
} else if (cliParser.hasOption(LIST_CMD)) {
+ allAppStates = false;
Set appTypes = new HashSet();
if(cliParser.hasOption(APP_TYPE_CMD)) {
String[] types = cliParser.getOptionValues(APP_TYPE_CMD);
if (types != null) {
for (String type : types) {
if (!type.trim().isEmpty()) {
- appTypes.add(type.trim());
+ appTypes.add(type.toUpperCase().trim());
}
}
}
}
- listApplications(appTypes);
+
+ EnumSet appStates =
+ EnumSet.noneOf(YarnApplicationState.class);
+ if (cliParser.hasOption(APP_STATE_CMD)) {
+ String[] states = cliParser.getOptionValues(APP_STATE_CMD);
+ if (states != null) {
+ for (String state : states) {
+ if (!state.trim().isEmpty()) {
+ if (state.trim().equalsIgnoreCase(ALLSTATES_OPTION)) {
+ allAppStates = true;
+ break;
+ }
+ try {
+ appStates.add(YarnApplicationState.valueOf(state.toUpperCase()
+ .trim()));
+ } catch (IllegalArgumentException ex) {
+ sysout.println("The application state " + state
+ + " is invalid.");
+ sysout.println(getAllValidApplicationStates());
+ return exitCode;
+ }
+ }
+ }
+ }
+ }
+ listApplications(appTypes, appStates);
} else if (cliParser.hasOption(KILL_CMD)) {
if (args.length != 2) {
printUsage(opts);
@@ -126,19 +176,35 @@ public class ApplicationCLI extends YarnCLI {
/**
* Lists the applications matching the given application Types
- * present in the Resource Manager
+ * And application States present in the Resource Manager
*
* @param appTypes
+ * @param appStates
* @throws YarnException
* @throws IOException
*/
- private void listApplications(Set appTypes)
- throws YarnException, IOException {
+ private void listApplications(Set appTypes,
+ EnumSet appStates) throws YarnException,
+ IOException {
PrintWriter writer = new PrintWriter(sysout);
- List appsReport =
- client.getApplications(appTypes);
+ if (allAppStates) {
+ for(YarnApplicationState appState : YarnApplicationState.values()) {
+ appStates.add(appState);
+ }
+ } else {
+ if (appStates.isEmpty()) {
+ appStates.add(YarnApplicationState.RUNNING);
+ appStates.add(YarnApplicationState.ACCEPTED);
+ appStates.add(YarnApplicationState.SUBMITTED);
+ }
+ }
- writer.println("Total Applications:" + appsReport.size());
+ List appsReport =
+ client.getApplications(appTypes, appStates);
+
+ writer
+ .println("Total number of applications (application-types: " + appTypes
+ + " and states: " + appStates + ")" + ":" + appsReport.size());
writer.printf(APPLICATIONS_PATTERN, "Application-Id",
"Application-Name","Application-Type", "User", "Queue",
"State", "Final-State","Progress", "Tracking-URL");
@@ -164,8 +230,15 @@ public class ApplicationCLI extends YarnCLI {
private void killApplication(String applicationId)
throws YarnException, IOException {
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
- sysout.println("Killing application " + applicationId);
- client.killApplication(appId);
+ ApplicationReport appReport = client.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
+ || appReport.getYarnApplicationState() == YarnApplicationState.KILLED
+ || appReport.getYarnApplicationState() == YarnApplicationState.FAILED) {
+ sysout.println("Application " + applicationId + " has already finished ");
+ } else {
+ sysout.println("Killing application " + applicationId);
+ client.killApplication(appId);
+ }
}
/**
@@ -221,4 +294,16 @@ public class ApplicationCLI extends YarnCLI {
sysout.println(baos.toString("UTF-8"));
}
+ private String getAllValidApplicationStates() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("The valid application state can be"
+ + " one of the following: ");
+ sb.append(ALLSTATES_OPTION + ",");
+ for (YarnApplicationState appState : YarnApplicationState
+ .values()) {
+ sb.append(appState+",");
+ }
+ String output = sb.toString();
+ return output.substring(0, output.length()-1);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
index 16e80dd93d7..f77c56f927a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
@@ -21,11 +21,15 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,9 +44,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
@Private
@Unstable
public class NodeCLI extends YarnCLI {
- private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
+ private static final String NODES_PATTERN = "%16s\t%15s\t%17s\t%28s" +
System.getProperty("line.separator");
+ private static final String NODE_STATE_CMD = "states";
+ private static final String NODE_ALL = "all";
+
public static void main(String[] args) throws Exception {
NodeCLI cli = new NodeCLI();
cli.setSysOutPrintStream(System.out);
@@ -57,10 +64,30 @@ public class NodeCLI extends YarnCLI {
Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status report of the node.");
- opts.addOption(LIST_CMD, false, "Lists all the nodes in the RUNNING state.");
- CommandLine cliParser = new GnuParser().parse(opts, args);
+ opts.addOption(LIST_CMD, false, "List all running nodes. " +
+ "Supports optional use of -states to filter nodes " +
+ "based on node state, all -all to list all nodes.");
+ Option nodeStateOpt = new Option(NODE_STATE_CMD, true,
+ "Works with -list to filter nodes based on input comma-separated list of node states.");
+ nodeStateOpt.setValueSeparator(',');
+ nodeStateOpt.setArgs(Option.UNLIMITED_VALUES);
+ nodeStateOpt.setArgName("States");
+ opts.addOption(nodeStateOpt);
+ Option allOpt = new Option(NODE_ALL, false,
+ "Works with -list to list all nodes.");
+ opts.addOption(allOpt);
+ opts.getOption(STATUS_CMD).setArgName("NodeId");
int exitCode = -1;
+ CommandLine cliParser = null;
+ try {
+ cliParser = new GnuParser().parse(opts, args);
+ } catch (MissingArgumentException ex) {
+ sysout.println("Missing argument for options");
+ printUsage(opts);
+ return exitCode;
+ }
+
if (cliParser.hasOption("status")) {
if (args.length != 2) {
printUsage(opts);
@@ -68,7 +95,24 @@ public class NodeCLI extends YarnCLI {
}
printNodeStatus(cliParser.getOptionValue("status"));
} else if (cliParser.hasOption("list")) {
- listClusterNodes();
+ Set nodeStates = new HashSet();
+ if (cliParser.hasOption(NODE_ALL)) {
+ for (NodeState state : NodeState.values()) {
+ nodeStates.add(state);
+ }
+ } else if (cliParser.hasOption(NODE_STATE_CMD)) {
+ String[] types = cliParser.getOptionValues(NODE_STATE_CMD);
+ if (types != null) {
+ for (String type : types) {
+ if (!type.trim().isEmpty()) {
+ nodeStates.add(NodeState.valueOf(type.trim().toUpperCase()));
+ }
+ }
+ }
+ } else {
+ nodeStates.add(NodeState.RUNNING);
+ }
+ listClusterNodes(nodeStates);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(opts);
@@ -86,17 +130,20 @@ public class NodeCLI extends YarnCLI {
}
/**
- * Lists all the nodes present in the cluster
+ * Lists the nodes matching the given node states
*
+ * @param nodeStates
* @throws YarnException
* @throws IOException
*/
- private void listClusterNodes() throws YarnException, IOException {
+ private void listClusterNodes(Set nodeStates)
+ throws YarnException, IOException {
PrintWriter writer = new PrintWriter(sysout);
- List nodesReport = client.getNodeReports(NodeState.RUNNING);
+ List nodesReport = client.getNodeReports(
+ nodeStates.toArray(new NodeState[0]));
writer.println("Total Nodes:" + nodesReport.size());
writer.printf(NODES_PATTERN, "Node-Id", "Node-State", "Node-Http-Address",
- "Running-Containers");
+ "Number-of-Running-Containers");
for (NodeReport nodeReport : nodesReport) {
writer.printf(NODES_PATTERN, nodeReport.getNodeId(), nodeReport
.getNodeState(), nodeReport.getHttpAddress(), nodeReport
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index f4a6dd3f3c8..f24a2cd88fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.client.api.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -97,6 +100,7 @@ public class TestAMRMClient {
static String rack;
static String[] nodes;
static String[] racks;
+ private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
@@ -476,6 +480,144 @@ public class TestAMRMClient {
}
}
}
+
+ @Test (timeout=60000)
+ public void testAllocationWithBlacklist() throws YarnException, IOException {
+ AMRMClientImpl amClient = null;
+ try {
+ // start am rm client
+ amClient =
+ (AMRMClientImpl) AMRMClient
+ . createAMRMClient();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ ContainerRequest storedContainer1 =
+ new ContainerRequest(capability, nodes, racks, priority);
+ amClient.addContainerRequest(storedContainer1);
+ assertTrue(amClient.ask.size() == 3);
+ assertTrue(amClient.release.size() == 0);
+
+ List