YARN-5434. Add -client|server argument for graceful decommmission. Contributed by Robert Kanter.

(cherry picked from commit 95f2b98597)
This commit is contained in:
Junping Du 2016-07-29 10:26:11 -07:00
parent 0cff416c35
commit 47dd871f96
2 changed files with 115 additions and 23 deletions

View File

@ -98,11 +98,17 @@ public class RMAdminCLI extends HAAdmin {
"Reload the queues' acls, states and scheduler specific " + "Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " + "properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file.")) "mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]", .put("-refreshNodes",
new UsageInfo("[-g [timeout in seconds] -client|server]",
"Refresh the hosts information at the ResourceManager. Here " "Refresh the hosts information at the ResourceManager. Here "
+ "[-g [timeout in seconds] is optional, if we specify the " + "[-g [timeout in seconds] -client|server] is optional, if we "
+ "timeout then ResourceManager will wait for timeout before " + "specify the timeout then ResourceManager will wait for "
+ "marking the NodeManager as decommissioned.")) + "timeout before marking the NodeManager as decommissioned."
+ " The -client|server indicates if the timeout tracking should"
+ " be handled by the client or the ResourceManager. The client"
+ "-side tracking is blocking, while the server-side tracking"
+ " is not. Omitting the timeout, or a timeout of -1, indicates"
+ " an infinite timeout."))
.put("-refreshNodesResources", new UsageInfo("", .put("-refreshNodesResources", new UsageInfo("",
"Refresh resources of NodeManagers at the ResourceManager.")) "Refresh resources of NodeManagers at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("", .put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
@ -230,7 +236,7 @@ public class RMAdminCLI extends HAAdmin {
summary.append("The full syntax is: \n\n" + summary.append("The full syntax is: \n\n" +
"yarn rmadmin" + "yarn rmadmin" +
" [-refreshQueues]" + " [-refreshQueues]" +
" [-refreshNodes [-g [timeout in seconds]]]" + " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
" [-refreshNodesResources]" + " [-refreshNodesResources]" +
" [-refreshSuperUserGroupsConfiguration]" + " [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" + " [-refreshUserToGroupsMappings]" +
@ -312,7 +318,12 @@ public class RMAdminCLI extends HAAdmin {
return 0; return 0;
} }
private int refreshNodes(long timeout) throws IOException, YarnException { private int refreshNodes(long timeout, String trackingMode)
throws IOException, YarnException {
if (!"client".equals(trackingMode)) {
throw new UnsupportedOperationException(
"Only client tracking mode is currently supported.");
}
// Graceful decommissioning with timeout // Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest RefreshNodesRequest gracefulRequest = RefreshNodesRequest
@ -721,11 +732,18 @@ public class RMAdminCLI extends HAAdmin {
} else if ("-refreshNodes".equals(cmd)) { } else if ("-refreshNodes".equals(cmd)) {
if (args.length == 1) { if (args.length == 1) {
exitCode = refreshNodes(); exitCode = refreshNodes();
} else if (args.length == 3) { } else if (args.length == 3 || args.length == 4) {
// if the graceful timeout specified // if the graceful timeout specified
if ("-g".equals(args[1])) { if ("-g".equals(args[1])) {
long timeout = validateTimeout(args[2]); long timeout = -1;
exitCode = refreshNodes(timeout); String trackingMode;
if (args.length == 4) {
timeout = validateTimeout(args[2]);
trackingMode = validateTrackingMode(args[3]);
} else {
trackingMode = validateTrackingMode(args[2]);
}
exitCode = refreshNodes(timeout, trackingMode);
} else { } else {
printUsage(cmd, isHAEnabled); printUsage(cmd, isHAEnabled);
return -1; return -1;
@ -838,6 +856,16 @@ public class RMAdminCLI extends HAAdmin {
return timeout; return timeout;
} }
private String validateTrackingMode(String mode) {
if ("-client".equals(mode)) {
return "client";
}
if ("-server".equals(mode)) {
return "server";
}
throw new IllegalArgumentException("Invalid mode specified: " + mode);
}
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
if (conf != null) { if (conf != null) {

View File

@ -26,6 +26,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -255,9 +256,9 @@ public class TestRMAdminCLI {
} }
@Test @Test
public void testRefreshNodesWithGracefulTimeout() throws Exception { public void testRefreshNodesGracefulBeforeTimeout() throws Exception {
// graceful decommission before timeout // graceful decommission before timeout
String[] args = { "-refreshNodes", "-g", "1" }; String[] args = {"-refreshNodes", "-g", "1", "-client"};
CheckForDecommissioningNodesResponse response = Records CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class); .newRecord(CheckForDecommissioningNodesResponse.class);
HashSet<NodeId> decomNodes = new HashSet<NodeId>(); HashSet<NodeId> decomNodes = new HashSet<NodeId>();
@ -267,30 +268,91 @@ public class TestRMAdminCLI {
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes( verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL)); RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@Test
public void testRefreshNodesGracefulHitTimeout() throws Exception {
// Forceful decommission when timeout occurs // Forceful decommission when timeout occurs
String[] focefulDecomArgs = { "-refreshNodes", "-g", "1" }; String[] forcefulDecomArgs = {"-refreshNodes", "-g", "1", "-client"};
decomNodes = new HashSet<NodeId>(); HashSet<NodeId> decomNodes = new HashSet<NodeId>();
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
response.setDecommissioningNodes(decomNodes); response.setDecommissioningNodes(decomNodes);
decomNodes.add(NodeId.newInstance("node1", 100)); decomNodes.add(NodeId.newInstance("node1", 100));
response.setDecommissioningNodes(decomNodes); response.setDecommissioningNodes(decomNodes);
when(admin.checkForDecommissioningNodes(any( when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response); CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(focefulDecomArgs)); assertEquals(0, rmAdminCLI.run(forcefulDecomArgs));
verify(admin).refreshNodes( verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL)); RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@Test
public void testRefreshNodesGracefulInfiniteTimeout() throws Exception {
String[] infiniteTimeoutArgs = {"-refreshNodes", "-g", "-1", "-client"};
testRefreshNodesGracefulInfiniteTimeout(infiniteTimeoutArgs);
}
@Test
public void testRefreshNodesGracefulNoTimeout() throws Exception {
// no timeout (infinite timeout)
String[] noTimeoutArgs = {"-refreshNodes", "-g", "-client"};
testRefreshNodesGracefulInfiniteTimeout(noTimeoutArgs);
}
private void testRefreshNodesGracefulInfiniteTimeout(String[] args)
throws Exception {
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenAnswer(
new Answer<CheckForDecommissioningNodesResponse>() {
private int count = 5;
@Override
public CheckForDecommissioningNodesResponse answer(
InvocationOnMock invocationOnMock) throws Throwable {
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
HashSet<NodeId> decomNodes = new HashSet<NodeId>();
count--;
if (count <= 0) {
response.setDecommissioningNodes(decomNodes);
return response;
} else {
decomNodes.add(NodeId.newInstance("node1", 100));
response.setDecommissioningNodes(decomNodes);
return response;
}
}
});
assertEquals(0, rmAdminCLI.run(args));
verify(admin, atLeastOnce()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@Test
public void testRefreshNodesGracefulInvalidArgs() throws Exception {
// invalid graceful timeout parameter // invalid graceful timeout parameter
String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" }; String[] invalidArgs = {"-refreshNodes", "-ginvalid", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidArgs)); assertEquals(-1, rmAdminCLI.run(invalidArgs));
// invalid timeout // invalid timeout
String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" }; String[] invalidTimeoutArgs = {"-refreshNodes", "-g", "invalid", "-client"};
assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs)); assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
// negative timeout // negative timeout
String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" }; String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs)); assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
// server tracking mode
String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
// invalid tracking mode
String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
} }
@Test(timeout=500) @Test(timeout=500)
@ -404,8 +466,8 @@ public class TestRMAdminCLI {
.toString() .toString()
.contains( .contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " + "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
"seconds]]] [-refreshNodesResources] [-refreshSuperUserGroups" + "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
"Configuration] [-refreshUserToGroupsMappings] " + "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " + "[username]] [-addToClusterNodeLabels " +
"<\"label1(exclusive=true),label2(exclusive=false),label3\">] " + "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " +
@ -423,8 +485,8 @@ public class TestRMAdminCLI {
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
"-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " + "-refreshNodes [-g [timeout in seconds] -client|server]: " +
"ResourceManager.")); "Refresh the hosts information at the ResourceManager."));
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
@ -456,7 +518,8 @@ public class TestRMAdminCLI {
testError(new String[] { "-help", "-refreshQueues" }, testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0); "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" }, testError(new String[] { "-help", "-refreshNodes" },
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0); "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
"-client|server]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" }, testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0); "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@ -495,7 +558,8 @@ public class TestRMAdminCLI {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut); oldOutPrintStream.println(dataOut);
String expectedHelpMsg = String expectedHelpMsg =
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] " "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
+ "seconds] -client|server]] "
+ "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] " + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] " + "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"