Merge r1536888 from trunk to branch-2 for YARN-1068. Add admin support for HA operations (Karthik Kambatla via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1536899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-10-29 21:21:06 +00:00
parent 11b73ddbcc
commit 330ada83eb
14 changed files with 525 additions and 150 deletions

View File

@ -63,7 +63,7 @@ public abstract class HAAdmin extends Configured implements Tool {
private int rpcTimeoutForChecks = -1; private int rpcTimeoutForChecks = -1;
private static Map<String, UsageInfo> USAGE = protected final static Map<String, UsageInfo> USAGE =
ImmutableMap.<String, UsageInfo>builder() ImmutableMap.<String, UsageInfo>builder()
.put("-transitionToActive", .put("-transitionToActive",
new UsageInfo("<serviceId>", "Transitions the service into Active state")) new UsageInfo("<serviceId>", "Transitions the service into Active state"))
@ -91,6 +91,14 @@ public abstract class HAAdmin extends Configured implements Tool {
protected PrintStream out = System.out; protected PrintStream out = System.out;
private RequestSource requestSource = RequestSource.REQUEST_BY_USER; private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
protected HAAdmin() {
super();
}
protected HAAdmin(Configuration conf) {
super(conf);
}
protected abstract HAServiceTarget resolveTarget(String string); protected abstract HAServiceTarget resolveTarget(String string);
protected String getUsageString() { protected String getUsageString() {
@ -461,9 +469,9 @@ public abstract class HAAdmin extends Configured implements Tool {
return 0; return 0;
} }
private static class UsageInfo { protected static class UsageInfo {
private final String args; public final String args;
private final String help; public final String help;
public UsageInfo(String args, String help) { public UsageInfo(String args, String help) {
this.args = args; this.args = args;

View File

@ -14,6 +14,11 @@ Release 2.3.0 - UNRELEASED
YARN-1010. FairScheduler: decouple container scheduling from nodemanager YARN-1010. FairScheduler: decouple container scheduling from nodemanager
heartbeats. (Wei Yan via Sandy Ryza) heartbeats. (Wei Yan via Sandy Ryza)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-1068. Add admin support for HA operations (Karthik Kambatla via
bikas)
IMPROVEMENTS IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
@ -23,8 +28,6 @@ Release 2.3.0 - UNRELEASED
YARN-1098. Separate out RM services into Always On and Active (Karthik YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas) Kambatla via bikas)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-353. Add Zookeeper-based store implementation for RMStateStore. YARN-353. Add Zookeeper-based store implementation for RMStateStore.
(Bikas Saha, Jian He and Karthik Kambatla via hitesh) (Bikas Saha, Jian He and Karthik Kambatla via hitesh)

View File

@ -41,7 +41,9 @@ public class HAUtil {
YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.RM_WEBAPP_ADDRESS)); YarnConfiguration.RM_WEBAPP_ADDRESS,
// TODO Remove after YARN-1318
YarnConfiguration.RM_HA_ADMIN_ADDRESS));
public static final String BAD_CONFIG_MESSAGE_PREFIX = public static final String BAD_CONFIG_MESSAGE_PREFIX =
"Invalid configuration! "; "Invalid configuration! ";

View File

@ -24,6 +24,7 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -278,10 +279,22 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_PREFIX = RM_PREFIX + "ha.";
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final boolean DEFAULT_RM_HA_ENABLED = false; public static final boolean DEFAULT_RM_HA_ENABLED = false;
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id"; public static final String RM_HA_ID = RM_HA_PREFIX + "id";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String RM_HA_ADMIN_ADDRESS =
RM_HA_PREFIX + "admin.address";
public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
RM_HA_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
// end @Private
//////////////////////////////// ////////////////////////////////
// RM state store configs // RM state store configs
//////////////////////////////// ////////////////////////////////
@ -753,6 +766,11 @@ public class YarnConfiguration extends Configuration {
public static final String public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl"; "security.resourcelocalizer.protocol.acl";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL /** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */ * to a running container */
@ -911,4 +929,14 @@ public class YarnConfiguration extends Configuration {
} }
return NetUtils.createSocketAddr(address, defaultPort, name); return NetUtils.createSocketAddr(address, defaultPort, name);
} }
@Override
public InetSocketAddress updateConnectAddr(String name,
InetSocketAddress addr) {
String prefix = name;
if (HAUtil.isHAEnabled(this)) {
prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this));
}
return super.updateConnectAddr(prefix, addr);
}
} }

View File

@ -20,18 +20,26 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ha.HAAdmin;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@ -44,11 +52,35 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
@Private @Private
@Unstable @Unstable
public class RMAdminCLI extends Configured implements Tool { public class RMAdminCLI extends HAAdmin {
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
protected final static Map<String, UsageInfo> ADMIN_USAGE =
ImmutableMap.<String, UsageInfo>builder()
.put("-refreshQueues", new UsageInfo("",
"Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("",
"Refresh the hosts information at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
"Refresh superuser proxy groups mappings"))
.put("-refreshUserToGroupsMappings", new UsageInfo("",
"Refresh user-to-groups mappings"))
.put("-refreshAdminAcls", new UsageInfo("",
"Refresh acls for administration of ResourceManager"))
.put("-refreshServiceAcl", new UsageInfo("",
"Reload the service-level authorization policy file. \n\t\t" +
"ResoureceManager will reload the authorization policy file."))
.put("-getGroups", new UsageInfo("[username]",
"Get the groups which given user belongs to."))
.put("-help", new UsageInfo("[cmd]",
"Displays help for the given command or all commands if none " +
"is specified."))
.build();
public RMAdminCLI() { public RMAdminCLI() {
super(); super();
} }
@ -57,10 +89,64 @@ public class RMAdminCLI extends Configured implements Tool {
super(conf); super(conf);
} }
private static void appendHAUsage(final StringBuilder usageBuilder) {
for (String cmdKey : USAGE.keySet()) {
if (cmdKey.equals("-help")) {
continue;
}
UsageInfo usageInfo = USAGE.get(cmdKey);
usageBuilder.append(" [" + cmdKey + " " + usageInfo.args + "]");
}
}
private static void buildHelpMsg(String cmd, StringBuilder builder) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
if (usageInfo == null) {
usageInfo = USAGE.get(cmd);
if (usageInfo == null) {
return;
}
}
String space = (usageInfo.args == "") ? "" : " ";
builder.append(" " + cmd + space + usageInfo.args + ": " +
usageInfo.help);
}
private static void buildIndividualUsageMsg(String cmd,
StringBuilder builder ) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
if (usageInfo == null) {
usageInfo = USAGE.get(cmd);
if (usageInfo == null) {
return;
}
}
String space = (usageInfo.args == "") ? "" : " ";
builder.append("Usage: java RMAdmin ["
+ cmd + space + usageInfo.args
+ "]\n");
}
private static void buildUsageMsg(StringBuilder builder) {
builder.append("Usage: java RMAdmin");
for (String cmdKey : ADMIN_USAGE.keySet()) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmdKey);
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
}
for (String cmdKey : USAGE.keySet()) {
if (!cmdKey.equals("-help")) {
UsageInfo usageInfo = USAGE.get(cmdKey);
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
}
}
}
private static void printHelp(String cmd) { private static void printHelp(String cmd) {
String summary = "rmadmin is the command to execute Map-Reduce administrative commands.\n" + StringBuilder summary = new StringBuilder();
"The full syntax is: \n\n" + summary.append("rmadmin is the command to execute YARN administrative " +
"hadoop rmadmin" + "commands.\n");
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" + " [-refreshQueues]" +
" [-refreshNodes]" + " [-refreshNodes]" +
" [-refreshSuperUserGroupsConfiguration]" + " [-refreshSuperUserGroupsConfiguration]" +
@ -68,64 +154,25 @@ public class RMAdminCLI extends Configured implements Tool {
" [-refreshAdminAcls]" + " [-refreshAdminAcls]" +
" [-refreshServiceAcl]" + " [-refreshServiceAcl]" +
" [-getGroup [username]]" + " [-getGroup [username]]" +
" [-help [cmd]]\n"; " [-help [cmd]]");
appendHAUsage(summary);
summary.append("\n");
String refreshQueues = StringBuilder helpBuilder = new StringBuilder();
"-refreshQueues: Reload the queues' acls, states and " System.out.println(summary);
+ "scheduler specific properties.\n" for (String cmdKey : ADMIN_USAGE.keySet()) {
+ "\t\tResourceManager will reload the mapred-queues configuration file.\n"; buildHelpMsg(cmdKey, helpBuilder);
helpBuilder.append("\n");
String refreshNodes =
"-refreshNodes: Refresh the hosts information at the ResourceManager.\n";
String refreshUserToGroupsMappings =
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
String refreshSuperUserGroupsConfiguration =
"-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
String refreshAdminAcls =
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
String refreshServiceAcl =
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tResoureceManager will reload the authorization policy file.\n";
String getGroups =
"-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
if ("refreshQueues".equals(cmd)) {
System.out.println(refreshQueues);
} else if ("refreshNodes".equals(cmd)) {
System.out.println(refreshNodes);
} else if ("refreshUserToGroupsMappings".equals(cmd)) {
System.out.println(refreshUserToGroupsMappings);
} else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshAdminAcls".equals(cmd)) {
System.out.println(refreshAdminAcls);
} else if ("refreshServiceAcl".equals(cmd)) {
System.out.println(refreshServiceAcl);
} else if ("getGroups".equals(cmd)) {
System.out.println(getGroups);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
System.out.println(summary);
System.out.println(refreshQueues);
System.out.println(refreshNodes);
System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshServiceAcl);
System.out.println(getGroups);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
} }
for (String cmdKey : USAGE.keySet()) {
if (!cmdKey.equals("-help")) {
buildHelpMsg(cmdKey, helpBuilder);
helpBuilder.append("\n");
}
}
System.out.println(helpBuilder);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
} }
/** /**
@ -133,33 +180,15 @@ public class RMAdminCLI extends Configured implements Tool {
* @param cmd The command that is being executed. * @param cmd The command that is being executed.
*/ */
private static void printUsage(String cmd) { private static void printUsage(String cmd) {
if ("-refreshQueues".equals(cmd)) { StringBuilder usageBuilder = new StringBuilder();
System.err.println("Usage: java RMAdmin" + " [-refreshQueues]"); if (ADMIN_USAGE.containsKey(cmd) || USAGE.containsKey(cmd)) {
} else if ("-refreshNodes".equals(cmd)){ buildIndividualUsageMsg(cmd, usageBuilder);
System.err.println("Usage: java RMAdmin" + " [-refreshNodes]");
} else if ("-refreshUserToGroupsMappings".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshUserToGroupsMappings]");
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshSuperUserGroupsConfiguration]");
} else if ("-refreshAdminAcls".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
} else if ("-refreshService".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
} else if ("-getGroups".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
} else { } else {
System.err.println("Usage: java RMAdmin"); buildUsageMsg(usageBuilder);
System.err.println(" [-refreshQueues]");
System.err.println(" [-refreshNodes]");
System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
} }
System.err.println(usageBuilder);
ToolRunner.printGenericCommandUsage(System.err);
} }
protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
@ -255,6 +284,21 @@ public class RMAdminCLI extends Configured implements Tool {
int exitCode = -1; int exitCode = -1;
int i = 0; int i = 0;
String cmd = args[i++]; String cmd = args[i++];
exitCode = 0;
if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
} else {
printHelp("");
}
return exitCode;
}
if (USAGE.containsKey(cmd)) {
return super.run(args);
}
// //
// verify that we have enough command line parameters // verify that we have enough command line parameters
// //
@ -268,7 +312,6 @@ public class RMAdminCLI extends Configured implements Tool {
} }
} }
exitCode = 0;
try { try {
if ("-refreshQueues".equals(cmd)) { if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues(); exitCode = refreshQueues();
@ -285,12 +328,6 @@ public class RMAdminCLI extends Configured implements Tool {
} else if ("-getGroups".equals(cmd)) { } else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length); String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames); exitCode = getGroups(usernames);
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
} else {
printHelp("");
}
} else { } else {
exitCode = -1; exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command"); System.err.println(cmd.substring(1) + ": Unknown command");
@ -324,6 +361,40 @@ public class RMAdminCLI extends Configured implements Tool {
return exitCode; return exitCode;
} }
@Override
public void setConf(Configuration conf) {
if (conf != null) {
if (!(conf instanceof YarnConfiguration)) {
conf = new YarnConfiguration(conf);
}
}
super.setConf(conf);
}
@Override
protected HAServiceTarget resolveTarget(String rmId) {
Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
if (!rmIds.contains(rmId)) {
StringBuilder msg = new StringBuilder();
msg.append(rmId + " is not a valid serviceId. It should be one of ");
for (String id : rmIds) {
msg.append(id + " ");
}
throw new IllegalArgumentException(msg.toString());
}
try {
YarnConfiguration conf = new YarnConfiguration(getConf());
conf.set(YarnConfiguration.RM_HA_ID, rmId);
return new RMHAServiceTarget(conf);
} catch (IllegalArgumentException iae) {
throw new YarnRuntimeException("Could not connect to " + rmId +
"; the configuration for it might be missing");
} catch (IOException ioe) {
throw new YarnRuntimeException(
"Could not connect to RM HA Admin for node " + rmId);
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new RMAdminCLI(), args); int result = ToolRunner.run(new RMAdminCLI(), args);
System.exit(result); System.exit(result);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -31,7 +32,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -46,11 +52,20 @@ import org.mockito.ArgumentMatcher;
public class TestRMAdminCLI { public class TestRMAdminCLI {
private ResourceManagerAdministrationProtocol admin; private ResourceManagerAdministrationProtocol admin;
private HAServiceProtocol haadmin;
private RMAdminCLI rmAdminCLI; private RMAdminCLI rmAdminCLI;
@Before @Before
public void configure() { public void configure() throws IOException {
admin = mock(ResourceManagerAdministrationProtocol.class); admin = mock(ResourceManagerAdministrationProtocol.class);
haadmin = mock(HAServiceProtocol.class);
when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus(
HAServiceProtocol.HAServiceState.INITIALIZING));
final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class);
when(haServiceTarget.getProxy(any(Configuration.class), anyInt()))
.thenReturn(haadmin);
rmAdminCLI = new RMAdminCLI() { rmAdminCLI = new RMAdminCLI() {
@Override @Override
@ -58,7 +73,11 @@ public class TestRMAdminCLI {
throws IOException { throws IOException {
return admin; return admin;
} }
@Override
protected HAServiceTarget resolveTarget(String rmId) {
return haServiceTarget;
}
}; };
} }
@ -128,6 +147,36 @@ public class TestRMAdminCLI {
} }
} }
@Test(timeout = 500)
public void testTransitionToActive() throws Exception {
String[] args = {"-transitionToActive", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).transitionToActive(
any(HAServiceProtocol.StateChangeRequestInfo.class));
}
@Test(timeout = 500)
public void testTransitionToStandby() throws Exception {
String[] args = {"-transitionToStandby", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).transitionToStandby(
any(HAServiceProtocol.StateChangeRequestInfo.class));
}
@Test(timeout = 500)
public void testGetServiceState() throws Exception {
String[] args = {"-getServiceState", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).getServiceStatus();
}
@Test(timeout = 500)
public void testCheckHealth() throws Exception {
String[] args = {"-checkHealth", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).monitorHealth();
}
/** /**
* Test printing of help messages * Test printing of help messages
*/ */
@ -142,18 +191,22 @@ public class TestRMAdminCLI {
try { try {
String[] args = { "-help" }; String[] args = { "-help" };
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
oldOutPrintStream.println(dataOut);
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
"rmadmin is the command to execute Map-Reduce" + "rmadmin is the command to execute YARN administrative commands."));
" administrative commands."));
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
"hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
" [username]] [-help [cmd]]")); " [username]] [-help [cmd]] [-transitionToActive <serviceId>]" +
" [-transitionToStandby <serviceId>] [-failover [--forcefence] " +
"[--forceactive] <serviceId> <serviceId>] " +
"[-getServiceState <serviceId>] [-checkHealth <serviceId>]"
));
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
@ -184,7 +237,7 @@ public class TestRMAdminCLI {
assertTrue(dataOut assertTrue(dataOut
.toString() .toString()
.contains( .contains(
"-help [cmd]: \tDisplays help for the given command or all " + "-help [cmd]: Displays help for the given command or all " +
"commands if none")); "commands if none"));
testError(new String[] { "-help", "-refreshQueues" }, testError(new String[] { "-help", "-refreshQueues" },
@ -199,12 +252,24 @@ public class TestRMAdminCLI {
dataErr, 0); dataErr, 0);
testError(new String[] { "-help", "-refreshAdminAcls" }, testError(new String[] { "-help", "-refreshAdminAcls" },
"Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0); "Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0);
testError(new String[] { "-help", "-refreshService" }, testError(new String[] { "-help", "-refreshServiceAcl" },
"Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0); "Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0);
testError(new String[] { "-help", "-getGroups" }, testError(new String[] { "-help", "-getGroups" },
"Usage: java RMAdmin [-getGroups [username]]", dataErr, 0); "Usage: java RMAdmin [-getGroups [username]]", dataErr, 0);
testError(new String[] { "-help", "-transitionToActive" },
"Usage: java RMAdmin [-transitionToActive <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-transitionToStandby" },
"Usage: java RMAdmin [-transitionToStandby <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-getServiceState" },
"Usage: java RMAdmin [-getServiceState <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-checkHealth" },
"Usage: java RMAdmin [-checkHealth <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-failover" },
"Usage: java RMAdmin " +
"[-failover [--forcefence] [--forceactive] " +
"<serviceId> <serviceId>]",
dataErr, 0);
testError(new String[] { "-help", "-badParameter" }, testError(new String[] { "-help", "-badParameter" },
"Usage: java RMAdmin", dataErr, 0); "Usage: java RMAdmin", dataErr, 0);
testError(new String[] { "-badParameter" }, testError(new String[] { "-badParameter" },

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RMHAServiceTarget extends HAServiceTarget {
private InetSocketAddress haAdminServiceAddress;
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
}
@Override
public InetSocketAddress getAddress() {
return haAdminServiceAddress;
}
@Override
public InetSocketAddress getZKFCAddress() {
// TODO (YARN-1177): Hook up ZKFC information
return null;
}
@Override
public NodeFencer getFencer() {
// TODO (YARN-1026): Hook up fencing implementation
return null;
}
@Override
public void checkFencingConfigured()
throws BadFencingConfigurationException {
// TODO (YARN-1026): Based on fencing implementation
}
}

View File

@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -135,36 +133,11 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
} }
private UserGroupInformation checkAcls(String method) throws YarnException { private UserGroupInformation checkAcls(String method) throws YarnException {
UserGroupInformation user;
try { try {
user = UserGroupInformation.getCurrentUser(); return RMServerUtils.verifyAccess(adminAcl, method, LOG);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
RMAuditLogger.logFailure("UNKNOWN", method,
adminAcl.toString(), "AdminService",
"Couldn't get current user");
throw RPCUtil.getRemoteException(ioe); throw RPCUtil.getRemoteException(ioe);
} }
if (!adminAcl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
adminAcl.toString(), "AdminService",
AuditConstants.UNAUTHORIZED_USER);
throw RPCUtil.getRemoteException(
new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" +
" to call '" + method + "'")
);
}
LOG.info("RM Admin: " + method + " invoked by user " +
user.getShortUserName());
return user;
} }
@Override @Override

View File

@ -20,20 +20,41 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Internal class to handle HA related aspects of the {@link ResourceManager}.
*
* TODO (YARN-1318): Some/ all of this functionality should be merged with
* {@link AdminService}. Currently, marking this as Private and Unstable for
* those reasons.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class RMHAProtocolService extends AbstractService implements public class RMHAProtocolService extends AbstractService implements
@ -44,6 +65,8 @@ public class RMHAProtocolService extends AbstractService implements
private ResourceManager rm; private ResourceManager rm;
@VisibleForTesting @VisibleForTesting
protected HAServiceState haState = HAServiceState.INITIALIZING; protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
private boolean haEnabled; private boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) { public RMHAProtocolService(ResourceManager resourceManager) {
@ -59,6 +82,9 @@ public class RMHAProtocolService extends AbstractService implements
if (haEnabled) { if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf); HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf); rm.setConf(this.conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
} }
rm.createAndInitActiveServices(); rm.createAndInitActiveServices();
super.serviceInit(this.conf); super.serviceInit(this.conf);
@ -68,6 +94,7 @@ public class RMHAProtocolService extends AbstractService implements
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
if (haEnabled) { if (haEnabled) {
transitionToStandby(true); transitionToStandby(true);
startHAAdminServer();
} else { } else {
transitionToActive(); transitionToActive();
} }
@ -77,13 +104,70 @@ public class RMHAProtocolService extends AbstractService implements
@Override @Override
protected synchronized void serviceStop() throws Exception { protected synchronized void serviceStop() throws Exception {
if (haEnabled) {
stopHAAdminServer();
}
transitionToStandby(false); transitionToStandby(false);
haState = HAServiceState.STOPPING; haState = HAServiceState.STOPPING;
super.serviceStop(); super.serviceStop();
} }
protected void startHAAdminServer() throws Exception {
InetSocketAddress haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
String bindHost = haAdminServiceAddress.getHostName();
int serviceHandlerCount = conf.getInt(
YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT);
haAdminServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(haAdminServiceAddress.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.build();
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider());
}
haAdminServer.start();
conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS,
haAdminServer.getListenerAddress());
}
private void stopHAAdminServer() throws Exception {
if (haAdminServer != null) {
haAdminServer.stop();
haAdminServer.join();
haAdminServer = null;
}
}
@Override @Override
public synchronized void monitorHealth() throws HealthCheckFailedException { public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException( throw new HealthCheckFailedException(
"Active ResourceManager services are not running!"); "Active ResourceManager services are not running!");
@ -103,14 +187,21 @@ public class RMHAProtocolService extends AbstractService implements
} }
@Override @Override
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) { public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled, // TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request // check if transition should be allowed for this request
try { try {
transitionToActive(); transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error when transitioning to Active mode", e); RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
throw new YarnRuntimeException(e); adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
} }
} }
@ -133,19 +224,27 @@ public class RMHAProtocolService extends AbstractService implements
} }
@Override @Override
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) { public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled, // TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request // check if transition should be allowed for this request
try { try {
transitionToStandby(true); transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error when transitioning to Standby mode", e); RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
throw new YarnRuntimeException(e); adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
} }
} }
@Override @Override
public synchronized HAServiceStatus getServiceStatus() throws IOException { public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState); HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) { if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive(); ret.setReadyToBecomeActive();
@ -154,4 +253,8 @@ public class RMHAProtocolService extends AbstractService implements
} }
return ret; return ret;
} }
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
} }

