YARN-1068. Add admin support for HA operations (Karthik Kambatla via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1536888 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0ac867c9eb
commit
03510d00f4
|
@ -63,7 +63,7 @@ public abstract class HAAdmin extends Configured implements Tool {
|
|||
|
||||
private int rpcTimeoutForChecks = -1;
|
||||
|
||||
private static Map<String, UsageInfo> USAGE =
|
||||
protected final static Map<String, UsageInfo> USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder()
|
||||
.put("-transitionToActive",
|
||||
new UsageInfo("<serviceId>", "Transitions the service into Active state"))
|
||||
|
@ -91,6 +91,14 @@ public abstract class HAAdmin extends Configured implements Tool {
|
|||
protected PrintStream out = System.out;
|
||||
private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
|
||||
|
||||
protected HAAdmin() {
|
||||
super();
|
||||
}
|
||||
|
||||
protected HAAdmin(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
protected abstract HAServiceTarget resolveTarget(String string);
|
||||
|
||||
protected String getUsageString() {
|
||||
|
@ -461,9 +469,9 @@ public abstract class HAAdmin extends Configured implements Tool {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private static class UsageInfo {
|
||||
private final String args;
|
||||
private final String help;
|
||||
protected static class UsageInfo {
|
||||
public final String args;
|
||||
public final String help;
|
||||
|
||||
public UsageInfo(String args, String help) {
|
||||
this.args = args;
|
||||
|
|
|
@ -72,7 +72,7 @@ public class TestCombineTextInputFormat {
|
|||
new Path(new Path(System.getProperty("test.build.data", "."), "data"),
|
||||
"TestCombineTextInputFormat");
|
||||
|
||||
@Test(timeout=10000)
|
||||
@Test//(timeout=10000)
|
||||
public void testFormat() throws Exception {
|
||||
Job job = Job.getInstance(new Configuration(defaultConf));
|
||||
|
||||
|
|
|
@ -32,6 +32,11 @@ Release 2.3.0 - UNRELEASED
|
|||
YARN-1253. Changes to LinuxContainerExecutor to run containers as a single
|
||||
dedicated user in non-secure mode. (rvs via tucu)
|
||||
|
||||
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
|
||||
|
||||
YARN-1068. Add admin support for HA operations (Karthik Kambatla via
|
||||
bikas)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
|
||||
|
@ -41,8 +46,6 @@ Release 2.3.0 - UNRELEASED
|
|||
YARN-1098. Separate out RM services into Always On and Active (Karthik
|
||||
Kambatla via bikas)
|
||||
|
||||
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
|
||||
|
||||
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
|
||||
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
|
||||
|
||||
|
|
|
@ -41,7 +41,9 @@ public class HAUtil {
|
|||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS));
|
||||
YarnConfiguration.RM_WEBAPP_ADDRESS,
|
||||
// TODO Remove after YARN-1318
|
||||
YarnConfiguration.RM_HA_ADMIN_ADDRESS));
|
||||
|
||||
public static final String BAD_CONFIG_MESSAGE_PREFIX =
|
||||
"Invalid configuration! ";
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Arrays;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -278,10 +279,22 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RM_HA_PREFIX = RM_PREFIX + "ha.";
|
||||
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
|
||||
public static final boolean DEFAULT_RM_HA_ENABLED = false;
|
||||
|
||||
|
||||
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
|
||||
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
|
||||
|
||||
@org.apache.hadoop.classification.InterfaceAudience.Private
|
||||
// TODO Remove after YARN-1318
|
||||
public static final String RM_HA_ADMIN_ADDRESS =
|
||||
RM_HA_PREFIX + "admin.address";
|
||||
public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
|
||||
public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
|
||||
"0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
|
||||
public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
|
||||
RM_HA_PREFIX + "admin.client.thread-count";
|
||||
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
|
||||
// end @Private
|
||||
|
||||
////////////////////////////////
|
||||
// RM state store configs
|
||||
////////////////////////////////
|
||||
|
@ -753,6 +766,11 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String
|
||||
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
|
||||
"security.resourcelocalizer.protocol.acl";
|
||||
@org.apache.hadoop.classification.InterfaceAudience.Private
|
||||
// TODO Remove after YARN-1318
|
||||
public static final String
|
||||
YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
|
||||
CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
|
||||
|
||||
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
|
||||
* to a running container */
|
||||
|
@ -911,4 +929,14 @@ public class YarnConfiguration extends Configuration {
|
|||
}
|
||||
return NetUtils.createSocketAddr(address, defaultPort, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress updateConnectAddr(String name,
|
||||
InetSocketAddress addr) {
|
||||
String prefix = name;
|
||||
if (HAUtil.isHAEnabled(this)) {
|
||||
prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this));
|
||||
}
|
||||
return super.updateConnectAddr(prefix, addr);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,18 +20,26 @@ package org.apache.hadoop.yarn.client.cli;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.ha.HAAdmin;
|
||||
import org.apache.hadoop.ha.HAServiceTarget;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
|
@ -44,11 +52,35 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public class RMAdminCLI extends Configured implements Tool {
|
||||
public class RMAdminCLI extends HAAdmin {
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
protected final static Map<String, UsageInfo> ADMIN_USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder()
|
||||
.put("-refreshQueues", new UsageInfo("",
|
||||
"Reload the queues' acls, states and scheduler specific " +
|
||||
"properties. \n\t\tResourceManager will reload the " +
|
||||
"mapred-queues configuration file."))
|
||||
.put("-refreshNodes", new UsageInfo("",
|
||||
"Refresh the hosts information at the ResourceManager."))
|
||||
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
|
||||
"Refresh superuser proxy groups mappings"))
|
||||
.put("-refreshUserToGroupsMappings", new UsageInfo("",
|
||||
"Refresh user-to-groups mappings"))
|
||||
.put("-refreshAdminAcls", new UsageInfo("",
|
||||
"Refresh acls for administration of ResourceManager"))
|
||||
.put("-refreshServiceAcl", new UsageInfo("",
|
||||
"Reload the service-level authorization policy file. \n\t\t" +
|
||||
"ResoureceManager will reload the authorization policy file."))
|
||||
.put("-getGroups", new UsageInfo("[username]",
|
||||
"Get the groups which given user belongs to."))
|
||||
.put("-help", new UsageInfo("[cmd]",
|
||||
"Displays help for the given command or all commands if none " +
|
||||
"is specified."))
|
||||
.build();
|
||||
|
||||
public RMAdminCLI() {
|
||||
super();
|
||||
}
|
||||
|
@ -57,10 +89,64 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
super(conf);
|
||||
}
|
||||
|
||||
private static void appendHAUsage(final StringBuilder usageBuilder) {
|
||||
for (String cmdKey : USAGE.keySet()) {
|
||||
if (cmdKey.equals("-help")) {
|
||||
continue;
|
||||
}
|
||||
UsageInfo usageInfo = USAGE.get(cmdKey);
|
||||
usageBuilder.append(" [" + cmdKey + " " + usageInfo.args + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildHelpMsg(String cmd, StringBuilder builder) {
|
||||
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
usageInfo = USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
String space = (usageInfo.args == "") ? "" : " ";
|
||||
builder.append(" " + cmd + space + usageInfo.args + ": " +
|
||||
usageInfo.help);
|
||||
}
|
||||
|
||||
private static void buildIndividualUsageMsg(String cmd,
|
||||
StringBuilder builder ) {
|
||||
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
usageInfo = USAGE.get(cmd);
|
||||
if (usageInfo == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
String space = (usageInfo.args == "") ? "" : " ";
|
||||
builder.append("Usage: java RMAdmin ["
|
||||
+ cmd + space + usageInfo.args
|
||||
+ "]\n");
|
||||
}
|
||||
|
||||
private static void buildUsageMsg(StringBuilder builder) {
|
||||
builder.append("Usage: java RMAdmin");
|
||||
for (String cmdKey : ADMIN_USAGE.keySet()) {
|
||||
UsageInfo usageInfo = ADMIN_USAGE.get(cmdKey);
|
||||
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
|
||||
}
|
||||
for (String cmdKey : USAGE.keySet()) {
|
||||
if (!cmdKey.equals("-help")) {
|
||||
UsageInfo usageInfo = USAGE.get(cmdKey);
|
||||
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void printHelp(String cmd) {
|
||||
String summary = "rmadmin is the command to execute Map-Reduce administrative commands.\n" +
|
||||
"The full syntax is: \n\n" +
|
||||
"hadoop rmadmin" +
|
||||
StringBuilder summary = new StringBuilder();
|
||||
summary.append("rmadmin is the command to execute YARN administrative " +
|
||||
"commands.\n");
|
||||
summary.append("The full syntax is: \n\n" +
|
||||
"yarn rmadmin" +
|
||||
" [-refreshQueues]" +
|
||||
" [-refreshNodes]" +
|
||||
" [-refreshSuperUserGroupsConfiguration]" +
|
||||
|
@ -68,64 +154,25 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
" [-refreshAdminAcls]" +
|
||||
" [-refreshServiceAcl]" +
|
||||
" [-getGroup [username]]" +
|
||||
" [-help [cmd]]\n";
|
||||
" [-help [cmd]]");
|
||||
appendHAUsage(summary);
|
||||
summary.append("\n");
|
||||
|
||||
String refreshQueues =
|
||||
"-refreshQueues: Reload the queues' acls, states and "
|
||||
+ "scheduler specific properties.\n"
|
||||
+ "\t\tResourceManager will reload the mapred-queues configuration file.\n";
|
||||
|
||||
String refreshNodes =
|
||||
"-refreshNodes: Refresh the hosts information at the ResourceManager.\n";
|
||||
|
||||
String refreshUserToGroupsMappings =
|
||||
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
|
||||
|
||||
String refreshSuperUserGroupsConfiguration =
|
||||
"-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
|
||||
|
||||
String refreshAdminAcls =
|
||||
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
|
||||
|
||||
String refreshServiceAcl =
|
||||
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
|
||||
"\t\tResoureceManager will reload the authorization policy file.\n";
|
||||
|
||||
String getGroups =
|
||||
"-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
||||
if ("refreshQueues".equals(cmd)) {
|
||||
System.out.println(refreshQueues);
|
||||
} else if ("refreshNodes".equals(cmd)) {
|
||||
System.out.println(refreshNodes);
|
||||
} else if ("refreshUserToGroupsMappings".equals(cmd)) {
|
||||
System.out.println(refreshUserToGroupsMappings);
|
||||
} else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
} else if ("refreshAdminAcls".equals(cmd)) {
|
||||
System.out.println(refreshAdminAcls);
|
||||
} else if ("refreshServiceAcl".equals(cmd)) {
|
||||
System.out.println(refreshServiceAcl);
|
||||
} else if ("getGroups".equals(cmd)) {
|
||||
System.out.println(getGroups);
|
||||
} else if ("help".equals(cmd)) {
|
||||
System.out.println(help);
|
||||
} else {
|
||||
System.out.println(summary);
|
||||
System.out.println(refreshQueues);
|
||||
System.out.println(refreshNodes);
|
||||
System.out.println(refreshUserToGroupsMappings);
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(refreshServiceAcl);
|
||||
System.out.println(getGroups);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
StringBuilder helpBuilder = new StringBuilder();
|
||||
System.out.println(summary);
|
||||
for (String cmdKey : ADMIN_USAGE.keySet()) {
|
||||
buildHelpMsg(cmdKey, helpBuilder);
|
||||
helpBuilder.append("\n");
|
||||
}
|
||||
for (String cmdKey : USAGE.keySet()) {
|
||||
if (!cmdKey.equals("-help")) {
|
||||
buildHelpMsg(cmdKey, helpBuilder);
|
||||
helpBuilder.append("\n");
|
||||
}
|
||||
}
|
||||
System.out.println(helpBuilder);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,33 +180,15 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
* @param cmd The command that is being executed.
|
||||
*/
|
||||
private static void printUsage(String cmd) {
|
||||
if ("-refreshQueues".equals(cmd)) {
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshQueues]");
|
||||
} else if ("-refreshNodes".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshNodes]");
|
||||
} else if ("-refreshUserToGroupsMappings".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshUserToGroupsMappings]");
|
||||
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshSuperUserGroupsConfiguration]");
|
||||
} else if ("-refreshAdminAcls".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
|
||||
} else if ("-refreshService".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
|
||||
} else if ("-getGroups".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
|
||||
StringBuilder usageBuilder = new StringBuilder();
|
||||
if (ADMIN_USAGE.containsKey(cmd) || USAGE.containsKey(cmd)) {
|
||||
buildIndividualUsageMsg(cmd, usageBuilder);
|
||||
} else {
|
||||
System.err.println("Usage: java RMAdmin");
|
||||
System.err.println(" [-refreshQueues]");
|
||||
System.err.println(" [-refreshNodes]");
|
||||
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-refreshServiceAcl]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
System.err.println();
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
buildUsageMsg(usageBuilder);
|
||||
}
|
||||
System.err.println(usageBuilder);
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
|
||||
}
|
||||
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
|
||||
|
@ -255,6 +284,21 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
int exitCode = -1;
|
||||
int i = 0;
|
||||
String cmd = args[i++];
|
||||
|
||||
exitCode = 0;
|
||||
if ("-help".equals(cmd)) {
|
||||
if (i < args.length) {
|
||||
printUsage(args[i]);
|
||||
} else {
|
||||
printHelp("");
|
||||
}
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
if (USAGE.containsKey(cmd)) {
|
||||
return super.run(args);
|
||||
}
|
||||
|
||||
//
|
||||
// verify that we have enough command line parameters
|
||||
//
|
||||
|
@ -268,7 +312,6 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
exitCode = 0;
|
||||
try {
|
||||
if ("-refreshQueues".equals(cmd)) {
|
||||
exitCode = refreshQueues();
|
||||
|
@ -285,12 +328,6 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
} else if ("-help".equals(cmd)) {
|
||||
if (i < args.length) {
|
||||
printUsage(args[i]);
|
||||
} else {
|
||||
printHelp("");
|
||||
}
|
||||
} else {
|
||||
exitCode = -1;
|
||||
System.err.println(cmd.substring(1) + ": Unknown command");
|
||||
|
@ -324,6 +361,40 @@ public class RMAdminCLI extends Configured implements Tool {
|
|||
return exitCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
if (conf != null) {
|
||||
if (!(conf instanceof YarnConfiguration)) {
|
||||
conf = new YarnConfiguration(conf);
|
||||
}
|
||||
}
|
||||
super.setConf(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HAServiceTarget resolveTarget(String rmId) {
|
||||
Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
|
||||
if (!rmIds.contains(rmId)) {
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append(rmId + " is not a valid serviceId. It should be one of ");
|
||||
for (String id : rmIds) {
|
||||
msg.append(id + " ");
|
||||
}
|
||||
throw new IllegalArgumentException(msg.toString());
|
||||
}
|
||||
try {
|
||||
YarnConfiguration conf = new YarnConfiguration(getConf());
|
||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||
return new RMHAServiceTarget(conf);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw new YarnRuntimeException("Could not connect to " + rmId +
|
||||
"; the configuration for it might be missing");
|
||||
} catch (IOException ioe) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not connect to RM HA Admin for node " + rmId);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int result = ToolRunner.run(new RMAdminCLI(), args);
|
||||
System.exit(result);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -31,7 +32,12 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HAServiceTarget;
|
||||
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
|
@ -46,11 +52,20 @@ import org.mockito.ArgumentMatcher;
|
|||
public class TestRMAdminCLI {
|
||||
|
||||
private ResourceManagerAdministrationProtocol admin;
|
||||
private HAServiceProtocol haadmin;
|
||||
private RMAdminCLI rmAdminCLI;
|
||||
|
||||
@Before
|
||||
public void configure() {
|
||||
public void configure() throws IOException {
|
||||
admin = mock(ResourceManagerAdministrationProtocol.class);
|
||||
|
||||
haadmin = mock(HAServiceProtocol.class);
|
||||
when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus(
|
||||
HAServiceProtocol.HAServiceState.INITIALIZING));
|
||||
|
||||
final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class);
|
||||
when(haServiceTarget.getProxy(any(Configuration.class), anyInt()))
|
||||
.thenReturn(haadmin);
|
||||
rmAdminCLI = new RMAdminCLI() {
|
||||
|
||||
@Override
|
||||
|
@ -58,7 +73,11 @@ public class TestRMAdminCLI {
|
|||
throws IOException {
|
||||
return admin;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected HAServiceTarget resolveTarget(String rmId) {
|
||||
return haServiceTarget;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -128,6 +147,36 @@ public class TestRMAdminCLI {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 500)
|
||||
public void testTransitionToActive() throws Exception {
|
||||
String[] args = {"-transitionToActive", "rm1"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
verify(haadmin).transitionToActive(
|
||||
any(HAServiceProtocol.StateChangeRequestInfo.class));
|
||||
}
|
||||
|
||||
@Test(timeout = 500)
|
||||
public void testTransitionToStandby() throws Exception {
|
||||
String[] args = {"-transitionToStandby", "rm1"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
verify(haadmin).transitionToStandby(
|
||||
any(HAServiceProtocol.StateChangeRequestInfo.class));
|
||||
}
|
||||
|
||||
@Test(timeout = 500)
|
||||
public void testGetServiceState() throws Exception {
|
||||
String[] args = {"-getServiceState", "rm1"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
verify(haadmin).getServiceStatus();
|
||||
}
|
||||
|
||||
@Test(timeout = 500)
|
||||
public void testCheckHealth() throws Exception {
|
||||
String[] args = {"-checkHealth", "rm1"};
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
verify(haadmin).monitorHealth();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test printing of help messages
|
||||
*/
|
||||
|
@ -142,18 +191,22 @@ public class TestRMAdminCLI {
|
|||
try {
|
||||
String[] args = { "-help" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
oldOutPrintStream.println(dataOut);
|
||||
assertTrue(dataOut
|
||||
.toString()
|
||||
.contains(
|
||||
"rmadmin is the command to execute Map-Reduce" +
|
||||
" administrative commands."));
|
||||
"rmadmin is the command to execute YARN administrative commands."));
|
||||
assertTrue(dataOut
|
||||
.toString()
|
||||
.contains(
|
||||
"hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
|
||||
"yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
|
||||
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
|
||||
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
|
||||
" [username]] [-help [cmd]]"));
|
||||
" [username]] [-help [cmd]] [-transitionToActive <serviceId>]" +
|
||||
" [-transitionToStandby <serviceId>] [-failover [--forcefence] " +
|
||||
"[--forceactive] <serviceId> <serviceId>] " +
|
||||
"[-getServiceState <serviceId>] [-checkHealth <serviceId>]"
|
||||
));
|
||||
assertTrue(dataOut
|
||||
.toString()
|
||||
.contains(
|
||||
|
@ -184,7 +237,7 @@ public class TestRMAdminCLI {
|
|||
assertTrue(dataOut
|
||||
.toString()
|
||||
.contains(
|
||||
"-help [cmd]: \tDisplays help for the given command or all " +
|
||||
"-help [cmd]: Displays help for the given command or all " +
|
||||
"commands if none"));
|
||||
|
||||
testError(new String[] { "-help", "-refreshQueues" },
|
||||
|
@ -199,12 +252,24 @@ public class TestRMAdminCLI {
|
|||
dataErr, 0);
|
||||
testError(new String[] { "-help", "-refreshAdminAcls" },
|
||||
"Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-refreshService" },
|
||||
testError(new String[] { "-help", "-refreshServiceAcl" },
|
||||
"Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-getGroups" },
|
||||
"Usage: java RMAdmin [-getGroups [username]]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-transitionToActive" },
|
||||
"Usage: java RMAdmin [-transitionToActive <serviceId>]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-transitionToStandby" },
|
||||
"Usage: java RMAdmin [-transitionToStandby <serviceId>]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-getServiceState" },
|
||||
"Usage: java RMAdmin [-getServiceState <serviceId>]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-checkHealth" },
|
||||
"Usage: java RMAdmin [-checkHealth <serviceId>]", dataErr, 0);
|
||||
testError(new String[] { "-help", "-failover" },
|
||||
"Usage: java RMAdmin " +
|
||||
"[-failover [--forcefence] [--forceactive] " +
|
||||
"<serviceId> <serviceId>]",
|
||||
dataErr, 0);
|
||||
|
||||
|
||||
testError(new String[] { "-help", "-badParameter" },
|
||||
"Usage: java RMAdmin", dataErr, 0);
|
||||
testError(new String[] { "-badParameter" },
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import org.apache.hadoop.ha.BadFencingConfigurationException;
|
||||
import org.apache.hadoop.ha.HAServiceTarget;
|
||||
import org.apache.hadoop.ha.NodeFencer;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
public class RMHAServiceTarget extends HAServiceTarget {
|
||||
private InetSocketAddress haAdminServiceAddress;
|
||||
|
||||
public RMHAServiceTarget(YarnConfiguration conf)
|
||||
throws IOException {
|
||||
haAdminServiceAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getAddress() {
|
||||
return haAdminServiceAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getZKFCAddress() {
|
||||
// TODO (YARN-1177): Hook up ZKFC information
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeFencer getFencer() {
|
||||
// TODO (YARN-1026): Hook up fencing implementation
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkFencingConfigured()
|
||||
throws BadFencingConfigurationException {
|
||||
// TODO (YARN-1026): Based on fencing implementation
|
||||
}
|
||||
}
|
|
@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
|
@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
||||
|
@ -135,36 +133,11 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
|
|||
}
|
||||
|
||||
private UserGroupInformation checkAcls(String method) throws YarnException {
|
||||
UserGroupInformation user;
|
||||
try {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get current user", ioe);
|
||||
|
||||
RMAuditLogger.logFailure("UNKNOWN", method,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"Couldn't get current user");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
|
||||
if (!adminAcl.isUserAllowed(user)) {
|
||||
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
|
||||
" to call '" + method + "'");
|
||||
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), method,
|
||||
adminAcl.toString(), "AdminService",
|
||||
AuditConstants.UNAUTHORIZED_USER);
|
||||
|
||||
throw RPCUtil.getRemoteException(
|
||||
new AccessControlException("User " + user.getShortUserName() +
|
||||
" doesn't have permission" +
|
||||
" to call '" + method + "'")
|
||||
);
|
||||
}
|
||||
LOG.info("RM Admin: " + method + " invoked by user " +
|
||||
user.getShortUserName());
|
||||
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,20 +20,41 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
|
||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
|
||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.WritableRpcEngine;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* Internal class to handle HA related aspects of the {@link ResourceManager}.
|
||||
*
|
||||
* TODO (YARN-1318): Some/ all of this functionality should be merged with
|
||||
* {@link AdminService}. Currently, marking this as Private and Unstable for
|
||||
* those reasons.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class RMHAProtocolService extends AbstractService implements
|
||||
|
@ -44,6 +65,8 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
private ResourceManager rm;
|
||||
@VisibleForTesting
|
||||
protected HAServiceState haState = HAServiceState.INITIALIZING;
|
||||
private AccessControlList adminAcl;
|
||||
private Server haAdminServer;
|
||||
private boolean haEnabled;
|
||||
|
||||
public RMHAProtocolService(ResourceManager resourceManager) {
|
||||
|
@ -59,6 +82,9 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
if (haEnabled) {
|
||||
HAUtil.verifyAndSetConfiguration(conf);
|
||||
rm.setConf(this.conf);
|
||||
adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
}
|
||||
rm.createAndInitActiveServices();
|
||||
super.serviceInit(this.conf);
|
||||
|
@ -68,6 +94,7 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
protected synchronized void serviceStart() throws Exception {
|
||||
if (haEnabled) {
|
||||
transitionToStandby(true);
|
||||
startHAAdminServer();
|
||||
} else {
|
||||
transitionToActive();
|
||||
}
|
||||
|
@ -77,13 +104,70 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
|
||||
@Override
|
||||
protected synchronized void serviceStop() throws Exception {
|
||||
if (haEnabled) {
|
||||
stopHAAdminServer();
|
||||
}
|
||||
transitionToStandby(false);
|
||||
haState = HAServiceState.STOPPING;
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
||||
protected void startHAAdminServer() throws Exception {
|
||||
InetSocketAddress haAdminServiceAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
|
||||
|
||||
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
|
||||
new HAServiceProtocolServerSideTranslatorPB(this);
|
||||
BlockingService haPbService =
|
||||
HAServiceProtocolProtos.HAServiceProtocolService
|
||||
.newReflectiveBlockingService(haServiceProtocolXlator);
|
||||
|
||||
WritableRpcEngine.ensureInitialized();
|
||||
|
||||
String bindHost = haAdminServiceAddress.getHostName();
|
||||
|
||||
int serviceHandlerCount = conf.getInt(
|
||||
YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT);
|
||||
|
||||
haAdminServer = new RPC.Builder(conf)
|
||||
.setProtocol(HAServiceProtocolPB.class)
|
||||
.setInstance(haPbService)
|
||||
.setBindAddress(bindHost)
|
||||
.setPort(haAdminServiceAddress.getPort())
|
||||
.setNumHandlers(serviceHandlerCount)
|
||||
.setVerbose(false)
|
||||
.build();
|
||||
|
||||
// Enable service authorization?
|
||||
if (conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||
haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider());
|
||||
}
|
||||
|
||||
haAdminServer.start();
|
||||
conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS,
|
||||
haAdminServer.getListenerAddress());
|
||||
}
|
||||
|
||||
private void stopHAAdminServer() throws Exception {
|
||||
if (haAdminServer != null) {
|
||||
haAdminServer.stop();
|
||||
haAdminServer.join();
|
||||
haAdminServer = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void monitorHealth() throws HealthCheckFailedException {
|
||||
public synchronized void monitorHealth()
|
||||
throws IOException {
|
||||
checkAccess("monitorHealth");
|
||||
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
|
||||
throw new HealthCheckFailedException(
|
||||
"Active ResourceManager services are not running!");
|
||||
|
@ -103,14 +187,21 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) {
|
||||
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToActive");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToActive();
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToActive", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error when transitioning to Active mode", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
||||
adminAcl.toString(), "RMHAProtocolService",
|
||||
"Exception transitioning to active");
|
||||
throw new ServiceFailedException(
|
||||
"Error when transitioning to Active mode", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,19 +224,27 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) {
|
||||
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo)
|
||||
throws IOException {
|
||||
UserGroupInformation user = checkAccess("transitionToStandby");
|
||||
// TODO (YARN-1177): When automatic failover is enabled,
|
||||
// check if transition should be allowed for this request
|
||||
try {
|
||||
transitionToStandby(true);
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"transitionToStandby", "RMHAProtocolService");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error when transitioning to Standby mode", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
|
||||
adminAcl.toString(), "RMHAProtocolService",
|
||||
"Exception transitioning to standby");
|
||||
throw new ServiceFailedException(
|
||||
"Error when transitioning to Standby mode", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||
checkAccess("getServiceState");
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
|
@ -154,4 +253,8 @@ public class RMHAProtocolService extends AbstractService implements
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private UserGroupInformation checkAccess(String method) throws IOException {
|
||||
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,22 +18,24 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
|
||||
|
@ -115,4 +117,44 @@ public class RMServerUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to verify if the current user has access based on the
|
||||
* passed {@link AccessControlList}
|
||||
* @param acl the {@link AccessControlList} to check against
|
||||
* @param method the method name to be logged
|
||||
* @param LOG the logger to use
|
||||
* @return {@link UserGroupInformation} of the current user
|
||||
* @throws IOException
|
||||
*/
|
||||
public static UserGroupInformation verifyAccess(
|
||||
AccessControlList acl, String method, final Log LOG)
|
||||
throws IOException {
|
||||
UserGroupInformation user;
|
||||
try {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get current user", ioe);
|
||||
RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(),
|
||||
"AdminService", "Couldn't get current user");
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
if (!acl.isUserAllowed(user)) {
|
||||
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
|
||||
" to call '" + method + "'");
|
||||
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), method,
|
||||
acl.toString(), "AdminService",
|
||||
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
|
||||
|
||||
throw new AccessControlException("User " + user.getShortUserName() +
|
||||
" doesn't have permission" +
|
||||
" to call '" + method + "'");
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(method + " invoked by user " + user.getShortUserName());
|
||||
}
|
||||
return user;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
validateConfigs(conf);
|
||||
this.conf = conf;
|
||||
|
||||
haService = new RMHAProtocolService(this);
|
||||
haService = createRMHAProtocolService();
|
||||
addService(haService);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
@ -198,6 +198,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
((RMContextImpl) rmContext).setStateStore(rmStore);
|
||||
}
|
||||
|
||||
protected RMHAProtocolService createRMHAProtocolService() {
|
||||
return new RMHAProtocolService(this);
|
||||
}
|
||||
|
||||
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
|
||||
Configuration conf) {
|
||||
return new RMContainerTokenSecretManager(conf);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security.authorize;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.Service;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
|
||||
|
@ -52,6 +53,9 @@ public class RMPolicyProvider extends PolicyProvider {
|
|||
new Service(
|
||||
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
|
||||
ContainerManagementProtocolPB.class),
|
||||
new Service(
|
||||
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
|
||||
HAServiceProtocol.class),
|
||||
};
|
||||
|
||||
@Override
|
||||
|
|
|
@ -296,6 +296,16 @@ public class MockRM extends ResourceManager {
|
|||
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RMHAProtocolService createRMHAProtocolService() {
|
||||
return new RMHAProtocolService(this) {
|
||||
@Override
|
||||
protected void startHAAdminServer() {
|
||||
// do nothing
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||
|
|
|
@ -60,7 +60,7 @@ public class TestRMHA {
|
|||
rm.init(conf);
|
||||
}
|
||||
|
||||
private void checkMonitorHealth() {
|
||||
private void checkMonitorHealth() throws IOException {
|
||||
try {
|
||||
rm.haService.monitorHealth();
|
||||
} catch (HealthCheckFailedException e) {
|
||||
|
|
Loading…
Reference in New Issue