YARN-53. Added the missing getGroups API to ResourceManager. Contributed by Bo Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1389176 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9c9b3f09ed
commit
d524942289
|
@ -37,6 +37,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
YARN-82. Change the default local and log dirs to be based on
|
||||
hadoop.tmp.dir and yarn.log.dir. (Hemanth Yamijala via sseth)
|
||||
|
||||
YARN-53. Added the missing getGroups API to ResourceManager. (Bo Wang via
|
||||
vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -272,4 +272,9 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|||
this.server.refreshServiceAcl(configuration, policyProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
return UserGroupInformation.createRemoteUser(user).getGroupNames();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.api;
|
||||
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
|
@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.Refresh
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
|
||||
public interface RMAdminProtocol {
|
||||
public interface RMAdminProtocol extends GetUserMappingsProtocol {
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws YarnRemoteException;
|
||||
|
||||
|
|
|
@ -19,14 +19,16 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
|
||||
|
@ -154,5 +156,18 @@ public class RMAdminProtocolPBClientImpl implements RMAdminProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
GetGroupsForUserRequestProto requestProto =
|
||||
GetGroupsForUserRequestProto.newBuilder().setUser(user).build();
|
||||
try {
|
||||
GetGroupsForUserResponseProto responseProto =
|
||||
proxy.getGroupsForUser(null, requestProto);
|
||||
return (String[]) responseProto.getGroupsList().toArray(
|
||||
new String[responseProto.getGroupsCount()]);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.api.impl.pb.service;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocolPB;
|
||||
|
@ -139,4 +139,22 @@ public class RMAdminProtocolPBServiceImpl implements RMAdminProtocolPB {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetGroupsForUserResponseProto getGroupsForUser(
|
||||
RpcController controller, GetGroupsForUserRequestProto request)
|
||||
throws ServiceException {
|
||||
String user = request.getUser();
|
||||
try {
|
||||
String[] groups = real.getGroupsForUser(user);
|
||||
GetGroupsForUserResponseProto.Builder responseBuilder =
|
||||
GetGroupsForUserResponseProto.newBuilder();
|
||||
for (String group : groups) {
|
||||
responseBuilder.addGroups(group);
|
||||
}
|
||||
return responseBuilder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.tools;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
@ -63,6 +64,7 @@ public class RMAdmin extends Configured implements Tool {
|
|||
" [-refreshUserToGroupsMappings]" +
|
||||
" [-refreshAdminAcls]" +
|
||||
" [-refreshServiceAcl]" +
|
||||
" [-getGroup [username]]" +
|
||||
" [-help [cmd]]\n";
|
||||
|
||||
String refreshQueues =
|
||||
|
@ -81,12 +83,16 @@ public class RMAdmin extends Configured implements Tool {
|
|||
|
||||
String refreshAdminAcls =
|
||||
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
||||
String refreshServiceAcl =
|
||||
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
|
||||
"\t\tResoureceManager will reload the authorization policy file.\n";
|
||||
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
|
||||
"\t\tResoureceManager will reload the authorization policy file.\n";
|
||||
|
||||
String getGroups =
|
||||
"-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||
"\t\tis specified.\n";
|
||||
|
||||
if ("refreshQueues".equals(cmd)) {
|
||||
System.out.println(refreshQueues);
|
||||
|
@ -100,6 +106,8 @@ public class RMAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshAdminAcls);
|
||||
} else if ("refreshServiceAcl".equals(cmd)) {
|
||||
System.out.println(refreshServiceAcl);
|
||||
} else if ("getGroups".equals(cmd)) {
|
||||
System.out.println(getGroups);
|
||||
} else if ("help".equals(cmd)) {
|
||||
System.out.println(help);
|
||||
} else {
|
||||
|
@ -110,6 +118,7 @@ public class RMAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(refreshServiceAcl);
|
||||
System.out.println(getGroups);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
ToolRunner.printGenericCommandUsage(System.out);
|
||||
|
@ -133,6 +142,8 @@ public class RMAdmin extends Configured implements Tool {
|
|||
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
|
||||
} else if ("-refreshService".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
|
||||
} else if ("-getGroups".equals(cmd)){
|
||||
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
|
||||
} else {
|
||||
System.err.println("Usage: java RMAdmin");
|
||||
System.err.println(" [-refreshQueues]");
|
||||
|
@ -141,6 +152,7 @@ public class RMAdmin extends Configured implements Tool {
|
|||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-refreshServiceAcl]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
System.err.println();
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
|
@ -229,6 +241,27 @@ public class RMAdmin extends Configured implements Tool {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private int getGroups(String[] usernames) throws IOException {
|
||||
// Get groups users belongs to
|
||||
RMAdminProtocol adminProtocol = createAdminProtocol();
|
||||
|
||||
if (usernames.length == 0) {
|
||||
usernames = new String[] { UserGroupInformation.getCurrentUser().getUserName() };
|
||||
}
|
||||
|
||||
for (String username : usernames) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(username + " :");
|
||||
for (String group : adminProtocol.getGroupsForUser(username)) {
|
||||
sb.append(" ");
|
||||
sb.append(group);
|
||||
}
|
||||
System.out.println(sb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
|
@ -266,6 +299,9 @@ public class RMAdmin extends Configured implements Tool {
|
|||
exitCode = refreshAdminAcls();
|
||||
} else if ("-refreshServiceAcl".equals(cmd)) {
|
||||
exitCode = refreshServiceAcls();
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
} else if ("-help".equals(cmd)) {
|
||||
if (i < args.length) {
|
||||
printUsage(args[i]);
|
||||
|
|
|
@ -30,4 +30,5 @@ service RMAdminProtocolService {
|
|||
rpc refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequestProto) returns (RefreshUserToGroupsMappingsResponseProto);
|
||||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
|
||||
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
|
||||
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
|
||||
}
|
||||
|
|
|
@ -52,3 +52,10 @@ message RefreshServiceAclsRequestProto {
|
|||
message RefreshServiceAclsResponseProto {
|
||||
}
|
||||
|
||||
message GetGroupsForUserRequestProto {
|
||||
required string user = 1;
|
||||
}
|
||||
|
||||
message GetGroupsForUserResponseProto {
|
||||
repeated string groups = 1;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.tools;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.GetGroupsBase;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
|
||||
|
||||
public class GetGroupsForTesting extends GetGroupsBase {
|
||||
|
||||
public GetGroupsForTesting(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public GetGroupsForTesting(Configuration conf, PrintStream out) {
|
||||
super(conf, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress getProtocolAddress(Configuration conf)
|
||||
throws IOException {
|
||||
return conf.getSocketAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
conf = new YarnConfiguration(conf);
|
||||
super.setConf(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
|
||||
Configuration conf = getConf();
|
||||
|
||||
final InetSocketAddress addr = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
|
||||
RMAdminProtocol adminProtocol = (RMAdminProtocol) rpc.getProxy(
|
||||
RMAdminProtocol.class, addr, getConf());
|
||||
|
||||
return adminProtocol;
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
int res = ToolRunner.run(new GetGroupsForTesting(new YarnConfiguration()), argv);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.tools;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.tools.GetGroupsTestBase;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class TestGetGroups extends GetGroupsTestBase {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestGetGroups.class);
|
||||
|
||||
private static ResourceManager resourceManager;
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpResourceManager() throws IOException, InterruptedException {
|
||||
conf = new YarnConfiguration();
|
||||
Store store = StoreFactory.getStore(conf);
|
||||
resourceManager = new ResourceManager(store) {
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
};
|
||||
};
|
||||
resourceManager.init(conf);
|
||||
new Thread() {
|
||||
public void run() {
|
||||
resourceManager.start();
|
||||
};
|
||||
}.start();
|
||||
int waitCount = 0;
|
||||
while (resourceManager.getServiceState() == STATE.INITED
|
||||
&& waitCount++ < 10) {
|
||||
LOG.info("Waiting for RM to start...");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (resourceManager.getServiceState() != STATE.STARTED) {
|
||||
throw new IOException(
|
||||
"ResourceManager failed to start. Final state is "
|
||||
+ resourceManager.getServiceState());
|
||||
}
|
||||
LOG.info("ResourceManager RMAdmin address: " +
|
||||
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS));
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
@Before
|
||||
public void setUpConf() {
|
||||
super.conf = this.conf;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownResourceManager() throws InterruptedException {
|
||||
if (resourceManager != null) {
|
||||
LOG.info("Stopping ResourceManager...");
|
||||
resourceManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Tool getTool(PrintStream o) {
|
||||
return new GetGroupsForTesting(conf, o);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue