YARN-10458. Hive On Tez queries fails upon submission to dynamically created pools. Contributed by Peter Bacsko.

This commit is contained in:
Peter Bacsko 2020-11-10 13:16:52 +01:00
parent aa3807ed1d
commit 0361837c9e
5 changed files with 219 additions and 5 deletions

View File

@ -927,6 +927,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
return usernameUsedForPlacement; return usernameUsedForPlacement;
} }
String queue = appPlacementContext.getQueue(); String queue = appPlacementContext.getQueue();
String parent = appPlacementContext.getParentQueue();
if (scheduler instanceof CapacityScheduler && parent != null) {
queue = parent + "." + queue;
}
if (callerUGI != null && scheduler if (callerUGI != null && scheduler
.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) { .checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
usernameUsedForPlacement = userNameFromAppTag; usernameUsedForPlacement = userNameFromAppTag;

View File

@ -2283,6 +2283,21 @@ public class CapacityScheduler extends
public boolean checkAccess(UserGroupInformation callerUGI, public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) { QueueACL acl, String queueName) {
CSQueue queue = getQueue(queueName); CSQueue queue = getQueue(queueName);
if (queueName.startsWith("root.")) {
// can only check proper ACLs if the path is fully qualified
while (queue == null) {
int sepIndex = queueName.lastIndexOf(".");
String parentName = queueName.substring(0, sepIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("Queue {} does not exist, checking parent {}",
queueName, parentName);
}
queueName = parentName;
queue = queueManager.getQueue(queueName);
}
}
if (queue == null) { if (queue == null) {
LOG.debug("ACL not found for queue access-type {} for queue {}", LOG.debug("ACL not found for queue access-type {} for queue {}",
acl, queueName); acl, queueName);
@ -3294,4 +3309,9 @@ public class CapacityScheduler extends
public boolean placementConstraintEnabled() { public boolean placementConstraintEnabled() {
return true; return true;
} }
@VisibleForTesting
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}
} }

View File

@ -84,11 +84,17 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -102,6 +108,7 @@ import java.util.concurrent.ConcurrentMap;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
@ -120,12 +127,16 @@ import static org.mockito.Mockito.when;
*/ */
public class TestAppManager extends AppManagerTestBase{ public class TestAppManager extends AppManagerTestBase{
@Rule
public UseCapacitySchedulerRule shouldUseCs = new UseCapacitySchedulerRule();
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestAppManager.class); LoggerFactory.getLogger(TestAppManager.class);
private static RMAppEventType appEventType = RMAppEventType.KILL; private static RMAppEventType appEventType = RMAppEventType.KILL;
private static String USER = "user_"; private static String USER = "user_";
private static String USER0 = USER + 0; private static String USER0 = USER + 0;
private ResourceScheduler scheduler;
private static final String USER_ID_PREFIX = "userid="; private static final String USER_ID_PREFIX = "userid=";
@ -227,7 +238,13 @@ public class TestAppManager extends AppManagerTestBase{
rmContext = mockRMContext(1, now - 10); rmContext = mockRMContext(1, now - 10);
rmContext rmContext
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class)); .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
ResourceScheduler scheduler = mockResourceScheduler();
if (shouldUseCs.useCapacityScheduler()) {
scheduler = mockResourceScheduler(CapacityScheduler.class);
} else {
scheduler = mockResourceScheduler();
}
((RMContextImpl)rmContext).setScheduler(scheduler); ((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -880,7 +897,7 @@ public class TestAppManager extends AppManagerTestBase{
new int[]{ 1, 1, 1, 1 }}; new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) { for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) { for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
ResourceScheduler scheduler = mockResourceScheduler(); scheduler = mockResourceScheduler();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
globalMaxAppAttempts[i]); globalMaxAppAttempts[i]);
@ -1061,7 +1078,12 @@ public class TestAppManager extends AppManagerTestBase{
} }
private static ResourceScheduler mockResourceScheduler() { private static ResourceScheduler mockResourceScheduler() {
ResourceScheduler scheduler = mock(ResourceScheduler.class); return mockResourceScheduler(ResourceScheduler.class);
}
private static <T extends ResourceScheduler> ResourceScheduler
mockResourceScheduler(Class<T> schedulerClass) {
ResourceScheduler scheduler = mock(schedulerClass);
when(scheduler.getMinimumResourceCapability()).thenReturn( when(scheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource( Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
@ -1299,6 +1321,51 @@ public class TestAppManager extends AppManagerTestBase{
Assert.assertEquals(expectedUser, userNameForPlacement); Assert.assertEquals(expectedUser, userNameForPlacement);
} }
@Test
@UseMockCapacityScheduler
public void testCheckAccessFullPathWithCapacityScheduler()
throws YarnException {
// make sure we only combine "parent + queue" if CS is selected
testCheckAccess("root.users", "hadoop");
}
@Test
@UseMockCapacityScheduler
public void testCheckAccessLeafQueueOnlyWithCapacityScheduler()
throws YarnException {
// make sure we that NPE is avoided if there's no parent defined
testCheckAccess(null, "hadoop");
}
private void testCheckAccess(String parent, String queue)
throws YarnException {
enableApplicationTagPlacement(true, "hadoop");
String userIdTag = USER_ID_PREFIX + "hadoop";
setApplicationTags("tag1", userIdTag, "tag2");
PlacementManager placementMgr = mock(PlacementManager.class);
ApplicationPlacementContext appContext;
String expectedQueue;
if (parent == null) {
appContext = new ApplicationPlacementContext(queue);
expectedQueue = queue;
} else {
appContext = new ApplicationPlacementContext(queue, parent);
expectedQueue = parent + "." + queue;
}
when(placementMgr.placeApplication(asContext, "hadoop"))
.thenReturn(appContext);
appMonitor.getUserNameForPlacement("hadoop", asContext, placementMgr);
ArgumentCaptor<String> queueNameCaptor =
ArgumentCaptor.forClass(String.class);
verify(scheduler).checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), queueNameCaptor.capture());
assertEquals("Expected access check for queue",
expectedQueue, queueNameCaptor.getValue());
}
private void enableApplicationTagPlacement(boolean userHasAccessToQueue, private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
String... whiteListedUsers) { String... whiteListedUsers) {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -1307,7 +1374,6 @@ public class TestAppManager extends AppManagerTestBase{
conf.setStrings(YarnConfiguration conf.setStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers); .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
((RMContextImpl) rmContext).setYarnConfiguration(conf); ((RMContextImpl) rmContext).setYarnConfiguration(conf);
ResourceScheduler scheduler = mockResourceScheduler();
when(scheduler.checkAccess(any(UserGroupInformation.class), when(scheduler.checkAccess(any(UserGroupInformation.class),
eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class))) eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
.thenReturn(userHasAccessToQueue); .thenReturn(userHasAccessToQueue);
@ -1338,4 +1404,24 @@ public class TestAppManager extends AppManagerTestBase{
Collections.addAll(applicationTags, tags); Collections.addAll(applicationTags, tags);
asContext.setApplicationTags(applicationTags); asContext.setApplicationTags(applicationTags);
} }
private class UseCapacitySchedulerRule extends TestWatcher {
private boolean useCapacityScheduler;
@Override
protected void starting(Description d) {
useCapacityScheduler =
d.getAnnotation(UseMockCapacityScheduler.class) != null;
}
public boolean useCapacityScheduler() {
return useCapacityScheduler;
}
}
@Retention(RetentionPolicy.RUNTIME)
public @interface UseMockCapacityScheduler {
// mark test cases with this which require
// the scheduler type to be CapacityScheduler
}
} }

View File

@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext; .ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -84,7 +89,6 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -839,4 +843,62 @@ public class TestCapacitySchedulerAutoQueueCreation
} }
} }
} }
@Test
public void testDynamicAutoQueueCreationWithTags()
throws Exception {
MockRM rm = null;
try {
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b"});
csConf.setCapacity("root.a", 90);
csConf.setCapacity("root.b", 10);
csConf.setAutoCreateChildQueueEnabled("root.a", true);
csConf.setAutoCreatedLeafQueueConfigCapacity("root.a", 50);
csConf.setAutoCreatedLeafQueueConfigMaxCapacity("root.a", 100);
csConf.setAcl("root.a", QueueACL.ADMINISTER_QUEUE, "*");
csConf.setAcl("root.a", QueueACL.SUBMIT_APPLICATIONS, "*");
csConf.setBoolean(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
csConf.setStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop");
csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:%user:root.a.%user");
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.start();
MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("apptodynamicqueue")
.withUser("hadoop")
.withAcls(null)
.withUnmanagedAM(false)
.withApplicationTags(Sets.newHashSet("userid=testuser"))
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
MockRM.launchAndRegisterAM(app, rm, nm);
nm.nodeHeartbeat(true);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = cs.getQueue("root.a.testuser");
assertNotNull("Leaf queue has not been auto-created", queue);
assertEquals("Number of running applications", 1,
queue.getNumApplications());
} finally {
if (rm != null) {
rm.close();
}
}
}
} }

