YARN-11424. [Federation] Router Supports DeregisterSubCluster. (#5363)
This commit is contained in:
parent
a2dda0ce03
commit
d95b5c679d
|
@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
|
||||
@Private
|
||||
public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
|
||||
|
@ -153,4 +155,22 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
|
|||
NodesToAttributesMappingResponse mapAttributesToNodes(
|
||||
NodesToAttributesMappingRequest request) throws YarnException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* In YARN Federation mode, We allow users to mark subClusters
|
||||
* With no heartbeat for a long time as SC_LOST state.
|
||||
*
|
||||
* If we include a specific subClusterId in the request, check for the specified subCluster.
|
||||
* If subClusterId is empty, all subClusters are checked.
|
||||
*
|
||||
* @param request deregisterSubCluster request.
|
||||
* The request contains the id of to deregister sub-cluster.
|
||||
* @return Response from deregisterSubCluster.
|
||||
* @throws YarnException exceptions from yarn servers.
|
||||
* @throws IOException if an IO error occurred.
|
||||
*/
|
||||
@Private
|
||||
@Idempotent
|
||||
DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
|
||||
throws YarnException, IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract class DeregisterSubClusterRequest {
|
||||
|
||||
/**
|
||||
* Initialize DeregisterSubClusterRequest according to subClusterId.
|
||||
*
|
||||
* @param subClusterId subClusterId.
|
||||
* @return DeregisterSubClusterRequest.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public static DeregisterSubClusterRequest newInstance(String subClusterId) {
|
||||
DeregisterSubClusterRequest request = Records.newRecord(DeregisterSubClusterRequest.class);
|
||||
request.setSubClusterId(subClusterId);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the subClusterId.
|
||||
*
|
||||
* @return subClusterId.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getSubClusterId();
|
||||
|
||||
/**
|
||||
* Set the subClusterId.
|
||||
*
|
||||
* @param subClusterId subClusterId.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setSubClusterId(String subClusterId);
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public abstract class DeregisterSubClusterResponse {
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static DeregisterSubClusterResponse newInstance(
|
||||
List<DeregisterSubClusters> deregisterSubClusters) {
|
||||
DeregisterSubClusterResponse response = Records.newRecord(DeregisterSubClusterResponse.class);
|
||||
response.setDeregisterSubClusters(deregisterSubClusters);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setDeregisterSubClusters(List<DeregisterSubClusters> deregisterSubClusters);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract List<DeregisterSubClusters> getDeregisterSubClusters();
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class DeregisterSubClusters {
|
||||
|
||||
/**
|
||||
* Initialize DeregisterSubClusters.
|
||||
*
|
||||
* @param subClusterId subCluster Id.
|
||||
* @param deregisterState deregister state,
|
||||
* SUCCESS means deregister is successful, Failed means deregister was unsuccessful.
|
||||
* @param lastHeartBeatTime last heartbeat time.
|
||||
* @param info offline information.
|
||||
* @param subClusterState subCluster State.
|
||||
* @return DeregisterSubClusters.
|
||||
*/
|
||||
public static DeregisterSubClusters newInstance(String subClusterId,
|
||||
String deregisterState, String lastHeartBeatTime, String info,
|
||||
String subClusterState) {
|
||||
DeregisterSubClusters deregisterSubClusters =
|
||||
Records.newRecord(DeregisterSubClusters.class);
|
||||
deregisterSubClusters.setSubClusterId(subClusterId);
|
||||
deregisterSubClusters.setDeregisterState(deregisterState);
|
||||
deregisterSubClusters.setLastHeartBeatTime(lastHeartBeatTime);
|
||||
deregisterSubClusters.setInformation(info);
|
||||
deregisterSubClusters.setSubClusterState(subClusterState);
|
||||
return deregisterSubClusters;
|
||||
}
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getSubClusterId();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setSubClusterId(String subClusterId);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getDeregisterState();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setDeregisterState(String deregisterState);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getLastHeartBeatTime();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setLastHeartBeatTime(String lastHeartBeatTime);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getInformation();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setInformation(String info);
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getSubClusterState();
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setSubClusterState(String subClusterState);
|
||||
}
|
|
@ -47,4 +47,5 @@ service ResourceManagerAdministrationProtocolService {
|
|||
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
|
||||
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
|
||||
rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
|
||||
rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto);
|
||||
}
|
||||
|
|
|
@ -161,6 +161,15 @@ message NodesToAttributesMappingRequestProto {
|
|||
|
||||
message NodesToAttributesMappingResponseProto {
|
||||
}
|
||||
|
||||
message DeregisterSubClusterRequestProto {
|
||||
optional string subClusterId = 1;
|
||||
}
|
||||
|
||||
message DeregisterSubClusterResponseProto {
|
||||
repeated DeregisterSubClustersProto deregisterSubClusters = 1;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////
|
||||
///////////// RM Failover related records ////////////////////////
|
||||
//////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -432,6 +432,14 @@ enum ExecutionTypeProto {
|
|||
OPPORTUNISTIC = 2;
|
||||
}
|
||||
|
||||
message DeregisterSubClustersProto {
|
||||
optional string subClusterId = 1;
|
||||
optional string deregisterState = 2;
|
||||
optional string lastHeartBeatTime = 3;
|
||||
optional string information = 4;
|
||||
optional string subClusterState = 5;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
////// From AM_RM_Protocol /////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -17,18 +17,47 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.MissingArgumentException;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.ha.HAAdmin.UsageInfo;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RouterCLI extends Configured implements Tool {
|
||||
|
||||
protected final static Map<String, UsageInfo> ADMIN_USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder().put("-deregisterSubCluster",
|
||||
new UsageInfo("[-sc|subClusterId [subCluster id]]",
|
||||
"deregister subCluster, if the interval between the heartbeat time of the subCluster " +
|
||||
"and the current time exceeds the timeout period, " +
|
||||
"set the state of the subCluster to SC_LOST")).build();
|
||||
|
||||
// title information
|
||||
private final static String SUB_CLUSTER_ID = "SubClusterId";
|
||||
private final static String DEREGISTER_STATE = "DeregisterState";
|
||||
private final static String LAST_HEARTBEAT_TIME = "LastHeartBeatTime";
|
||||
private final static String INFORMATION = "Information";
|
||||
private final static String SUB_CLUSTER_STATE = "SubClusterState";
|
||||
private static final String DEREGISTER_SUBCLUSTER_PATTERN = "%30s\t%20s\t%30s\t%30s\t%20s";
|
||||
|
||||
public RouterCLI() {
|
||||
super();
|
||||
}
|
||||
|
@ -37,14 +66,63 @@ public class RouterCLI extends Configured implements Tool {
|
|||
super(conf);
|
||||
}
|
||||
|
||||
private static void buildHelpMsg(String cmd, StringBuilder builder) {
|
||||
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (usageInfo.args != null) {
|
||||
String space = (usageInfo.args == "") ? "" : " ";
|
||||
builder.append(" ")
|
||||
.append(cmd)
|
||||
.append(space)
|
||||
.append(usageInfo.args)
|
||||
.append(": ")
|
||||
.append(usageInfo.help);
|
||||
} else {
|
||||
builder.append(" ")
|
||||
.append(cmd)
|
||||
.append(": ")
|
||||
.append(usageInfo.help);
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildIndividualUsageMsg(String cmd, StringBuilder builder) {
|
||||
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
return;
|
||||
}
|
||||
if (usageInfo.args == null) {
|
||||
builder.append("Usage: routeradmin [")
|
||||
.append(cmd)
|
||||
.append("]\n");
|
||||
} else {
|
||||
String space = (usageInfo.args == "") ? "" : " ";
|
||||
builder.append("Usage: routeradmin [")
|
||||
.append(cmd)
|
||||
.append(space)
|
||||
.append(usageInfo.args)
|
||||
.append("]\n");
|
||||
}
|
||||
}
|
||||
|
||||
private static void printHelp() {
|
||||
StringBuilder summary = new StringBuilder();
|
||||
summary.append("router-admin is the command to execute " +
|
||||
"YARN Federation administrative commands.\n");
|
||||
summary.append("router-admin is the command to execute ")
|
||||
.append("YARN Federation administrative commands.\n");
|
||||
summary.append("The full syntax is: \n\n")
|
||||
.append("routeradmin")
|
||||
.append(" [-deregisterSubCluster [-c|clusterId [subClusterId]]");
|
||||
summary.append(" [-help [cmd]]").append("\n");
|
||||
StringBuilder helpBuilder = new StringBuilder();
|
||||
System.out.println(summary);
|
||||
helpBuilder.append(" -help [cmd]: Displays help for the given command or all commands" +
|
||||
" if none is specified.");
|
||||
for (String cmdKey : ADMIN_USAGE.keySet()) {
|
||||
buildHelpMsg(cmdKey, helpBuilder);
|
||||
helpBuilder.append("\n");
|
||||
}
|
||||
helpBuilder.append(" -help [cmd]: Displays help for the given command or all commands")
|
||||
.append(" if none is specified.");
|
||||
System.out.println(helpBuilder);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
|
@ -60,16 +138,82 @@ public class RouterCLI extends Configured implements Tool {
|
|||
private static void buildUsageMsg(StringBuilder builder) {
|
||||
builder.append("router-admin is only used in Yarn Federation Mode.\n");
|
||||
builder.append("Usage: router-admin\n");
|
||||
builder.append(" -help" + " [cmd]\n");
|
||||
for (Map.Entry<String, UsageInfo> cmdEntry : ADMIN_USAGE.entrySet()) {
|
||||
UsageInfo usageInfo = cmdEntry.getValue();
|
||||
builder.append(" ")
|
||||
.append(cmdEntry.getKey())
|
||||
.append(" ")
|
||||
.append(usageInfo.args)
|
||||
.append("\n");
|
||||
}
|
||||
builder.append(" -help [cmd]\n");
|
||||
}
|
||||
|
||||
private static void printUsage() {
|
||||
private static void printUsage(String cmd) {
|
||||
StringBuilder usageBuilder = new StringBuilder();
|
||||
if (ADMIN_USAGE.containsKey(cmd)) {
|
||||
buildIndividualUsageMsg(cmd, usageBuilder);
|
||||
} else {
|
||||
buildUsageMsg(usageBuilder);
|
||||
}
|
||||
System.err.println(usageBuilder);
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
}
|
||||
|
||||
private int handleDeregisterSubCluster(String[] args)
|
||||
throws IOException, YarnException, ParseException {
|
||||
|
||||
Options opts = new Options();
|
||||
opts.addOption("deregisterSubCluster", false,
|
||||
"Refresh the hosts information at the ResourceManager.");
|
||||
Option gracefulOpt = new Option("c", "clusterId", true,
|
||||
"Wait for timeout before marking the NodeManager as decommissioned.");
|
||||
gracefulOpt.setOptionalArg(true);
|
||||
opts.addOption(gracefulOpt);
|
||||
|
||||
CommandLine cliParser;
|
||||
try {
|
||||
cliParser = new GnuParser().parse(opts, args);
|
||||
} catch (MissingArgumentException ex) {
|
||||
System.out.println("Missing argument for options");
|
||||
printUsage(args[0]);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (cliParser.hasOption("c")) {
|
||||
String subClusterId = cliParser.getOptionValue("c");
|
||||
return deregisterSubCluster(subClusterId);
|
||||
} else {
|
||||
return deregisterSubCluster();
|
||||
}
|
||||
}
|
||||
|
||||
private int deregisterSubCluster(String subClusterId)
|
||||
throws IOException, YarnException {
|
||||
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
|
||||
DeregisterSubClusterRequest request =
|
||||
DeregisterSubClusterRequest.newInstance(subClusterId);
|
||||
DeregisterSubClusterResponse response = adminProtocol.deregisterSubCluster(request);
|
||||
System.out.println(String.format(DEREGISTER_SUBCLUSTER_PATTERN,
|
||||
SUB_CLUSTER_ID, DEREGISTER_STATE, LAST_HEARTBEAT_TIME, INFORMATION, SUB_CLUSTER_STATE));
|
||||
List<DeregisterSubClusters> deregisterSubClusters = response.getDeregisterSubClusters();
|
||||
deregisterSubClusters.forEach(deregisterSubCluster -> {
|
||||
String responseSubClusterId = deregisterSubCluster.getSubClusterId();
|
||||
String deregisterState = deregisterSubCluster.getDeregisterState();
|
||||
String lastHeartBeatTime = deregisterSubCluster.getLastHeartBeatTime();
|
||||
String info = deregisterSubCluster.getInformation();
|
||||
String subClusterState = deregisterSubCluster.getSubClusterState();
|
||||
System.out.println(String.format(DEREGISTER_SUBCLUSTER_PATTERN,
|
||||
responseSubClusterId, deregisterState, lastHeartBeatTime, info, subClusterState));
|
||||
});
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int deregisterSubCluster() throws IOException, YarnException {
|
||||
deregisterSubCluster("");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
YarnConfiguration yarnConf = getConf() == null ?
|
||||
|
@ -78,16 +222,29 @@ public class RouterCLI extends Configured implements Tool {
|
|||
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
|
||||
|
||||
if (args.length < 1 || !isFederationEnabled) {
|
||||
printUsage();
|
||||
printUsage("");
|
||||
return -1;
|
||||
}
|
||||
|
||||
String cmd = args[0];
|
||||
if ("-help".equals(cmd)) {
|
||||
if (args.length > 1) {
|
||||
printUsage(args[1]);
|
||||
} else {
|
||||
printHelp();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ("-deregisterSubCluster".equals(cmd)) {
|
||||
return handleDeregisterSubCluster(args);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int result = ToolRunner.run(new RouterCLI(), args);
|
||||
System.exit(result);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,27 +17,52 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestRouterCLI {
|
||||
|
||||
private ResourceManagerAdministrationProtocol admin;
|
||||
private RouterCLI rmAdminCLI;
|
||||
private final static int SUBCLUSTER_NUM = 4;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
||||
admin = mock(ResourceManagerAdministrationProtocol.class);
|
||||
when(admin.deregisterSubCluster(any(DeregisterSubClusterRequest.class)))
|
||||
.thenAnswer((Answer<DeregisterSubClusterResponse>) invocationOnMock -> {
|
||||
// Step1. parse subClusterId.
|
||||
Object obj = invocationOnMock.getArgument(0);
|
||||
DeregisterSubClusterRequest request = (DeregisterSubClusterRequest) obj;
|
||||
String subClusterId = request.getSubClusterId();
|
||||
|
||||
if (StringUtils.isNotBlank(subClusterId)) {
|
||||
return generateSubClusterDataBySCId(subClusterId);
|
||||
} else {
|
||||
return generateAllSubClusterData();
|
||||
}
|
||||
});
|
||||
|
||||
Configuration config = new Configuration();
|
||||
config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
|
||||
|
@ -49,6 +74,33 @@ public class TestRouterCLI {
|
|||
};
|
||||
}
|
||||
|
||||
private DeregisterSubClusterResponse generateSubClusterDataBySCId(String subClusterId) {
|
||||
// Step2. generate return data.
|
||||
String lastHeartBeatTime = new Date().toString();
|
||||
DeregisterSubClusters deregisterSubClusters =
|
||||
DeregisterSubClusters.newInstance(subClusterId, "SUCCESS", lastHeartBeatTime,
|
||||
"Heartbeat Time > 30 minutes", "SC_LOST");
|
||||
List<DeregisterSubClusters> deregisterSubClusterList = new ArrayList<>();
|
||||
deregisterSubClusterList.add(deregisterSubClusters);
|
||||
|
||||
// Step3. return data.
|
||||
return DeregisterSubClusterResponse.newInstance(deregisterSubClusterList);
|
||||
}
|
||||
|
||||
private DeregisterSubClusterResponse generateAllSubClusterData() {
|
||||
List<DeregisterSubClusters> deregisterSubClusterList = new ArrayList<>();
|
||||
for (int i = 1; i <= SUBCLUSTER_NUM; i++) {
|
||||
String subClusterId = "SC-" + i;
|
||||
String lastHeartBeatTime = new Date().toString();
|
||||
DeregisterSubClusters deregisterSubClusters =
|
||||
DeregisterSubClusters.newInstance(subClusterId, "SUCCESS", lastHeartBeatTime,
|
||||
"Heartbeat Time > 30 minutes", "SC_LOST");
|
||||
deregisterSubClusterList.add(deregisterSubClusters);
|
||||
}
|
||||
|
||||
return DeregisterSubClusterResponse.newInstance(deregisterSubClusterList);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHelp() throws Exception {
|
||||
PrintStream oldOutPrintStream = System.out;
|
||||
|
@ -61,4 +113,31 @@ public class TestRouterCLI {
|
|||
String[] args = {"-help"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubCluster() throws Exception {
|
||||
PrintStream oldOutPrintStream = System.out;
|
||||
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(dataOut));
|
||||
oldOutPrintStream.println(dataOut);
|
||||
String[] args = {"-deregisterSubCluster", "-c", "SC-1"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubClusters() throws Exception {
|
||||
PrintStream oldOutPrintStream = System.out;
|
||||
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(dataOut));
|
||||
oldOutPrintStream.println(dataOut);
|
||||
|
||||
String[] args = {"-deregisterSubCluster"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
|
||||
args = new String[]{"-deregisterSubCluster", "-c"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
|
||||
args = new String[]{"-deregisterSubCluster", "-c", ""};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Refre
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
|
@ -75,6 +76,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
|
||||
|
@ -103,6 +106,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOn
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
|
||||
|
||||
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
|
||||
|
||||
|
@ -111,8 +116,8 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
|
|||
|
||||
private ResourceManagerAdministrationProtocolPB proxy;
|
||||
|
||||
public ResourceManagerAdministrationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
|
||||
Configuration conf) throws IOException {
|
||||
public ResourceManagerAdministrationProtocolPBClientImpl(
|
||||
long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
|
||||
RPC.setProtocolEngine(conf, ResourceManagerAdministrationProtocolPB.class,
|
||||
ProtobufRpcEngine2.class);
|
||||
proxy = (ResourceManagerAdministrationProtocolPB)RPC.getProxy(
|
||||
|
@ -343,4 +348,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(
|
||||
DeregisterSubClusterRequest request) throws YarnException, IOException {
|
||||
DeregisterSubClusterRequestProto requestProto =
|
||||
((DeregisterSubClusterRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new DeregisterSubClusterResponsePBImpl(
|
||||
proxy.deregisterSubCluster(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Repla
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
|
@ -71,6 +73,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
|
||||
|
@ -99,6 +103,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOn
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
|
||||
|
||||
import org.apache.hadoop.thirdparty.protobuf.RpcController;
|
||||
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
|
||||
|
@ -359,4 +365,18 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponseProto deregisterSubCluster(RpcController controller,
|
||||
DeregisterSubClusterRequestProto proto) throws ServiceException {
|
||||
DeregisterSubClusterRequest request = new DeregisterSubClusterRequestPBImpl(proto);
|
||||
try {
|
||||
DeregisterSubClusterResponse response = real.deregisterSubCluster(request);
|
||||
return ((DeregisterSubClusterResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class DeregisterSubClusterRequestPBImpl extends DeregisterSubClusterRequest {
|
||||
|
||||
private DeregisterSubClusterRequestProto proto =
|
||||
DeregisterSubClusterRequestProto.getDefaultInstance();
|
||||
private DeregisterSubClusterRequestProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public DeregisterSubClusterRequestPBImpl() {
|
||||
builder = DeregisterSubClusterRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public DeregisterSubClusterRequestPBImpl(DeregisterSubClusterRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = DeregisterSubClusterRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
public DeregisterSubClusterRequestProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof DeregisterSubClusterRequest)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DeregisterSubClusterRequestPBImpl otherImpl = this.getClass().cast(other);
|
||||
return new EqualsBuilder()
|
||||
.append(this.getProto(), otherImpl.getProto())
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSubClusterId() {
|
||||
DeregisterSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
boolean hasSubClusterId = p.hasSubClusterId();
|
||||
if (hasSubClusterId) {
|
||||
return p.getSubClusterId();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
maybeInitBuilder();
|
||||
if (subClusterId == null) {
|
||||
builder.clearSubClusterId();
|
||||
return;
|
||||
}
|
||||
builder.setSubClusterId(subClusterId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.DeregisterSubClustersProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class DeregisterSubClusterResponsePBImpl extends DeregisterSubClusterResponse {
|
||||
private DeregisterSubClusterResponseProto proto =
|
||||
DeregisterSubClusterResponseProto.getDefaultInstance();
|
||||
private DeregisterSubClusterResponseProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
private List<DeregisterSubClusters> deregisterSubClustersMapping = null;
|
||||
|
||||
public DeregisterSubClusterResponsePBImpl() {
|
||||
this.builder = DeregisterSubClusterResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public DeregisterSubClusterResponsePBImpl(DeregisterSubClusterResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = DeregisterSubClusterResponseProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
if (this.deregisterSubClustersMapping != null) {
|
||||
for (DeregisterSubClusters deregisterSubClusters : deregisterSubClustersMapping) {
|
||||
DeregisterSubClustersPBImpl deregisterSubClustersPBImpl =
|
||||
(DeregisterSubClustersPBImpl) deregisterSubClusters;
|
||||
builder.addDeregisterSubClusters(deregisterSubClustersPBImpl.getProto());
|
||||
}
|
||||
}
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public DeregisterSubClusterResponseProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof DeregisterSubClusterResponse)) {
|
||||
return false;
|
||||
}
|
||||
DeregisterSubClusterResponsePBImpl otherImpl = this.getClass().cast(other);
|
||||
return new EqualsBuilder()
|
||||
.append(this.getProto(), otherImpl.getProto())
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDeregisterSubClusters(List<DeregisterSubClusters> deregisterSubClusters) {
|
||||
if (deregisterSubClustersMapping == null) {
|
||||
deregisterSubClustersMapping = new ArrayList<>();
|
||||
}
|
||||
if(deregisterSubClusters == null) {
|
||||
throw new IllegalArgumentException("deregisterSubClusters cannot be null");
|
||||
}
|
||||
deregisterSubClustersMapping.clear();
|
||||
deregisterSubClustersMapping.addAll(deregisterSubClusters);
|
||||
}
|
||||
|
||||
private void initDeregisterSubClustersMapping() {
|
||||
if (this.deregisterSubClustersMapping != null) {
|
||||
return;
|
||||
}
|
||||
DeregisterSubClusterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<DeregisterSubClustersProto> deregisterSubClustersProtoList =
|
||||
p.getDeregisterSubClustersList();
|
||||
List<DeregisterSubClusters> attributes = new ArrayList<>();
|
||||
if (deregisterSubClustersProtoList == null || deregisterSubClustersProtoList.size() == 0) {
|
||||
this.deregisterSubClustersMapping = attributes;
|
||||
return;
|
||||
}
|
||||
for (DeregisterSubClustersProto deregisterSubClustersProto : deregisterSubClustersProtoList) {
|
||||
attributes.add(new DeregisterSubClustersPBImpl(deregisterSubClustersProto));
|
||||
}
|
||||
this.deregisterSubClustersMapping = attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DeregisterSubClusters> getDeregisterSubClusters() {
|
||||
initDeregisterSubClustersMapping();
|
||||
return this.deregisterSubClustersMapping;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,178 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.DeregisterSubClustersProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.DeregisterSubClustersProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class DeregisterSubClustersPBImpl extends DeregisterSubClusters {
|
||||
|
||||
private DeregisterSubClustersProto proto = DeregisterSubClustersProto.getDefaultInstance();
|
||||
private DeregisterSubClustersProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public DeregisterSubClustersPBImpl() {
|
||||
this.builder = DeregisterSubClustersProto.newBuilder();
|
||||
}
|
||||
|
||||
public DeregisterSubClustersPBImpl(DeregisterSubClustersProto proto) {
|
||||
this.proto = proto;
|
||||
this.viaProto = true;
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (this.viaProto || this.builder == null) {
|
||||
this.builder = DeregisterSubClustersProto.newBuilder(proto);
|
||||
}
|
||||
this.viaProto = false;
|
||||
}
|
||||
|
||||
public DeregisterSubClustersProto getProto() {
|
||||
this.proto = this.viaProto ? this.proto : this.builder.build();
|
||||
this.viaProto = true;
|
||||
return this.proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof DeregisterSubClusters)) {
|
||||
return false;
|
||||
}
|
||||
DeregisterSubClustersPBImpl otherImpl = this.getClass().cast(other);
|
||||
return new EqualsBuilder()
|
||||
.append(this.getProto(), otherImpl.getProto())
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSubClusterId() {
|
||||
DeregisterSubClustersProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
boolean hasSubClusterId = p.hasSubClusterId();
|
||||
if (hasSubClusterId) {
|
||||
return p.getSubClusterId();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
maybeInitBuilder();
|
||||
if (subClusterId == null) {
|
||||
builder.clearSubClusterId();
|
||||
return;
|
||||
}
|
||||
builder.setSubClusterId(subClusterId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDeregisterState() {
|
||||
DeregisterSubClustersProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
boolean hasDeregisterState = p.hasDeregisterState();
|
||||
if (hasDeregisterState) {
|
||||
return p.getDeregisterState();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDeregisterState(String deregisterState) {
|
||||
maybeInitBuilder();
|
||||
if (deregisterState == null) {
|
||||
builder.clearDeregisterState();
|
||||
return;
|
||||
}
|
||||
builder.setDeregisterState(deregisterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLastHeartBeatTime() {
|
||||
DeregisterSubClustersProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
boolean hasLastHeartBeatTime = p.hasLastHeartBeatTime();
|
||||
if (hasLastHeartBeatTime) {
|
||||
return p.getLastHeartBeatTime();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastHeartBeatTime(String lastHeartBeatTime) {
|
||||
maybeInitBuilder();
|
||||
if (lastHeartBeatTime == null) {
|
||||
builder.clearLastHeartBeatTime();
|
||||
return;
|
||||
}
|
||||
builder.setLastHeartBeatTime(lastHeartBeatTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInformation() {
|
||||
DeregisterSubClustersProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
boolean hasInformation = p.hasInformation();
|
||||
if (hasInformation) {
|
||||
return p.getInformation();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInformation(String info) {
|
||||
maybeInitBuilder();
|
||||
if (info == null) {
|
||||
builder.clearInformation();
|
||||
return;
|
||||
}
|
||||
builder.setInformation(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSubClusterState() {
|
||||
DeregisterSubClustersProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
|
||||
boolean hasSubClusterState = p.hasSubClusterState();
|
||||
if (hasSubClusterState) {
|
||||
return p.getSubClusterState();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSubClusterState(String subClusterState) {
|
||||
maybeInitBuilder();
|
||||
if (subClusterState == null) {
|
||||
builder.clearSubClusterState();
|
||||
return;
|
||||
}
|
||||
builder.setSubClusterState(subClusterState);
|
||||
}
|
||||
}
|
|
@ -5127,6 +5127,15 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.router.submit.interval.time</name>
|
||||
<value>10ms</value>
|
||||
<description>
|
||||
The interval Time between calling different subCluster requests.
|
||||
Default is 10ms.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.router.asc-interceptor-max-size</name>
|
||||
<value>1MB</value>
|
||||
|
|
|
@ -51,8 +51,8 @@ public enum SubClusterState {
|
|||
/** Subcluster has unregistered. */
|
||||
SC_UNREGISTERED;
|
||||
|
||||
public boolean isUnusable() {
|
||||
return (this != SC_RUNNING && this != SC_NEW);
|
||||
public boolean isUsable() {
|
||||
return (this == SC_RUNNING || this == SC_NEW);
|
||||
}
|
||||
|
||||
public boolean isActive() {
|
||||
|
|
|
@ -172,7 +172,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
|
||||
|
||||
/**
|
||||
|
@ -956,6 +957,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
|
||||
throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
|
||||
return applicationContainerIdMap;
|
||||
|
|
|
@ -1877,7 +1877,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
|
||||
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
||||
if (timeOutScs.contains(subClusterId) ||
|
||||
subClusterInfo == null || subClusterInfo.getState().isUnusable()) {
|
||||
subClusterInfo == null || !subClusterInfo.getState().isUsable()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -95,6 +95,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||
|
@ -1030,6 +1032,28 @@ public class AdminService extends CompositeService implements
|
|||
.newRecordInstance(NodesToAttributesMappingResponse.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* In YARN Federation mode, We allow users to mark subClusters
|
||||
* With no heartbeat for a long time as SC_LOST state.
|
||||
*
|
||||
* RM does not support deregisterSubCluster, deregisterSubCluster is supported by Router.
|
||||
*
|
||||
* If we include a specific subClusterId in the request, check for the specified subCluster.
|
||||
* If subClusterId is empty, all subClusters are checked.
|
||||
*
|
||||
* @param request deregisterSubCluster request.
|
||||
* The request contains the id of to deregister sub-cluster.
|
||||
* @return Response from deregisterSubCluster.
|
||||
* @throws YarnException exceptions from yarn servers.
|
||||
*/
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(
|
||||
DeregisterSubClusterRequest request) throws YarnException {
|
||||
throw new YarnException("It is not allowed to call the RM's deregisterSubCluster to " +
|
||||
"set the subCluster(s) state to SC_LOST, " +
|
||||
"Please call Router's deregisterSubCluster to set.");
|
||||
}
|
||||
|
||||
private void validateAttributesExists(
|
||||
List<NodeToAttributes> nodesToAttributes) throws IOException {
|
||||
NodeAttributesManager nodeAttributesManager =
|
||||
|
|
|
@ -147,6 +147,8 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
|
||||
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
|
||||
private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved;
|
||||
@Metric("# of deregisterSubCluster failed to be retrieved")
|
||||
private MutableGaugeInt numDeregisterSubClusterFailedRetrieved;
|
||||
@Metric("# of refreshAdminAcls failed to be retrieved")
|
||||
private MutableGaugeInt numRefreshAdminAclsFailedRetrieved;
|
||||
@Metric("# of refreshServiceAcls failed to be retrieved")
|
||||
|
@ -291,6 +293,8 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededReplaceLabelsOnNodeRetrieved;
|
||||
@Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)")
|
||||
private MutableRate totalSucceededGetSchedulerInfoRetrieved;
|
||||
@Metric("Total number of successful Retrieved DeregisterSubCluster and latency(ms)")
|
||||
private MutableRate totalSucceededDeregisterSubClusterRetrieved;
|
||||
@Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)")
|
||||
private MutableRate totalSucceededRefreshAdminAclsRetrieved;
|
||||
@Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)")
|
||||
|
@ -376,6 +380,7 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles getSchedulerInfoRetrievedLatency;
|
||||
private MutableQuantiles refreshSuperUserGroupsConfLatency;
|
||||
private MutableQuantiles refreshUserToGroupsMappingsLatency;
|
||||
private MutableQuantiles refreshDeregisterSubClusterLatency;
|
||||
private MutableQuantiles refreshAdminAclsLatency;
|
||||
private MutableQuantiles refreshServiceAclsLatency;
|
||||
private MutableQuantiles replaceLabelsOnNodesLatency;
|
||||
|
@ -584,6 +589,9 @@ public final class RouterMetrics {
|
|||
refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency",
|
||||
"latency of refresh user to groups mappings timeouts", "ops", "latency", 10);
|
||||
|
||||
refreshDeregisterSubClusterLatency = registry.newQuantiles("refreshDeregisterSubClusterLatency",
|
||||
"latency of deregister subcluster timeouts", "ops", "latency", 10);
|
||||
|
||||
refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency",
|
||||
"latency of refresh admin acls timeouts", "ops", "latency", 10);
|
||||
|
||||
|
@ -908,6 +916,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededDeregisterSubClusterRetrieved() {
|
||||
return totalSucceededDeregisterSubClusterRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededRefreshAdminAclsRetrieved() {
|
||||
return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples();
|
||||
|
@ -1248,6 +1261,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededDeregisterSubClusterRetrieved() {
|
||||
return totalSucceededDeregisterSubClusterRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededRefreshAdminAclsRetrieved() {
|
||||
return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean();
|
||||
|
@ -1539,6 +1557,10 @@ public final class RouterMetrics {
|
|||
return numRefreshUserToGroupsMappingsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getDeregisterSubClusterFailedRetrieved() {
|
||||
return numDeregisterSubClusterFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getNumRefreshAdminAclsFailedRetrieved() {
|
||||
return numRefreshAdminAclsFailedRetrieved.value();
|
||||
}
|
||||
|
@ -1886,6 +1908,11 @@ public final class RouterMetrics {
|
|||
getSchedulerInfoRetrievedLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededDeregisterSubClusterRetrieved(long duration) {
|
||||
totalSucceededDeregisterSubClusterRetrieved.add(duration);
|
||||
refreshDeregisterSubClusterLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededRefreshAdminAclsRetrieved(long duration) {
|
||||
totalSucceededRefreshAdminAclsRetrieved.add(duration);
|
||||
refreshAdminAclsLatency.add(duration);
|
||||
|
@ -2160,6 +2187,10 @@ public final class RouterMetrics {
|
|||
numRefreshUserToGroupsMappingsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrDeregisterSubClusterFailedRetrieved() {
|
||||
numDeregisterSubClusterFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrRefreshAdminAclsFailedRetrieved() {
|
||||
numRefreshAdminAclsFailedRetrieved.incr();
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ public class SubClusterCleaner implements Runnable {
|
|||
long lastHeartBeatTime = subClusterInfo.getLastHeartBeat();
|
||||
|
||||
// We Only Check SubClusters in NEW and RUNNING states
|
||||
if (!subClusterState.isUnusable()) {
|
||||
if (subClusterState.isUsable()) {
|
||||
long heartBeatInterval = now.getTime() - lastHeartBeatTime;
|
||||
try {
|
||||
// HeartBeat Interval Exceeds Expiration Time
|
||||
|
|
|
@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -222,4 +224,10 @@ public class DefaultRMAdminRequestInterceptor
|
|||
throws YarnException, IOException {
|
||||
return rmAdminProxy.mapAttributesToNodes(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
|
||||
throws YarnException, IOException {
|
||||
return rmAdminProxy.deregisterSubCluster(request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -59,8 +60,13 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriori
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
|
@ -71,9 +77,11 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
@ -94,6 +102,7 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|||
private RouterMetrics routerMetrics;
|
||||
private ThreadPoolExecutor executorService;
|
||||
private Configuration conf;
|
||||
private long heartbeatExpirationMillis;
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
|
@ -113,6 +122,10 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|||
this.conf = this.getConf();
|
||||
this.adminRMProxies = new ConcurrentHashMap<>();
|
||||
routerMetrics = RouterMetrics.getMetrics();
|
||||
|
||||
this.heartbeatExpirationMillis = this.conf.getTimeDuration(
|
||||
YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME,
|
||||
YarnConfiguration.DEFAULT_ROUTER_SUBCLUSTER_EXPIRATION_TIME, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -795,4 +808,112 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|||
public ThreadPoolExecutor getExecutorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
/**
|
||||
* In YARN Federation mode, We allow users to mark subClusters
|
||||
* With no heartbeat for a long time as SC_LOST state.
|
||||
*
|
||||
* If we include a specific subClusterId in the request, check for the specified subCluster.
|
||||
* If subClusterId is empty, all subClusters are checked.
|
||||
*
|
||||
* @param request deregisterSubCluster request.
|
||||
* The request contains the id of to deregister sub-cluster.
|
||||
* @return Response from deregisterSubCluster.
|
||||
* @throws YarnException exceptions from yarn servers.
|
||||
* @throws IOException if an IO error occurred.
|
||||
*/
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
if (request == null) {
|
||||
routerMetrics.incrDeregisterSubClusterFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Missing DeregisterSubCluster request.", null);
|
||||
}
|
||||
|
||||
try {
|
||||
long startTime = clock.getTime();
|
||||
List<DeregisterSubClusters> deregisterSubClusterList = new ArrayList<>();
|
||||
String reqSubClusterId = request.getSubClusterId();
|
||||
if (StringUtils.isNotBlank(reqSubClusterId)) {
|
||||
// If subCluster is not empty, process the specified subCluster.
|
||||
DeregisterSubClusters deregisterSubClusters = deregisterSubCluster(reqSubClusterId);
|
||||
deregisterSubClusterList.add(deregisterSubClusters);
|
||||
} else {
|
||||
// Traversing all Active SubClusters,
|
||||
// for subCluster whose heartbeat times out, update the status to SC_LOST.
|
||||
Map<SubClusterId, SubClusterInfo> subClusterInfo = federationFacade.getSubClusters(true);
|
||||
for (Map.Entry<SubClusterId, SubClusterInfo> entry : subClusterInfo.entrySet()) {
|
||||
SubClusterId subClusterId = entry.getKey();
|
||||
DeregisterSubClusters deregisterSubClusters = deregisterSubCluster(subClusterId.getId());
|
||||
deregisterSubClusterList.add(deregisterSubClusters);
|
||||
}
|
||||
}
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededDeregisterSubClusterRetrieved(stopTime - startTime);
|
||||
return DeregisterSubClusterResponse.newInstance(deregisterSubClusterList);
|
||||
} catch (Exception e) {
|
||||
routerMetrics.incrDeregisterSubClusterFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(e,
|
||||
"Unable to deregisterSubCluster due to exception. " + e.getMessage());
|
||||
}
|
||||
|
||||
routerMetrics.incrDeregisterSubClusterFailedRetrieved();
|
||||
throw new YarnException("Unable to deregisterSubCluster.");
|
||||
}
|
||||
|
||||
/**
|
||||
* deregisterSubCluster by SubClusterId.
|
||||
*
|
||||
* @param reqSubClusterId subClusterId.
|
||||
* @throws YarnException indicates exceptions from yarn servers.
|
||||
*/
|
||||
private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {
|
||||
|
||||
DeregisterSubClusters deregisterSubClusters = null;
|
||||
|
||||
try {
|
||||
// Step1. Get subCluster information.
|
||||
SubClusterId subClusterId = SubClusterId.newInstance(reqSubClusterId);
|
||||
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
|
||||
SubClusterState subClusterState = subClusterInfo.getState();
|
||||
long lastHeartBeat = subClusterInfo.getLastHeartBeat();
|
||||
Date lastHeartBeatDate = new Date(lastHeartBeat);
|
||||
|
||||
deregisterSubClusters = DeregisterSubClusters.newInstance(
|
||||
reqSubClusterId, "UNKNOWN", lastHeartBeatDate.toString(), "", subClusterState.name());
|
||||
|
||||
// Step2. Deregister subCluster.
|
||||
if (subClusterState.isUsable()) {
|
||||
LOG.warn("Deregister SubCluster {} in State {} last heartbeat at {}.",
|
||||
subClusterId, subClusterState, lastHeartBeatDate);
|
||||
// heartbeat interval time.
|
||||
long heartBearTimeInterval = Time.now() - lastHeartBeat;
|
||||
if (heartBearTimeInterval - heartbeatExpirationMillis < 0) {
|
||||
boolean deregisterSubClusterFlag =
|
||||
federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST);
|
||||
if (deregisterSubClusterFlag) {
|
||||
deregisterSubClusters.setDeregisterState("SUCCESS");
|
||||
deregisterSubClusters.setSubClusterState("SC_LOST");
|
||||
deregisterSubClusters.setInformation("Heartbeat Time >= 30 minutes.");
|
||||
} else {
|
||||
deregisterSubClusters.setDeregisterState("FAILED");
|
||||
deregisterSubClusters.setInformation("DeregisterSubClusters Failed.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
deregisterSubClusters.setDeregisterState("FAILED");
|
||||
deregisterSubClusters.setInformation("Heartbeat Time < 30 minutes. " +
|
||||
"DeregisterSubCluster does not need to be executed");
|
||||
LOG.warn("SubCluster {} in State {} does not need to update state.",
|
||||
subClusterId, subClusterState);
|
||||
}
|
||||
return deregisterSubClusters;
|
||||
} catch (YarnException e) {
|
||||
LOG.error("SubCluster {} DeregisterSubCluster Failed", reqSubClusterId, e);
|
||||
deregisterSubClusters = DeregisterSubClusters.newInstance(
|
||||
reqSubClusterId, "FAILED", "UNKNOWN", e.getMessage(), "UNKNOWN");
|
||||
return deregisterSubClusters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
|
@ -383,4 +385,11 @@ public class RouterRMAdminService extends AbstractService
|
|||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().mapAttributesToNodes(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(
|
||||
DeregisterSubClusterRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().deregisterSubCluster(request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -569,6 +569,11 @@ public class TestRouterMetrics {
|
|||
metrics.incrGetBulkActivitiesFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getDeregisterSubClusterFailed() {
|
||||
LOG.info("Mocked: failed deregisterSubCluster call");
|
||||
metrics.incrDeregisterSubClusterFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getSchedulerConfigurationFailed() {
|
||||
LOG.info("Mocked: failed getSchedulerConfiguration call");
|
||||
metrics.incrGetSchedulerConfigurationFailedRetrieved();
|
||||
|
@ -884,6 +889,11 @@ public class TestRouterMetrics {
|
|||
metrics.succeededGetBulkActivitiesRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getDeregisterSubClusterRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful DeregisterSubCluster call with duration {}", duration);
|
||||
metrics.succeededDeregisterSubClusterRetrieved(duration);
|
||||
}
|
||||
|
||||
public void addToClusterNodeLabelsRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful AddToClusterNodeLabels call with duration {}", duration);
|
||||
metrics.succeededAddToClusterNodeLabelsRetrieved(duration);
|
||||
|
@ -1938,6 +1948,29 @@ public class TestRouterMetrics {
|
|||
metrics.getBulkActivitiesFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubClusterRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededDeregisterSubClusterRetrieved();
|
||||
goodSubCluster.getDeregisterSubClusterRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededDeregisterSubClusterRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededDeregisterSubClusterRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getDeregisterSubClusterRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededDeregisterSubClusterRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededDeregisterSubClusterRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubClusterRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getDeregisterSubClusterFailedRetrieved();
|
||||
badSubCluster.getDeregisterSubClusterFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getDeregisterSubClusterFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddToClusterNodeLabelsRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededAddToClusterNodeLabelsRetrieved();
|
||||
|
|
|
@ -50,6 +50,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
|
||||
|
||||
/**
|
||||
* Mock interceptor that does not do anything other than forwarding it to the
|
||||
|
@ -154,4 +156,9 @@ public class PassThroughRMAdminRequestInterceptor
|
|||
return getNextInterceptor().mapAttributesToNodes(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
|
||||
throws YarnException, IOException {
|
||||
return getNextInterceptor().deregisterSubCluster(request);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue