getApplicationReport call may raise NPE for removed queues. (Jian He via wangda)

(cherry picked from commit 23248f63aa)
This commit is contained in:
Wangda Tan 2016-05-06 15:30:45 -07:00
parent d1aa3e0f89
commit b68e6b1d6d
6 changed files with 88 additions and 43 deletions

View File

@ -304,9 +304,7 @@ public class ClientRMService extends AbstractService implements
return applicationsACLsManager return applicationsACLsManager
.checkAccess(callerUGI, operationPerformed, owner, .checkAccess(callerUGI, operationPerformed, owner,
application.getApplicationId()) || queueACLsManager application.getApplicationId()) || queueACLsManager
.checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE, .checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE, application);
application.getQueue(), application.getApplicationId(),
application.getName());
} }
ApplicationId getNewApplicationId() { ApplicationId getNewApplicationId() {

View File

@ -18,20 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.security; package org.apache.hadoop.yarn.server.resourcemanager.security;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
public class QueueACLsManager { public class QueueACLsManager {
private static final Log LOG = LogFactory.getLog(QueueACLsManager.class);
private ResourceScheduler scheduler; private ResourceScheduler scheduler;
private boolean isACLsEnable; private boolean isACLsEnable;
private YarnAuthorizationProvider authorizer; private YarnAuthorizationProvider authorizer;
@ -49,17 +56,28 @@ public class QueueACLsManager {
} }
public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl,
String queueName, ApplicationId appId, String appName) { RMApp app) {
if (!isACLsEnable) { if (!isACLsEnable) {
return true; return true;
} }
if (scheduler instanceof CapacityScheduler) { if (scheduler instanceof CapacityScheduler) {
return authorizer.checkPermission(new AccessRequest( CSQueue queue = ((CapacityScheduler) scheduler).getQueue(app.getQueue());
((CapacityScheduler) scheduler).getQueue(queueName) if (queue == null) {
.getPrivilegedEntity(), callerUGI, // Application exists but the associated queue does not exist.
SchedulerUtils.toAccessType(acl), appId.toString(), appName)); // This may be because the queue was removed after an RM restart. Here, we choose
// to allow users to view the apps of the removed queue.
LOG.error("Queue " + app.getQueue() + " does not exist for " + app
.getApplicationId());
return true;
}
return authorizer.checkPermission(
new AccessRequest(queue.getPrivilegedEntity(), callerUGI,
SchedulerUtils.toAccessType(acl),
app.getApplicationId().toString(), app.getName()));
} else { } else {
return scheduler.checkAccess(callerUGI, acl, queueName); return scheduler.checkAccess(callerUGI, acl, app.getQueue());
} }
} }
} }

View File

@ -232,8 +232,7 @@ public class RMWebServices extends WebServices {
ApplicationAccessType.VIEW_APP, app.getUser(), ApplicationAccessType.VIEW_APP, app.getUser(),
app.getApplicationId()) || app.getApplicationId()) ||
this.rm.getQueueACLsManager().checkAccess(callerUGI, this.rm.getQueueACLsManager().checkAccess(callerUGI,
QueueACL.ADMINISTER_QUEUE, app.getQueue(), QueueACL.ADMINISTER_QUEUE, app))) {
app.getApplicationId(), app.getName()))) {
return false; return false;
} }
return true; return true;

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -30,6 +29,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -112,8 +112,7 @@ public class TestApplicationACLs {
Configuration conf) { Configuration conf) {
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenAnswer(new Answer() {
anyString())).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) { public Object answer(InvocationOnMock invocation) {
return isQueueUser; return isQueueUser;
} }

View File

@ -473,8 +473,7 @@ public class TestClientRMService {
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when( when(
mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenReturn(true);
anyString())).thenReturn(true);
return new ClientRMService(rmContext, yarnScheduler, appManager, return new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null); mockAclsManager, mockQueueACLsManager, null);
} }
@ -575,8 +574,7 @@ public class TestClientRMService {
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenReturn(true);
anyString())).thenReturn(true);
when(mockAclsManager.checkAccess(any(UserGroupInformation.class), when(mockAclsManager.checkAccess(any(UserGroupInformation.class),
any(ApplicationAccessType.class), anyString(), any(ApplicationAccessType.class), anyString(),
any(ApplicationId.class))).thenReturn(true); any(ApplicationId.class))).thenReturn(true);
@ -602,8 +600,7 @@ public class TestClientRMService {
QueueACLsManager mockQueueACLsManager1 = QueueACLsManager mockQueueACLsManager1 =
mock(QueueACLsManager.class); mock(QueueACLsManager.class);
when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class), when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenReturn(false);
anyString())).thenReturn(false);
when(mockAclsManager1.checkAccess(any(UserGroupInformation.class), when(mockAclsManager1.checkAccess(any(UserGroupInformation.class),
any(ApplicationAccessType.class), anyString(), any(ApplicationAccessType.class), anyString(),
any(ApplicationId.class))).thenReturn(false); any(ApplicationId.class))).thenReturn(false);
@ -642,8 +639,7 @@ public class TestClientRMService {
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenReturn(true);
anyString())).thenReturn(true);
ClientRMService rmService = ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null); mockAclsManager, mockQueueACLsManager, null);
@ -731,8 +727,7 @@ public class TestClientRMService {
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString(), any(ApplicationId.class), any(QueueACL.class), any(RMApp.class))).thenReturn(true);
anyString())).thenReturn(true);
ClientRMService rmService = ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null); mockAclsManager, mockQueueACLsManager, null);

View File

@ -18,23 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals; import com.google.common.base.Supplier;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -99,7 +83,22 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase { public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase {
@ -564,6 +563,43 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
} }
// 1. submit an app to default queue and let it finish
// 2. restart rm with no default queue
// 3. getApplicationReport call should succeed (with no NPE)
@Test (timeout = 30000)
public void testRMRestartWithRemovedQueue() throws Exception{
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
final RMApp app1 = rm1.submitApp(1024, "app1", USER_1, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1,rm1, nm1);
MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
csConf.setCapacity(noQueue, 100);
rm2 = new MockRM(csConf,memStore);
rm2.start();
UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
ApplicationReport report =
user2.doAs(new PrivilegedExceptionAction<ApplicationReport>() {
@Override
public ApplicationReport run() throws Exception {
return rm2.getApplicationReport(app1.getApplicationId());
}
});
Assert.assertNotNull(report);
}
// Test CS recovery with multi-level queues and multi-users: // Test CS recovery with multi-level queues and multi-users:
// 1. setup 2 NMs each with 8GB memory; // 1. setup 2 NMs each with 8GB memory;
// 2. setup 2 level queues: Default -> (QueueA, QueueB) // 2. setup 2 level queues: Default -> (QueueA, QueueB)