View File

@ -17,15 +17,22 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase; import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
import org.junit.Test;
public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase { public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
@Override @Override
@ -132,6 +139,7 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
.reinitialize(csConf, resourceManager.getRMContext()); .reinitialize(csConf, resourceManager.getRMContext());
} }
private void setQueueCapacity(CapacitySchedulerConfiguration csConf, private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
float capacity, String queuePath) { float capacity, String queuePath) {
csConf.setCapacity(queuePath, capacity); csConf.setCapacity(queuePath, capacity);
@ -142,4 +150,38 @@ public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl); csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl); csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
} }
@Test
public void testCheckAccessForUserWithOnlyLeafNameProvided() {
testCheckAccess(false, "dynamicQueue");
}
@Test
public void testCheckAccessForUserWithFullPathProvided() {
testCheckAccess(true, "root.users.dynamicQueue");
}
@Test
public void testCheckAccessForRootQueue() {
testCheckAccess(false, "root");
}
private void testCheckAccess(boolean expectedResult, String queueName) {
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
CSQueue root = mock(ParentQueue.class);
CSQueue users = mock(ManagedParentQueue.class);
when(qm.getQueue("root")).thenReturn(root);
when(qm.getQueue("root.users")).thenReturn(users);
when(users.hasAccess(any(QueueACL.class),
any(UserGroupInformation.class))).thenReturn(true);
UserGroupInformation mockUGI = mock(UserGroupInformation.class);
CapacityScheduler cs =
(CapacityScheduler) resourceManager.getResourceScheduler();
cs.setQueueManager(qm);
assertEquals("checkAccess() failed", expectedResult,
cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName));
}
} }