New parameter of CLI for decommissioning node gracefully in RMAdmin CLI. Contributed by Devaraj K

Junping Du 2015-04-22 10:07:20 -07:00
parent b08908ae5e
commit fad9d7e85b
24 changed files with 761 additions and 39 deletions

View File

@ -93,6 +93,9 @@ Release 2.8.0 - UNRELEASED
YARN-3410. YARN admin should be able to remove individual application
records from RMStateStore. (Rohith Sharmaks via wangda)
YARN-3225. New parameter of CLI for decommissioning node gracefully in
RMAdmin CLI. (Devaraj K via junping_du)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
public enum DecommissionType {
/** Decommissioning nodes in the normal way **/
NORMAL,
/** Graceful decommissioning of nodes **/
GRACEFUL,
/** Forceful decommissioning of nodes whose graceful decommissioning is already in progress **/
FORCEFUL
}
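
For orientation, a minimal sketch (not part of this commit's diff) of how a caller pairs these values with the RefreshNodesRequest factory method added later in this change:

// Hypothetical caller code; RefreshNodesRequest.newInstance(DecommissionType)
// is introduced by this commit.
RefreshNodesRequest normal = RefreshNodesRequest.newInstance(DecommissionType.NORMAL);
RefreshNodesRequest graceful = RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL);
// FORCEFUL only acts on nodes that are already in the DECOMMISSIONING state.
RefreshNodesRequest forceful = RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL);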

View File

@ -43,7 +43,10 @@ public enum NodeState {
LOST,
/** Node has rebooted */
REBOOTED;
REBOOTED,
/** Node decommission is in progress */
DECOMMISSIONING;
public boolean isUnusable() {
return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -142,4 +144,11 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
@Idempotent
public UpdateNodeLabelsResponse updateNodeLabels(
UpdateNodeLabelsRequest request) throws YarnException, IOException;
@Public
@Evolving
@Idempotent
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException;
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class CheckForDecommissioningNodesRequest {
@Private
@Unstable
public static CheckForDecommissioningNodesRequest newInstance() {
CheckForDecommissioningNodesRequest request = Records
.newRecord(CheckForDecommissioningNodesRequest.class);
return request;
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class CheckForDecommissioningNodesResponse {
@Private
@Unstable
public static CheckForDecommissioningNodesResponse newInstance(
Set<NodeId> decommissioningNodes) {
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
response.setDecommissioningNodes(decommissioningNodes);
return response;
}
public abstract void setDecommissioningNodes(Set<NodeId> decommissioningNodes);
public abstract Set<NodeId> getDecommissioningNodes();
}
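
Taken together with the request record above, this response supports a simple polling protocol. A minimal sketch, assuming adminProtocol is a connected ResourceManagerAdministrationProtocol proxy and the caller handles InterruptedException:

// Hypothetical polling loop built on the records added in this commit.
CheckForDecommissioningNodesRequest req =
    CheckForDecommissioningNodesRequest.newInstance();
Set<NodeId> remaining;
do {
  Thread.sleep(1000); // poll once per second
  remaining = adminProtocol.checkForDecommissioningNodes(req)
      .getDecommissioningNodes();
} while (!remaining.isEmpty());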

View File

@ -19,17 +19,41 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.util.Records;
@Private
@Stable
@Unstable
public abstract class RefreshNodesRequest {
@Public
@Private
@Stable
public static RefreshNodesRequest newInstance() {
RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
return request;
}
@Private
@Unstable
public static RefreshNodesRequest newInstance(
DecommissionType decommissionType) {
RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
request.setDecommissionType(decommissionType);
return request;
}
/**
* Set the DecommissionType.
*
* @param decommissionType the type of decommissioning to perform
*/
public abstract void setDecommissionType(DecommissionType decommissionType);
/**
* Get the DecommissionType.
*
* @return the type of decommissioning to perform
*/
public abstract DecommissionType getDecommissionType();
}

View File

@ -43,4 +43,5 @@ service ResourceManagerAdministrationProtocolService {
rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
rpc updateNodeLabels(UpdateNodeLabelsRequestProto) returns (UpdateNodeLabelsResponseProto);
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
}

View File

@ -36,6 +36,7 @@ message RefreshQueuesResponseProto {
}
message RefreshNodesRequestProto {
optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
}
message RefreshNodesResponseProto {
}
@ -105,6 +106,17 @@ message UpdateNodeLabelsRequestProto {
message UpdateNodeLabelsResponseProto {
}
message CheckForDecommissioningNodesRequestProto {
}
message CheckForDecommissioningNodesResponseProto {
repeated NodeIdProto decommissioningNodes = 1;
}
enum DecommissionTypeProto {
NORMAL = 1;
GRACEFUL = 2;
FORCEFUL = 3;
}
//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////
//////////////////////////////////////////////////////////////////

View File

@ -226,6 +226,7 @@ enum NodeStateProto {
NS_DECOMMISSIONED = 4;
NS_LOST = 5;
NS_REBOOTED = 6;
NS_DECOMMISSIONING = 7;
}
message NodeIdProto {

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
@ -50,6 +51,8 @@
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
@ -75,6 +78,8 @@ public class RMAdminCLI extends HAAdmin {
"No cluster node-labels are specified";
private static final String NO_MAPPING_ERR_MSG =
"No node-to-labels mappings are specified";
private static final String INVALID_TIMEOUT_ERR_MSG =
"Invalid timeout specified : ";
protected final static Map<String, UsageInfo> ADMIN_USAGE =
ImmutableMap.<String, UsageInfo>builder()
@ -82,8 +87,11 @@ public class RMAdminCLI extends HAAdmin {
"Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("",
"Refresh the hosts information at the ResourceManager."))
.put("-refreshNodes", new UsageInfo("[-g [timeout in seconds]]",
"Refresh the hosts information at the ResourceManager. Here "
+ "[-g [timeout in seconds] is optional, if we specify the "
+ "timeout then ResourceManager will wait for timeout before "
+ "marking the NodeManager as decommissioned."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
"Refresh superuser proxy groups mappings"))
.put("-refreshUserToGroupsMappings", new UsageInfo("",
@ -202,7 +210,7 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes]" +
" [-refreshNodes [-g [timeout in seconds]]]" +
" [-refreshSuperUserGroupsConfiguration]" +
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
@ -275,12 +283,60 @@ private int refreshQueues() throws IOException, YarnException {
private int refreshNodes() throws IOException, YarnException {
// Refresh the nodes
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest request =
recordFactory.newRecordInstance(RefreshNodesRequest.class);
RefreshNodesRequest request = RefreshNodesRequest
.newInstance(DecommissionType.NORMAL);
adminProtocol.refreshNodes(request);
return 0;
}
private int refreshNodes(long timeout) throws IOException, YarnException {
// Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest
.newInstance(DecommissionType.GRACEFUL);
adminProtocol.refreshNodes(gracefulRequest);
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory
.newRecordInstance(CheckForDecommissioningNodesRequest.class);
long waitingTime;
boolean nodesDecommissioning = true;
// timeout=-1 means wait for all the nodes to be gracefully
// decommissioned
for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) {
// wait for one second to check nodes decommissioning status
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore the InterruptedException
}
CheckForDecommissioningNodesResponse checkForDecommissioningNodes = adminProtocol
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
Set<NodeId> decommissioningNodes = checkForDecommissioningNodes
.getDecommissioningNodes();
if (decommissioningNodes.isEmpty()) {
nodesDecommissioning = false;
break;
} else {
StringBuilder nodes = new StringBuilder();
for (NodeId nodeId : decommissioningNodes) {
nodes.append(nodeId).append(",");
}
nodes.deleteCharAt(nodes.length() - 1);
System.out.println("Nodes '" + nodes + "' are still decommissioning.");
}
}
if (nodesDecommissioning) {
System.out.println("Graceful decommissioning not completed in " + timeout
+ " seconds, issueing forceful decommissioning command.");
RefreshNodesRequest forcefulRequest = RefreshNodesRequest
.newInstance(DecommissionType.FORCEFUL);
adminProtocol.refreshNodes(forcefulRequest);
} else {
System.out.println("Graceful decommissioning completed in " + waitingTime
+ " seconds.");
}
return 0;
}
private int refreshUserToGroupsMappings() throws IOException,
YarnException {
// Refresh the user-to-groups mappings
@ -518,7 +574,7 @@ public int run(String[] args) throws Exception {
// verify that we have enough command line parameters
//
if ("-refreshAdminAcls".equals(cmd) || "-refreshQueues".equals(cmd) ||
"-refreshNodes".equals(cmd) || "-refreshServiceAcl".equals(cmd) ||
"-refreshServiceAcl".equals(cmd) ||
"-refreshUserToGroupsMappings".equals(cmd) ||
"-refreshSuperUserGroupsConfiguration".equals(cmd)) {
if (args.length != 1) {
@ -531,7 +587,21 @@ public int run(String[] args) throws Exception {
if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues();
} else if ("-refreshNodes".equals(cmd)) {
exitCode = refreshNodes();
if (args.length == 1) {
exitCode = refreshNodes();
} else if (args.length == 3) {
// if the graceful timeout is specified
if ("-g".equals(args[1])) {
long timeout = validateTimeout(args[2]);
exitCode = refreshNodes(timeout);
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
} else {
printUsage(cmd, isHAEnabled);
return -1;
}
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
exitCode = refreshUserToGroupsMappings();
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
@ -577,7 +647,7 @@ public int run(String[] args) throws Exception {
} catch (RemoteException e) {
//
// This is an error returned by the hadoop server. Print
// out the first line of the error mesage, ignore the stack trace.
// out the first line of the error message, ignore the stack trace.
exitCode = -1;
try {
String[] content;
@ -599,6 +669,19 @@ public int run(String[] args) throws Exception {
return exitCode;
}
private long validateTimeout(String strTimeout) {
long timeout;
try {
timeout = Long.parseLong(strTimeout);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout);
}
if (timeout < -1) {
throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + timeout);
}
return timeout;
}
@Override
public void setConf(Configuration conf) {
if (conf != null) {
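
A hedged sketch (not part of this commit) of driving the new option programmatically; it assumes RMAdminCLI's default constructor and is equivalent to running "yarn rmadmin -refreshNodes -g 120" from the shell:

// Hypothetical driver code using the Tool interface RMAdminCLI implements.
int rc = ToolRunner.run(new YarnConfiguration(), new RMAdminCLI(),
    new String[] { "-refreshNodes", "-g", "120" });
// A timeout of -1 waits indefinitely for graceful decommissioning to complete;
// when a positive timeout elapses first, the CLI falls back to a FORCEFUL
// refreshNodes request.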

View File

@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -119,8 +120,8 @@ public void testRefreshQueues() throws Exception {
@Test
public void testRefreshNodes() throws Exception {
resourceManager.getClientRMService();
RefreshNodesRequest request = recordFactory
.newRecordInstance(RefreshNodesRequest.class);
RefreshNodesRequest request = RefreshNodesRequest
.newInstance(DecommissionType.NORMAL);
RefreshNodesResponse response = client.refreshNodes(request);
assertNotNull(response);
}

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.client.cli;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
@ -31,14 +33,15 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
@ -46,12 +49,15 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@ -175,7 +181,47 @@ public void testRefreshNodes() throws Exception {
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(any(RefreshNodesRequest.class));
}
@Test
public void testRefreshNodesWithGracefulTimeout() throws Exception {
// graceful decommission before timeout
String[] args = { "-refreshNodes", "-g", "1" };
CheckForDecommissioningNodesResponse response = Records
.newRecord(CheckForDecommissioningNodesResponse.class);
HashSet<NodeId> decomNodes = new HashSet<NodeId>();
response.setDecommissioningNodes(decomNodes);
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
// Forceful decommission when timeout occurs
String[] forcefulDecomArgs = { "-refreshNodes", "-g", "1" };
decomNodes = new HashSet<NodeId>();
decomNodes.add(NodeId.newInstance("node1", 100));
response.setDecommissioningNodes(decomNodes);
when(admin.checkForDecommissioningNodes(any(
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(forcefulDecomArgs));
verify(admin).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
// invalid graceful timeout parameter
String[] invalidArgs = { "-refreshNodes", "-ginvalid", "invalid" };
assertEquals(-1, rmAdminCLI.run(invalidArgs));
// invalid timeout
String[] invalidTimeoutArgs = { "-refreshNodes", "-g", "invalid" };
assertEquals(-1, rmAdminCLI.run(invalidTimeoutArgs));
// negative timeout
String[] negativeTimeoutArgs = { "-refreshNodes", "-g", "-1000" };
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
}
@Test(timeout=500)
public void testGetGroups() throws Exception {
when(admin.getGroupsForUser(eq("admin"))).thenReturn(
@ -284,7 +330,7 @@ public void testHelp() throws Exception {
assertTrue(dataOut
.toString()
.contains(
"yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] [-refreshSuper" +
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
" [username]] [[-addToClusterNodeLabels [label1,label2,label3]]" +
@ -299,7 +345,7 @@ public void testHelp() throws Exception {
assertTrue(dataOut
.toString()
.contains(
"-refreshNodes: Refresh the hosts information at the " +
"-refreshNodes [-g [timeout in seconds]]: Refresh the hosts information at the " +
"ResourceManager."));
assertTrue(dataOut.toString().contains(
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings"));
@ -327,7 +373,7 @@ public void testHelp() throws Exception {
testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
"Usage: yarn rmadmin [-refreshNodes]", dataErr, 0);
"Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds]]]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
"Usage: yarn rmadmin [-refreshUserToGroupsMappings]", dataErr, 0);
testError(
@ -364,7 +410,7 @@ public void testHelp() throws Exception {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
"yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper"
"yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in seconds]]] [-refreshSuper"
+ "UserGroupsConfiguration] [-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"
+ " [username]] [[-addToClusterNodeLabels [label1,label2,label3]]"

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
@ -46,6 +47,8 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -68,6 +71,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
@ -282,4 +287,20 @@ public UpdateNodeLabelsResponse updateNodeLabels(
return null;
}
}
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
CheckForDecommissioningNodesRequestProto requestProto =
((CheckForDecommissioningNodesRequestPBImpl) checkForDecommissioningNodesRequest)
.getProto();
try {
return new CheckForDecommissioningNodesResponsePBImpl(
proxy.checkForDecommissioningNodes(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
@ -49,6 +51,8 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
@ -62,6 +66,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
@ -291,4 +297,21 @@ public UpdateNodeLabelsResponseProto updateNodeLabels(
throw new ServiceException(e);
}
}
@Override
public CheckForDecommissioningNodesResponseProto checkForDecommissioningNodes(
RpcController controller, CheckForDecommissioningNodesRequestProto proto)
throws ServiceException {
CheckForDecommissioningNodesRequest request = new CheckForDecommissioningNodesRequestPBImpl(
proto);
try {
CheckForDecommissioningNodesResponse response = real
.checkForDecommissioningNodes(request);
return ((CheckForDecommissioningNodesResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class CheckForDecommissioningNodesRequestPBImpl extends
CheckForDecommissioningNodesRequest {
CheckForDecommissioningNodesRequestProto proto = CheckForDecommissioningNodesRequestProto
.getDefaultInstance();
CheckForDecommissioningNodesRequestProto.Builder builder = null;
boolean viaProto = false;
public CheckForDecommissioningNodesRequestPBImpl() {
builder = CheckForDecommissioningNodesRequestProto.newBuilder();
}
public CheckForDecommissioningNodesRequestPBImpl(
CheckForDecommissioningNodesRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public CheckForDecommissioningNodesRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class CheckForDecommissioningNodesResponsePBImpl extends
CheckForDecommissioningNodesResponse {
CheckForDecommissioningNodesResponseProto proto = CheckForDecommissioningNodesResponseProto
.getDefaultInstance();
CheckForDecommissioningNodesResponseProto.Builder builder = null;
boolean viaProto = false;
private Set<NodeId> decommissioningNodes;
public CheckForDecommissioningNodesResponsePBImpl() {
builder = CheckForDecommissioningNodesResponseProto.newBuilder();
}
public CheckForDecommissioningNodesResponsePBImpl(
CheckForDecommissioningNodesResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public CheckForDecommissioningNodesResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = CheckForDecommissioningNodesResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.decommissioningNodes != null) {
addDecommissioningNodesToProto();
}
}
private void addDecommissioningNodesToProto() {
maybeInitBuilder();
builder.clearDecommissioningNodes();
if (this.decommissioningNodes == null)
return;
Set<NodeIdProto> nodeIdProtos = new HashSet<NodeIdProto>();
for (NodeId nodeId : decommissioningNodes) {
nodeIdProtos.add(convertToProtoFormat(nodeId));
}
builder.addAllDecommissioningNodes(nodeIdProtos);
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl) nodeId).getProto();
}
@Override
public void setDecommissioningNodes(Set<NodeId> decommissioningNodes) {
maybeInitBuilder();
if (decommissioningNodes == null)
builder.clearDecommissioningNodes();
this.decommissioningNodes = decommissioningNodes;
}
@Override
public Set<NodeId> getDecommissioningNodes() {
initNodesDecommissioning();
return this.decommissioningNodes;
}
private void initNodesDecommissioning() {
if (this.decommissioningNodes != null) {
return;
}
CheckForDecommissioningNodesResponseProtoOrBuilder p = viaProto ? proto
: builder;
List<NodeIdProto> nodeIds = p.getDecommissioningNodesList();
this.decommissioningNodes = new HashSet<NodeId>();
for (NodeIdProto nodeIdProto : nodeIds) {
this.decommissioningNodes.add(convertFromProtoFormat(nodeIdProto));
}
}
private NodeId convertFromProtoFormat(NodeIdProto nodeIdProto) {
return new NodeIdPBImpl(nodeIdProto);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}
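
The implementation above follows YARN's usual lazy proto-merge pattern: setters stash plain Java objects, and getProto() folds them into the builder on demand. A minimal round-trip sketch, illustrative only and not part of this commit:

// Hypothetical round trip through the PBImpl.
CheckForDecommissioningNodesResponsePBImpl impl =
    new CheckForDecommissioningNodesResponsePBImpl();
impl.setDecommissioningNodes(
    Collections.singleton(NodeId.newInstance("host1", 8041)));
// getProto() merges the locally held set into the builder before building.
CheckForDecommissioningNodesResponseProto proto = impl.getProto();
// Rebuilding from the proto re-materialises the NodeId set on first access.
Set<NodeId> nodes =
    new CheckForDecommissioningNodesResponsePBImpl(proto)
        .getDecommissioningNodes();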

View File

@ -20,7 +20,10 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DecommissionTypeProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import com.google.protobuf.TextFormat;
@ -32,7 +35,8 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
RefreshNodesRequestProto.Builder builder = null;
boolean viaProto = false;
private DecommissionType decommissionType;
public RefreshNodesRequestPBImpl() {
builder = RefreshNodesRequestProto.newBuilder();
}
@ -42,12 +46,34 @@ public RefreshNodesRequestPBImpl(RefreshNodesRequestProto proto) {
viaProto = true;
}
public RefreshNodesRequestProto getProto() {
public synchronized RefreshNodesRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.decommissionType != null) {
builder.setDecommissionType(convertToProtoFormat(this.decommissionType));
}
}
private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RefreshNodesRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -67,4 +93,26 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public synchronized void setDecommissionType(
DecommissionType decommissionType) {
maybeInitBuilder();
this.decommissionType = decommissionType;
mergeLocalToBuilder();
}
@Override
public synchronized DecommissionType getDecommissionType() {
RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
return convertFromProtoFormat(p.getDecommissionType());
}
private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
return DecommissionType.valueOf(p.name());
}
private DecommissionTypeProto convertToProtoFormat(DecommissionType t) {
return DecommissionTypeProto.valueOf(t.name());
}
}
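
Note that the two convert helpers above rely purely on enum constant names, so DecommissionType and DecommissionTypeProto must stay in lock-step. A small illustrative check, not part of this commit:

// Hypothetical guard; valueOf throws IllegalArgumentException if a
// DecommissionType constant has no same-named DecommissionTypeProto constant.
for (DecommissionType t : DecommissionType.values()) {
  DecommissionTypeProto.valueOf(t.name());
}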

View File

@ -215,6 +215,8 @@
import org.apache.hadoop.yarn.proto.YarnProtos.YarnClusterMetricsProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
@ -291,6 +293,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
@ -1301,4 +1305,16 @@ public void testUpdateNodeLabelsResponsePBImpl() throws Exception {
validatePBImplRecord(UpdateNodeLabelsResponsePBImpl.class,
UpdateNodeLabelsResponseProto.class);
}
}
@Test
public void testCheckForDecommissioningNodesRequestPBImpl() throws Exception {
validatePBImplRecord(CheckForDecommissioningNodesRequestPBImpl.class,
CheckForDecommissioningNodesRequestProto.class);
}
@Test
public void testCheckForDecommissioningNodesResponsePBImpl() throws Exception {
validatePBImplRecord(CheckForDecommissioningNodesResponsePBImpl.class,
CheckForDecommissioningNodesResponseProto.class);
}
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.HAUtil;
@ -61,6 +62,8 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -383,7 +386,17 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getNodesListManager().refreshNodes(conf);
switch (request.getDecommissionType()) {
case NORMAL:
rmContext.getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
rmContext.getNodesListManager().refreshNodesGracefully(conf);
break;
case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully();
break;
}
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return recordFactory.newRecordInstance(RefreshNodesResponse.class);
@ -576,7 +589,7 @@ private synchronized Configuration getConfiguration(Configuration conf,
private void refreshAll() throws ServiceFailedException {
try {
refreshQueues(RefreshQueuesRequest.newInstance());
refreshNodes(RefreshNodesRequest.newInstance());
refreshNodes(RefreshNodesRequest.newInstance(DecommissionType.NORMAL));
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
refreshUserToGroupsMappings(
@ -704,4 +717,23 @@ private YarnException logAndWrapException(Exception exception, String user,
"AdminService", "Exception " + msg);
return RPCUtil.getRemoteException(exception);
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws IOException, YarnException {
String argName = "checkForDecommissioningNodes";
final String msg = "check for decommissioning nodes.";
UserGroupInformation user = checkAcls(argName);
checkRMStatus(user.getShortUserName(), argName, msg);
Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
.checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
CheckForDecommissioningNodesResponse response = recordFactory
.newRecordInstance(CheckForDecommissioningNodesResponse.class);
response.setDecommissioningNodes(decommissioningNodes);
return response;
}
}

View File

@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -31,6 +33,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -107,6 +110,18 @@ private void printConfiguredHosts() {
public void refreshNodes(Configuration yarnConf) throws IOException,
YarnException {
refreshHostsReader(yarnConf);
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}
}
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
YarnException {
synchronized (hostsReader) {
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
@ -126,13 +141,6 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
.getConfigurationInputStream(this.conf, excludesFile));
printConfiguredHosts();
}
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
}
}
}
private void setDecomissionedNMsMetrics() {
@ -236,4 +244,57 @@ private HostsFileReader createHostsFileReader(String includesFile,
.getConfigurationInputStream(this.conf, excludesFile));
return hostsReader;
}
}
/**
* Refresh the nodes gracefully.
*
* @param conf the configuration to reload the host lists from; may be null
* @throws IOException
* @throws YarnException
*/
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT));
} else {
// Recommission nodes that are no longer excluded
if (entry.getValue().getState() == NodeState.DECOMMISSIONING
|| entry.getValue().getState() == NodeState.DECOMMISSIONED) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
}
}
}
}
/**
* Checks for any nodes that are in the DECOMMISSIONING state.
*
* @return the set of nodes currently decommissioning
*/
public Set<NodeId> checkForDecommissioningNodes() {
Set<NodeId> decommissioningNodes = new HashSet<NodeId>();
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
decommissioningNodes.add(entry.getKey());
}
}
return decommissioningNodes;
}
/**
* Forcefully decommission any nodes that are in the DECOMMISSIONING state.
*/
public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
}
}
}
}
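
These three entry points compose into the graceful-decommission flow that AdminService drives per DecommissionType. A condensed sketch, illustrative only, where nodesListManager stands for the RM's NodesListManager instance:

// GRACEFUL: excluded nodes move to DECOMMISSIONING; re-included ones recommission.
nodesListManager.refreshNodesGracefully(conf);
// Callers (e.g. the rmadmin CLI) poll until this set drains or a timeout elapses.
Set<NodeId> pending = nodesListManager.checkForDecommissioningNodes();
// FORCEFUL: immediately decommission whatever is still DECOMMISSIONING.
nodesListManager.refreshNodesForcefully();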

View File

@ -24,6 +24,8 @@ public enum RMNodeEventType {
// Source: AdminService
DECOMMISSION,
DECOMMISSION_WITH_TIMEOUT,
RECOMMISSION,
// Source: AdminService, ResourceTrackerService
RESOURCE_UPDATE,

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -459,7 +460,8 @@ public void testRefreshNodesWithLocalConfigurationProvider() {
rm.start();
try {
rm.adminService.refreshNodes(RefreshNodesRequest.newInstance());
rm.adminService.refreshNodes(RefreshNodesRequest
.newInstance(DecommissionType.NORMAL));
} catch (Exception ex) {
fail("Using localConfigurationProvider. Should not get any exception.");
}
@ -500,7 +502,8 @@ public void testRefreshNodesWithFileSystemBasedConfigurationProvider()
+ "/excludeHosts");
uploadConfiguration(yarnConf, YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rm.adminService.refreshNodes(RefreshNodesRequest.newInstance());
rm.adminService.refreshNodes(RefreshNodesRequest
.newInstance(DecommissionType.NORMAL));
Set<String> excludeHosts =
rm.getNodesListManager().getHostsReader().getExcludedHosts();
Assert.assertTrue(excludeHosts.size() == 1);

View File

@ -40,7 +40,7 @@
public class TestNodesPage {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 6;
final int numberOfNodesPerRack = 7;
// The following is because of the way TestRMWebApp.mockRMContext creates
// nodes.
final int numberOfLostNodesPerRack = numberOfNodesPerRack