YARN-10787. Queue submit ACL check is wrong when CS queue is ambiguous. Contributed by Gergely Pollak
This commit is contained in:
parent
e9339aa376
commit
2707f69251
|
@ -84,9 +84,9 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFu
|
||||||
import org.apache.hadoop.yarn.util.StringHelper;
|
import org.apache.hadoop.yarn.util.StringHelper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class manages the list of applications for the resource manager.
|
* This class manages the list of applications for the resource manager.
|
||||||
*/
|
*/
|
||||||
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
Recoverable {
|
Recoverable {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
|
@ -143,7 +143,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
static final Logger LOG = LoggerFactory.
|
static final Logger LOG = LoggerFactory.
|
||||||
getLogger(ApplicationSummary.class);
|
getLogger(ApplicationSummary.class);
|
||||||
|
|
||||||
// Escape sequences
|
// Escape sequences
|
||||||
static final char EQUALS = '=';
|
static final char EQUALS = '=';
|
||||||
static final char[] charsToEscape =
|
static final char[] charsToEscape =
|
||||||
{StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
|
{StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
|
||||||
|
@ -182,7 +182,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create a summary of the application's runtime.
|
* create a summary of the application's runtime.
|
||||||
*
|
*
|
||||||
* @param app {@link RMApp} whose summary is to be created, cannot
|
* @param app {@link RMApp} whose summary is to be created, cannot
|
||||||
* be <code>null</code>.
|
* be <code>null</code>.
|
||||||
*/
|
*/
|
||||||
|
@ -247,7 +247,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log a summary of the application's runtime.
|
* Log a summary of the application's runtime.
|
||||||
*
|
*
|
||||||
* @param app {@link RMApp} whose summary is to be logged
|
* @param app {@link RMApp} whose summary is to be logged
|
||||||
*/
|
*/
|
||||||
public static void logAppSummary(RMApp app) {
|
public static void logAppSummary(RMApp app) {
|
||||||
|
@ -274,7 +274,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized int getCompletedAppsListSize() {
|
protected synchronized int getCompletedAppsListSize() {
|
||||||
return this.completedApps.size();
|
return this.completedApps.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void finishApplication(ApplicationId applicationId) {
|
protected synchronized void finishApplication(ApplicationId applicationId) {
|
||||||
|
@ -285,7 +285,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
|
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
completedApps.add(applicationId);
|
completedApps.add(applicationId);
|
||||||
completedAppsInStateStore++;
|
completedAppsInStateStore++;
|
||||||
writeAuditLog(applicationId);
|
writeAuditLog(applicationId);
|
||||||
|
@ -297,26 +297,26 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
String operation = "UNKONWN";
|
String operation = "UNKONWN";
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
switch (app.getState()) {
|
switch (app.getState()) {
|
||||||
case FAILED:
|
case FAILED:
|
||||||
operation = AuditConstants.FINISH_FAILED_APP;
|
operation = AuditConstants.FINISH_FAILED_APP;
|
||||||
break;
|
break;
|
||||||
case FINISHED:
|
case FINISHED:
|
||||||
operation = AuditConstants.FINISH_SUCCESS_APP;
|
operation = AuditConstants.FINISH_SUCCESS_APP;
|
||||||
success = true;
|
success = true;
|
||||||
break;
|
break;
|
||||||
case KILLED:
|
case KILLED:
|
||||||
operation = AuditConstants.FINISH_KILLED_APP;
|
operation = AuditConstants.FINISH_KILLED_APP;
|
||||||
success = true;
|
success = true;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
RMAuditLogger.logSuccess(app.getUser(), operation,
|
RMAuditLogger.logSuccess(app.getUser(), operation,
|
||||||
"RMAppManager", app.getApplicationId());
|
"RMAppManager", app.getApplicationId());
|
||||||
} else {
|
} else {
|
||||||
StringBuilder diag = app.getDiagnostics();
|
StringBuilder diag = app.getDiagnostics();
|
||||||
String msg = diag == null ? null : diag.toString();
|
String msg = diag == null ? null : diag.toString();
|
||||||
RMAuditLogger.logFailure(app.getUser(), operation, msg, "RMAppManager",
|
RMAuditLogger.logFailure(app.getUser(), operation, msg, "RMAppManager",
|
||||||
"App failed with state: " + app.getState(), appId);
|
"App failed with state: " + app.getState(), appId);
|
||||||
|
@ -445,7 +445,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
|
|
||||||
if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
|
if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
|
||||||
if (scheduler instanceof CapacityScheduler) {
|
if (scheduler instanceof CapacityScheduler) {
|
||||||
String queueName = submissionContext.getQueue();
|
String queueName = placementContext == null ?
|
||||||
|
submissionContext.getQueue() : placementContext.getFullQueuePath();
|
||||||
|
|
||||||
String appName = submissionContext.getApplicationName();
|
String appName = submissionContext.getApplicationName();
|
||||||
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
|
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.*;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||||
|
|
||||||
|
public class TestCapacitySchedulerAmbiguousLeafs {
|
||||||
|
/**
|
||||||
|
* Internal counter for incremental application id generation
|
||||||
|
*/
|
||||||
|
int appId = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to submit applications via RMClientService, to make sure
|
||||||
|
* all submissions go through RMAppManager.
|
||||||
|
* @param rm The resource manager instance
|
||||||
|
* @param queue Name of the queue to submit the application to
|
||||||
|
* @return ApplicationID of the submitted application
|
||||||
|
* @throws IOException
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
private ApplicationId submitApplication(MockRM rm, String queue)
|
||||||
|
throws IOException, YarnException {
|
||||||
|
//Generating incremental application id
|
||||||
|
final ApplicationAttemptId appAttemptId = TestUtils
|
||||||
|
.getMockApplicationAttemptId(appId++, 1);
|
||||||
|
|
||||||
|
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||||
|
ContainerLaunchContext amContainerSpec = ContainerLaunchContext
|
||||||
|
.newInstance(null, null, null, null, null, null);
|
||||||
|
ApplicationSubmissionContext asc = ApplicationSubmissionContext
|
||||||
|
.newInstance(appAttemptId.getApplicationId(), "Test application",
|
||||||
|
queue, null, amContainerSpec, false, true, 1, resource,
|
||||||
|
"applicationType");
|
||||||
|
|
||||||
|
SubmitApplicationRequest req = SubmitApplicationRequest.newInstance(asc);
|
||||||
|
rm.getClientRMService().submitApplication(req);
|
||||||
|
return appAttemptId.getApplicationId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAmbiguousSubmissionWithACL() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
|
||||||
|
CapacitySchedulerConfiguration schedulerConf = cs.getConfiguration();
|
||||||
|
|
||||||
|
schedulerConf.setQueues(ROOT, new String[] {"a", "b", "default"});
|
||||||
|
schedulerConf.setAcl(ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
|
schedulerConf.setAcl(ROOT, QueueACL.ADMINISTER_QUEUE, "forbidden forbidden");
|
||||||
|
|
||||||
|
schedulerConf.setQueues(ROOT + ".a", new String[] {"unique", "ambi"});
|
||||||
|
schedulerConf.setAcl(ROOT + ".a", QueueACL.SUBMIT_APPLICATIONS, "forbidden forbidden");
|
||||||
|
schedulerConf.setCapacity(ROOT + ".a", 45);
|
||||||
|
|
||||||
|
schedulerConf.setQueues(ROOT + ".b", new String[] {"ambi"});
|
||||||
|
schedulerConf.setCapacity(ROOT + ".b", 45);
|
||||||
|
schedulerConf.setCapacity(ROOT + ".default", 10);
|
||||||
|
|
||||||
|
schedulerConf.setCapacity(ROOT + ".a.unique", 50);
|
||||||
|
schedulerConf.setAcl(ROOT + ".a.unique", QueueACL.SUBMIT_APPLICATIONS, "* *");
|
||||||
|
schedulerConf.setCapacity(ROOT + ".a.ambi", 50);
|
||||||
|
schedulerConf.setAcl(ROOT + ".a.ambi", QueueACL.SUBMIT_APPLICATIONS, "* *");
|
||||||
|
schedulerConf.setCapacity(ROOT + ".b.ambi", 100);
|
||||||
|
|
||||||
|
schedulerConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, "json");
|
||||||
|
//Simple %specified mapping rule for all submissions with skip fallback
|
||||||
|
//The %specified needed rule to make sure we get an
|
||||||
|
//ApplicationPlacementContext which is required for validating YARN-10787
|
||||||
|
schedulerConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
|
||||||
|
"{\"rules\" : [{\"type\": \"user\", \"policy\" : \"specified\", " +
|
||||||
|
"\"fallbackResult\" : \"skip\", \"matches\" : \"*\"}]}");
|
||||||
|
schedulerConf.setOverrideWithQueueMappings(true);
|
||||||
|
|
||||||
|
rm.start();
|
||||||
|
cs.reinitialize(schedulerConf, rm.getRMContext());
|
||||||
|
|
||||||
|
|
||||||
|
ApplicationId id = submitApplication(rm, "root.a.unique");
|
||||||
|
rm.waitForState(id, RMAppState.ACCEPTED);
|
||||||
|
|
||||||
|
id = submitApplication(rm, "unique");
|
||||||
|
rm.waitForState(id, RMAppState.ACCEPTED);
|
||||||
|
|
||||||
|
id = submitApplication(rm, "ambi");
|
||||||
|
rm.waitForState(id, RMAppState.FAILED);
|
||||||
|
|
||||||
|
id = submitApplication(rm, "root.a.ambi");
|
||||||
|
rm.waitForState(id, RMAppState.ACCEPTED);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue