YARN-53. Added the missing getGroups API to ResourceManager. Contributed by Bo Wang.

svn merge --ignore-ancestry -c 1389176 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1389177 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-09-24 00:52:55 +00:00
parent de4204cce5
commit e7b414289c
10 changed files with 267 additions and 9 deletions

View File

@ -20,6 +20,9 @@ Release 2.0.3-alpha - Unreleased
YARN-82. Change the default local and log dirs to be based on
hadoop.tmp.dir and yarn.log.dir. (Hemanth Yamijala via sseth)
YARN-53. Added the missing getGroups API to ResourceManager. (Bo Wang via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -272,4 +272,9 @@ void refreshServiceAcls(Configuration configuration,
this.server.refreshServiceAcl(configuration, policyProvider);
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.api;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
@ -32,7 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
public interface RMAdminProtocol {
public interface RMAdminProtocol extends GetUserMappingsProtocol {
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnRemoteException;

View File

@ -19,14 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.client;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
@ -154,5 +156,18 @@ public RefreshServiceAclsResponse refreshServiceAcls(
}
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
GetGroupsForUserRequestProto requestProto =
GetGroupsForUserRequestProto.newBuilder().setUser(user).build();
try {
GetGroupsForUserResponseProto responseProto =
proxy.getGroupsForUser(null, requestProto);
return (String[]) responseProto.getGroupsList().toArray(
new String[responseProto.getGroupsCount()]);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.*;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
@ -139,4 +139,22 @@ public RefreshServiceAclsResponseProto refreshServiceAcls(
}
}
@Override
public GetGroupsForUserResponseProto getGroupsForUser(
RpcController controller, GetGroupsForUserRequestProto request)
throws ServiceException {
String user = request.getUser();
try {
String[] groups = real.getGroupsForUser(user);
GetGroupsForUserResponseProto.Builder responseBuilder =
GetGroupsForUserResponseProto.newBuilder();
for (String group : groups) {
responseBuilder.addGroups(group);
}
return responseBuilder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -63,6 +64,7 @@ private static void printHelp(String cmd) {
" [-refreshUserToGroupsMappings]" +
" [-refreshAdminAcls]" +
" [-refreshServiceAcl]" +
" [-getGroup [username]]" +
" [-help [cmd]]\n";
String refreshQueues =
@ -81,13 +83,17 @@ private static void printHelp(String cmd) {
String refreshAdminAcls =
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
String refreshServiceAcl =
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tResoureceManager will reload the authorization policy file.\n";
String getGroups =
"-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
if ("refreshQueues".equals(cmd)) {
System.out.println(refreshQueues);
} else if ("refreshNodes".equals(cmd)) {
@ -100,6 +106,8 @@ private static void printHelp(String cmd) {
System.out.println(refreshAdminAcls);
} else if ("refreshServiceAcl".equals(cmd)) {
System.out.println(refreshServiceAcl);
} else if ("getGroups".equals(cmd)) {
System.out.println(getGroups);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@ -110,6 +118,7 @@ private static void printHelp(String cmd) {
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshServiceAcl);
System.out.println(getGroups);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
@ -133,6 +142,8 @@ private static void printUsage(String cmd) {
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
} else if ("-refreshService".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
} else if ("-getGroups".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
} else {
System.err.println("Usage: java RMAdmin");
System.err.println(" [-refreshQueues]");
@ -141,6 +152,7 @@ private static void printUsage(String cmd) {
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@ -229,6 +241,27 @@ private int refreshServiceAcls() throws IOException {
return 0;
}
private int getGroups(String[] usernames) throws IOException {
// Get groups users belongs to
RMAdminProtocol adminProtocol = createAdminProtocol();
if (usernames.length == 0) {
usernames = new String[] { UserGroupInformation.getCurrentUser().getUserName() };
}
for (String username : usernames) {
StringBuilder sb = new StringBuilder();
sb.append(username + " :");
for (String group : adminProtocol.getGroupsForUser(username)) {
sb.append(" ");
sb.append(group);
}
System.out.println(sb);
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 1) {
@ -266,6 +299,9 @@ public int run(String[] args) throws Exception {
exitCode = refreshAdminAcls();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcls();
} else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);

View File

@ -30,4 +30,5 @@ service RMAdminProtocolService {
rpc refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequestProto) returns (RefreshUserToGroupsMappingsResponseProto);
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
}

View File

@ -52,3 +52,10 @@ message RefreshServiceAclsRequestProto {
message RefreshServiceAclsResponseProto {
}
message GetGroupsForUserRequestProto {
required string user = 1;
}
message GetGroupsForUserResponseProto {
repeated string groups = 1;
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.tools;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.GetGroupsBase;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
public class GetGroupsForTesting extends GetGroupsBase {
public GetGroupsForTesting(Configuration conf) {
super(conf);
}
public GetGroupsForTesting(Configuration conf, PrintStream out) {
super(conf, out);
}
@Override
protected InetSocketAddress getProtocolAddress(Configuration conf)
throws IOException {
return conf.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
@Override
public void setConf(Configuration conf) {
conf = new YarnConfiguration(conf);
super.setConf(conf);
}
@Override
protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
Configuration conf = getConf();
final InetSocketAddress addr = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
final YarnRPC rpc = YarnRPC.create(conf);
RMAdminProtocol adminProtocol = (RMAdminProtocol) rpc.getProxy(
RMAdminProtocol.class, addr, getConf());
return adminProtocol;
}
public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new GetGroupsForTesting(new YarnConfiguration()), argv);
System.exit(res);
}
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.tools;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.GetGroupsTestBase;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public class TestGetGroups extends GetGroupsTestBase {
private static final Log LOG = LogFactory.getLog(TestGetGroups.class);
private static ResourceManager resourceManager;
private static Configuration conf;
@BeforeClass
public static void setUpResourceManager() throws IOException, InterruptedException {
conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store) {
@Override
protected void doSecureLogin() throws IOException {
};
};
resourceManager.init(conf);
new Thread() {
public void run() {
resourceManager.start();
};
}.start();
int waitCount = 0;
while (resourceManager.getServiceState() == STATE.INITED
&& waitCount++ < 10) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1000);
}
if (resourceManager.getServiceState() != STATE.STARTED) {
throw new IOException(
"ResourceManager failed to start. Final state is "
+ resourceManager.getServiceState());
}
LOG.info("ResourceManager RMAdmin address: " +
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS));
}
@SuppressWarnings("static-access")
@Before
public void setUpConf() {
super.conf = this.conf;
}
@AfterClass
public static void tearDownResourceManager() throws InterruptedException {
if (resourceManager != null) {
LOG.info("Stopping ResourceManager...");
resourceManager.stop();
}
}
@Override
protected Tool getTool(PrintStream o) {
return new GetGroupsForTesting(conf, o);
}
}