View File

@ -18,22 +18,24 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -115,4 +117,44 @@ public class RMServerUtils {
} }
} }
} }
/**
* Utility method to verify if the current user has access based on the
* passed {@link AccessControlList}
* @param acl the {@link AccessControlList} to check against
* @param method the method name to be logged
* @param LOG the logger to use
* @return {@link UserGroupInformation} of the current user
* @throws IOException
*/
public static UserGroupInformation verifyAccess(
AccessControlList acl, String method, final Log LOG)
throws IOException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(),
"AdminService", "Couldn't get current user");
throw ioe;
}
if (!acl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
acl.toString(), "AdminService",
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
throw new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" +
" to call '" + method + "'");
}
if (LOG.isTraceEnabled()) {
LOG.trace(method + " invoked by user " + user.getShortUserName());
}
return user;
}
} }

View File

@ -182,7 +182,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
validateConfigs(conf); validateConfigs(conf);
this.conf = conf; this.conf = conf;
haService = new RMHAProtocolService(this); haService = createRMHAProtocolService();
addService(haService); addService(haService);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -198,6 +198,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
((RMContextImpl) rmContext).setStateStore(rmStore); ((RMContextImpl) rmContext).setStateStore(rmStore);
} }
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager( protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) { Configuration conf) {
return new RMContainerTokenSecretManager(conf); return new RMContainerTokenSecretManager(conf);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security.authorize;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
@ -52,6 +53,9 @@ public class RMPolicyProvider extends PolicyProvider {
new Service( new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class), ContainerManagementProtocolPB.class),
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
HAServiceProtocol.class),
}; };
@Override @Override

View File

@ -296,6 +296,16 @@ public class MockRM extends ResourceManager {
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); .handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
} }
@Override
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this) {
@Override
protected void startHAAdminServer() {
// do nothing
}
};
}
@Override @Override
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(), return new ClientRMService(getRMContext(), getResourceScheduler(),

View File

@ -60,7 +60,7 @@ public class TestRMHA {
rm.init(conf); rm.init(conf);
} }
private void checkMonitorHealth() { private void checkMonitorHealth() throws IOException {
try { try {
rm.haService.monitorHealth(); rm.haService.monitorHealth();
} catch (HealthCheckFailedException e) { } catch (HealthCheckFailedException e) {