YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. (Peter Bacsko via wangda)
Change-Id: I518dc925187ce55e9d35a37ba20878c0f4e37e5c
This commit is contained in:
parent
8ee6bc2518
commit
c47c9fd65d
|
@ -927,6 +927,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
return usernameUsedForPlacement;
|
return usernameUsedForPlacement;
|
||||||
}
|
}
|
||||||
String queue = appPlacementContext.getQueue();
|
String queue = appPlacementContext.getQueue();
|
||||||
|
String parent = appPlacementContext.getParentQueue();
|
||||||
|
if (scheduler instanceof CapacityScheduler && parent != null) {
|
||||||
|
queue = parent + "." + queue;
|
||||||
|
}
|
||||||
if (callerUGI != null && scheduler
|
if (callerUGI != null && scheduler
|
||||||
.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
|
.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
|
||||||
usernameUsedForPlacement = userNameFromAppTag;
|
usernameUsedForPlacement = userNameFromAppTag;
|
||||||
|
|
|
@ -2290,6 +2290,21 @@ public class CapacityScheduler extends
|
||||||
public boolean checkAccess(UserGroupInformation callerUGI,
|
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||||
QueueACL acl, String queueName) {
|
QueueACL acl, String queueName) {
|
||||||
CSQueue queue = getQueue(queueName);
|
CSQueue queue = getQueue(queueName);
|
||||||
|
|
||||||
|
if (queueName.startsWith("root.")) {
|
||||||
|
// can only check proper ACLs if the path is fully qualified
|
||||||
|
while (queue == null) {
|
||||||
|
int sepIndex = queueName.lastIndexOf(".");
|
||||||
|
String parentName = queueName.substring(0, sepIndex);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Queue {} does not exist, checking parent {}",
|
||||||
|
queueName, parentName);
|
||||||
|
}
|
||||||
|
queueName = parentName;
|
||||||
|
queue = queueManager.getQueue(queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
LOG.debug("ACL not found for queue access-type {} for queue {}",
|
LOG.debug("ACL not found for queue access-type {} for queue {}",
|
||||||
acl, queueName);
|
acl, queueName);
|
||||||
|
@ -3307,4 +3322,9 @@ public class CapacityScheduler extends
|
||||||
public void setActivitiesManagerEnabled(boolean enabled) {
|
public void setActivitiesManagerEnabled(boolean enabled) {
|
||||||
this.activitiesManagerEnabled = enabled;
|
this.activitiesManagerEnabled = enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setQueueManager(CapacitySchedulerQueueManager qm) {
|
||||||
|
this.queueManager = qm;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,11 +84,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TestWatcher;
|
||||||
|
import org.junit.runner.Description;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.RetentionPolicy;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -102,6 +108,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
@ -120,12 +127,16 @@ import static org.mockito.Mockito.when;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public class TestAppManager extends AppManagerTestBase{
|
public class TestAppManager extends AppManagerTestBase{
|
||||||
|
@Rule
|
||||||
|
public UseCapacitySchedulerRule shouldUseCs = new UseCapacitySchedulerRule();
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestAppManager.class);
|
LoggerFactory.getLogger(TestAppManager.class);
|
||||||
private static RMAppEventType appEventType = RMAppEventType.KILL;
|
private static RMAppEventType appEventType = RMAppEventType.KILL;
|
||||||
|
|
||||||
private static String USER = "user_";
|
private static String USER = "user_";
|
||||||
private static String USER0 = USER + 0;
|
private static String USER0 = USER + 0;
|
||||||
|
private ResourceScheduler scheduler;
|
||||||
|
|
||||||
private static final String USER_ID_PREFIX = "userid=";
|
private static final String USER_ID_PREFIX = "userid=";
|
||||||
|
|
||||||
|
@ -227,7 +238,13 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
rmContext = mockRMContext(1, now - 10);
|
rmContext = mockRMContext(1, now - 10);
|
||||||
rmContext
|
rmContext
|
||||||
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
|
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
|
||||||
ResourceScheduler scheduler = mockResourceScheduler();
|
|
||||||
|
if (shouldUseCs.useCapacityScheduler()) {
|
||||||
|
scheduler = mockResourceScheduler(CapacityScheduler.class);
|
||||||
|
} else {
|
||||||
|
scheduler = mockResourceScheduler();
|
||||||
|
}
|
||||||
|
|
||||||
((RMContextImpl)rmContext).setScheduler(scheduler);
|
((RMContextImpl)rmContext).setScheduler(scheduler);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -880,7 +897,7 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
new int[]{ 1, 1, 1, 1 }};
|
new int[]{ 1, 1, 1, 1 }};
|
||||||
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
|
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
|
||||||
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
|
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
|
||||||
ResourceScheduler scheduler = mockResourceScheduler();
|
scheduler = mockResourceScheduler();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
||||||
globalMaxAppAttempts[i]);
|
globalMaxAppAttempts[i]);
|
||||||
|
@ -1061,7 +1078,12 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ResourceScheduler mockResourceScheduler() {
|
private static ResourceScheduler mockResourceScheduler() {
|
||||||
ResourceScheduler scheduler = mock(ResourceScheduler.class);
|
return mockResourceScheduler(ResourceScheduler.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends ResourceScheduler> ResourceScheduler
|
||||||
|
mockResourceScheduler(Class<T> schedulerClass) {
|
||||||
|
ResourceScheduler scheduler = mock(schedulerClass);
|
||||||
when(scheduler.getMinimumResourceCapability()).thenReturn(
|
when(scheduler.getMinimumResourceCapability()).thenReturn(
|
||||||
Resources.createResource(
|
Resources.createResource(
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
|
||||||
|
@ -1299,6 +1321,51 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
Assert.assertEquals(expectedUser, userNameForPlacement);
|
Assert.assertEquals(expectedUser, userNameForPlacement);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@UseMockCapacityScheduler
|
||||||
|
public void testCheckAccessFullPathWithCapacityScheduler()
|
||||||
|
throws YarnException {
|
||||||
|
// make sure we only combine "parent + queue" if CS is selected
|
||||||
|
testCheckAccess("root.users", "hadoop");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@UseMockCapacityScheduler
|
||||||
|
public void testCheckAccessLeafQueueOnlyWithCapacityScheduler()
|
||||||
|
throws YarnException {
|
||||||
|
// make sure we that NPE is avoided if there's no parent defined
|
||||||
|
testCheckAccess(null, "hadoop");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCheckAccess(String parent, String queue)
|
||||||
|
throws YarnException {
|
||||||
|
enableApplicationTagPlacement(true, "hadoop");
|
||||||
|
String userIdTag = USER_ID_PREFIX + "hadoop";
|
||||||
|
setApplicationTags("tag1", userIdTag, "tag2");
|
||||||
|
PlacementManager placementMgr = mock(PlacementManager.class);
|
||||||
|
ApplicationPlacementContext appContext;
|
||||||
|
String expectedQueue;
|
||||||
|
if (parent == null) {
|
||||||
|
appContext = new ApplicationPlacementContext(queue);
|
||||||
|
expectedQueue = queue;
|
||||||
|
} else {
|
||||||
|
appContext = new ApplicationPlacementContext(queue, parent);
|
||||||
|
expectedQueue = parent + "." + queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
when(placementMgr.placeApplication(asContext, "hadoop"))
|
||||||
|
.thenReturn(appContext);
|
||||||
|
appMonitor.getUserNameForPlacement("hadoop", asContext, placementMgr);
|
||||||
|
|
||||||
|
ArgumentCaptor<String> queueNameCaptor =
|
||||||
|
ArgumentCaptor.forClass(String.class);
|
||||||
|
verify(scheduler).checkAccess(any(UserGroupInformation.class),
|
||||||
|
any(QueueACL.class), queueNameCaptor.capture());
|
||||||
|
|
||||||
|
assertEquals("Expected access check for queue",
|
||||||
|
expectedQueue, queueNameCaptor.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
|
private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
|
||||||
String... whiteListedUsers) {
|
String... whiteListedUsers) {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -1307,7 +1374,6 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
conf.setStrings(YarnConfiguration
|
conf.setStrings(YarnConfiguration
|
||||||
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
|
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
|
||||||
((RMContextImpl) rmContext).setYarnConfiguration(conf);
|
((RMContextImpl) rmContext).setYarnConfiguration(conf);
|
||||||
ResourceScheduler scheduler = mockResourceScheduler();
|
|
||||||
when(scheduler.checkAccess(any(UserGroupInformation.class),
|
when(scheduler.checkAccess(any(UserGroupInformation.class),
|
||||||
eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
|
eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
|
||||||
.thenReturn(userHasAccessToQueue);
|
.thenReturn(userHasAccessToQueue);
|
||||||
|
@ -1338,4 +1404,24 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
Collections.addAll(applicationTags, tags);
|
Collections.addAll(applicationTags, tags);
|
||||||
asContext.setApplicationTags(applicationTags);
|
asContext.setApplicationTags(applicationTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class UseCapacitySchedulerRule extends TestWatcher {
|
||||||
|
private boolean useCapacityScheduler;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void starting(Description d) {
|
||||||
|
useCapacityScheduler =
|
||||||
|
d.getAnnotation(UseMockCapacityScheduler.class) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean useCapacityScheduler() {
|
||||||
|
return useCapacityScheduler;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
public @interface UseMockCapacityScheduler {
|
||||||
|
// mark test cases with this which require
|
||||||
|
// the scheduler type to be CapacityScheduler
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||||
.ApplicationPlacementContext;
|
.ApplicationPlacementContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -84,7 +89,6 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
|
||||||
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -839,4 +843,62 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDynamicAutoQueueCreationWithTags()
|
||||||
|
throws Exception {
|
||||||
|
MockRM rm = null;
|
||||||
|
try {
|
||||||
|
CapacitySchedulerConfiguration csConf
|
||||||
|
= new CapacitySchedulerConfiguration();
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {"a", "b"});
|
||||||
|
csConf.setCapacity("root.a", 90);
|
||||||
|
csConf.setCapacity("root.b", 10);
|
||||||
|
csConf.setAutoCreateChildQueueEnabled("root.a", true);
|
||||||
|
csConf.setAutoCreatedLeafQueueConfigCapacity("root.a", 50);
|
||||||
|
csConf.setAutoCreatedLeafQueueConfigMaxCapacity("root.a", 100);
|
||||||
|
csConf.setAcl("root.a", QueueACL.ADMINISTER_QUEUE, "*");
|
||||||
|
csConf.setAcl("root.a", QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
csConf.setBoolean(YarnConfiguration
|
||||||
|
.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
|
||||||
|
csConf.setStrings(YarnConfiguration
|
||||||
|
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop");
|
||||||
|
csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
||||||
|
"u:%user:root.a.%user");
|
||||||
|
|
||||||
|
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(csConf);
|
||||||
|
rm = new MockRM(csConf) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
|
||||||
|
|
||||||
|
MockRMAppSubmissionData data =
|
||||||
|
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
|
||||||
|
.withAppName("apptodynamicqueue")
|
||||||
|
.withUser("hadoop")
|
||||||
|
.withAcls(null)
|
||||||
|
.withUnmanagedAM(false)
|
||||||
|
.withApplicationTags(Sets.newHashSet("userid=testuser"))
|
||||||
|
.build();
|
||||||
|
RMApp app = MockRMAppSubmitter.submit(rm, data);
|
||||||
|
MockRM.launchAndRegisterAM(app, rm, nm);
|
||||||
|
nm.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
CSQueue queue = cs.getQueue("root.a.testuser");
|
||||||
|
assertNotNull("Leaf queue has not been auto-created", queue);
|
||||||
|
assertEquals("Number of running applications", 1,
|
||||||
|
queue.getNumApplications());
|
||||||
|
} finally {
|
||||||
|
if (rm != null) {
|
||||||
|
rm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,15 +17,22 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
|
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
|
public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
|
||||||
@Override
|
@Override
|
||||||
|
@ -132,6 +139,7 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
|
||||||
.reinitialize(csConf, resourceManager.getRMContext());
|
.reinitialize(csConf, resourceManager.getRMContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
|
private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
|
||||||
float capacity, String queuePath) {
|
float capacity, String queuePath) {
|
||||||
csConf.setCapacity(queuePath, capacity);
|
csConf.setCapacity(queuePath, capacity);
|
||||||
|
@ -142,4 +150,38 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
|
||||||
csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
|
csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
|
||||||
csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
|
csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAccessForUserWithOnlyLeafNameProvided() {
|
||||||
|
testCheckAccess(false, "dynamicQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAccessForUserWithFullPathProvided() {
|
||||||
|
testCheckAccess(true, "root.users.dynamicQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAccessForRootQueue() {
|
||||||
|
testCheckAccess(false, "root");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCheckAccess(boolean expectedResult, String queueName) {
|
||||||
|
CapacitySchedulerQueueManager qm =
|
||||||
|
mock(CapacitySchedulerQueueManager.class);
|
||||||
|
CSQueue root = mock(ParentQueue.class);
|
||||||
|
CSQueue users = mock(ManagedParentQueue.class);
|
||||||
|
when(qm.getQueue("root")).thenReturn(root);
|
||||||
|
when(qm.getQueue("root.users")).thenReturn(users);
|
||||||
|
when(users.hasAccess(any(QueueACL.class),
|
||||||
|
any(UserGroupInformation.class))).thenReturn(true);
|
||||||
|
UserGroupInformation mockUGI = mock(UserGroupInformation.class);
|
||||||
|
|
||||||
|
CapacityScheduler cs =
|
||||||
|
(CapacityScheduler) resourceManager.getResourceScheduler();
|
||||||
|
cs.setQueueManager(qm);
|
||||||
|
|
||||||
|
assertEquals("checkAccess() failed", expectedResult,
|
||||||
|
cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue