diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index aa7fc30344b..4aa3a14d980 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -98,11 +98,17 @@ public class RMAdminCLI extends HAAdmin { "Reload the queues' acls, states and scheduler specific " + "properties. \n\t\tResourceManager will reload the " + "mapred-queues configuration file.")) - .put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]", + .put("-refreshNodes", + new UsageInfo("[-g [timeout in seconds] -client|server]", "Refresh the hosts information at the ResourceManager. Here " - + "[-g [timeout in seconds] is optional, if we specify the " - + "timeout then ResourceManager will wait for timeout before " - + "marking the NodeManager as decommissioned.")) + + "[-g [timeout in seconds] -client|server] is optional, if we " + + "specify the timeout then ResourceManager will wait for " + + "timeout before marking the NodeManager as decommissioned." + + " The -client|server indicates if the timeout tracking should" + + " be handled by the client or the ResourceManager. The client" + + "-side tracking is blocking, while the server-side tracking" + + " is not. Omitting the timeout, or a timeout of -1, indicates" + + " an infinite timeout.")) .put("-refreshNodesResources", new UsageInfo("", "Refresh resources of NodeManagers at the ResourceManager.")) .put("-refreshSuperUserGroupsConfiguration", new UsageInfo("", @@ -230,7 +236,7 @@ public class RMAdminCLI extends HAAdmin { summary.append("The full syntax is: \n\n" + "yarn rmadmin" + " [-refreshQueues]" + - " [-refreshNodes [-g [timeout in seconds]]]" + + " [-refreshNodes [-g [timeout in seconds] -client|server]]" + " [-refreshNodesResources]" + " [-refreshSuperUserGroupsConfiguration]" + " [-refreshUserToGroupsMappings]" + @@ -312,7 +318,12 @@ public class RMAdminCLI extends HAAdmin { return 0; } - private int refreshNodes(long timeout) throws IOException, YarnException { + private int refreshNodes(long timeout, String trackingMode) + throws IOException, YarnException { + if (!"client".equals(trackingMode)) { + throw new UnsupportedOperationException( + "Only client tracking mode is currently supported."); + } // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest @@ -721,11 +732,18 @@ public class RMAdminCLI extends HAAdmin { } else if ("-refreshNodes".equals(cmd)) { if (args.length == 1) { exitCode = refreshNodes(); - } else if (args.length == 3) { + } else if (args.length == 3 || args.length == 4) { // if the graceful timeout specified if ("-g".equals(args[1])) { - long timeout = validateTimeout(args[2]); - exitCode = refreshNodes(timeout); + long timeout = -1; + String trackingMode; + if (args.length == 4) { + timeout = validateTimeout(args[2]); + trackingMode = validateTrackingMode(args[3]); + } else { + trackingMode = validateTrackingMode(args[2]); + } + exitCode = refreshNodes(timeout, trackingMode); } else { printUsage(cmd, isHAEnabled); return -1; @@ -838,6 +856,16 @@ public class RMAdminCLI extends HAAdmin { return timeout; } + private String validateTrackingMode(String mode) { + if ("-client".equals(mode)) { + return "client"; + } + if ("-server".equals(mode)) { + return "server"; + } + throw new IllegalArgumentException("Invalid mode specified: " + mode); + } + @Override public void setConf(Configuration conf) { if (conf != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index 15513338391..d3161ba9579 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -255,9 +256,9 @@ public class TestRMAdminCLI { } @Test - public void testRefreshNodesWithGracefulTimeout() throws Exception { + public void testRefreshNodesGracefulBeforeTimeout() throws Exception { // graceful decommission before timeout - String[] args = { "-refreshNodes", "-g", "1" }; + String[] args = {"-refreshNodes", "-g", "1", "-client"}; CheckForDecommissioningNodesResponse response = Records .newRecord(CheckForDecommissioningNodesResponse.class); HashSet decomNodes = new HashSet(); @@ -267,30 +268,91 @@ public class TestRMAdminCLI { assertEquals(0, rmAdminCLI.run(args)); verify(admin).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + verify(admin, never()).refreshNodes( + RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); + } + @Test + public void testRefreshNodesGracefulHitTimeout() throws Exception { // Forceful decommission when timeout occurs - String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" }; - decomNodes = new HashSet(); + String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"}; + HashSet decomNodes = new HashSet(); + CheckForDecommissioningNodesResponse response = Records + .newRecord(CheckForDecommissioningNodesResponse.class); response.setDecommissioningNodes(decomNodes); decomNodes.add(NodeId.newInstance("node1", 100)); response.setDecommissioningNodes(decomNodes); when(admin.checkForDecommissioningNodes(any( CheckForDecommissioningNodesRequest.class))).thenReturn(response); - assertEquals(0, rmAdminCLI.run(focefulDecomArgs)); + assertEquals(0, rmAdminCLI.run(forcefulDecomArgs)); verify(admin).refreshNodes( RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); + } + @Test + public void testRefreshNodesGracefulInfiniteTimeout() throws Exception { + String[] infiniteTimeoutArgs = {"-refreshNodes", "-g", "-1", "-client"}; + testRefreshNodesGracefulInfiniteTimeout(infiniteTimeoutArgs); + } + + @Test + public void testRefreshNodesGracefulNoTimeout() throws Exception { + // no timeout (infinite timeout) + String[] noTimeoutArgs = {"-refreshNodes", "-g", "-client"}; + testRefreshNodesGracefulInfiniteTimeout(noTimeoutArgs); + } + + private void testRefreshNodesGracefulInfiniteTimeout(String[] args) + throws Exception { + when(admin.checkForDecommissioningNodes(any( + CheckForDecommissioningNodesRequest.class))).thenAnswer( + new Answer() { + private int count = 5; + @Override + public CheckForDecommissioningNodesResponse answer( + InvocationOnMock invocationOnMock) throws Throwable { + CheckForDecommissioningNodesResponse response = Records + .newRecord(CheckForDecommissioningNodesResponse.class); + HashSet decomNodes = new HashSet(); + count--; + if (count <= 0) { + response.setDecommissioningNodes(decomNodes); + return response; + } else { + decomNodes.add(NodeId.newInstance("node1", 100)); + response.setDecommissioningNodes(decomNodes); + return response; + } + } + }); + assertEquals(0, rmAdminCLI.run(args)); + verify(admin, atLeastOnce()).refreshNodes( + RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); + verify(admin, never()).refreshNodes( + RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); + } + + @Test + public void testRefreshNodesGracefulInvalidArgs() throws Exception { // invalid graceful timeout parameter - String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" }; + String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", "-client"}; assertEquals(-1, rmAdminCLI.run(invalidArgs)); // invalid timeout - String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" }; + String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", "-client"}; assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs)); // negative timeout - String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" }; + String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"}; assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs)); + + // server tracking mode + String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"}; + assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs)); + + // invalid tracking mode + String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"}; + assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs)); } @Test(timeout=500) @@ -404,8 +466,8 @@ public class TestRMAdminCLI { .toString() .contains( "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " + - "seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" + - "Configuration] [-refreshUserToGroupsMappings] " + + "seconds] -client|server]] [-refreshNodesResources] [-refresh" + + "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " + "[username]] [-addToClusterNodeLabels " + "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " + @@ -423,8 +485,8 @@ public class TestRMAdminCLI { assertTrue(dataOut .toString() .contains( - "-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " + - "ResourceManager.")); + "-refreshNodes [-g [timeout in seconds] -client|server]: " + + "Refresh the hosts information at the ResourceManager.")); assertTrue(dataOut .toString() .contains( @@ -456,7 +518,8 @@ public class TestRMAdminCLI { testError(new String[] { "-help", "-refreshQueues" }, "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0); testError(new String[] { "-help", "-refreshNodes" }, - "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0); + "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " + + "-client|server]]", dataErr, 0); testError(new String[] { "-help", "-refreshNodesResources" }, "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0); testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, @@ -495,7 +558,8 @@ public class TestRMAdminCLI { assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); oldOutPrintStream.println(dataOut); String expectedHelpMsg = - "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] " + "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " + + "seconds] -client|server]] " + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] " + "[-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"