YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
This commit is contained in:
parent
ae42c8cb61
commit
e7d44e48f7
|
@ -543,6 +543,24 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RM_NODES_INCLUDE_FILE_PATH =
|
||||
RM_PREFIX + "nodes.include-path";
|
||||
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
|
||||
|
||||
/** Enable submission pre-processor.*/
|
||||
public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED =
|
||||
RM_PREFIX + "submission-preprocessor.enabled";
|
||||
public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED =
|
||||
false;
|
||||
|
||||
/** Path to file with hosts for the submission processor to handle.*/
|
||||
public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
|
||||
RM_PREFIX + "submission-preprocessor.file-path";
|
||||
public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
|
||||
"";
|
||||
|
||||
/** Submission processor refresh interval.*/
|
||||
public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
|
||||
RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
|
||||
public static final int
|
||||
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
|
||||
|
||||
/** Path to file with nodes to exclude.*/
|
||||
public static final String RM_NODES_EXCLUDE_FILE_PATH =
|
||||
|
|
|
@ -4263,4 +4263,25 @@
|
|||
<name>yarn.nodemanager.containers-launcher.class</name>
|
||||
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Enable the Pre processing of Application Submission context with server side configuration
|
||||
</description>
|
||||
<name>yarn.resourcemanager.submission-preprocessor.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Path to file with hosts for the submission processor to handle.</description>
|
||||
<name>yarn.resourcemanager.submission-preprocessor.file-path</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Submission processor refresh interval</description>
|
||||
<name>yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms</name>
|
||||
<value>60000</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -167,6 +167,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||
|
@ -232,6 +233,8 @@ public class ClientRMService extends AbstractService implements
|
|||
private ReservationSystem reservationSystem;
|
||||
private ReservationInputValidator rValidator;
|
||||
|
||||
private SubmissionContextPreProcessor contextPreProcessor;
|
||||
|
||||
private boolean filterAppsByUser = false;
|
||||
|
||||
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
||||
|
@ -313,6 +316,13 @@ public class ClientRMService extends AbstractService implements
|
|||
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||
timelineServiceV2Enabled(conf);
|
||||
|
||||
if (conf.getBoolean(
|
||||
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) {
|
||||
this.contextPreProcessor = new SubmissionContextPreProcessor();
|
||||
this.contextPreProcessor.start(conf);
|
||||
}
|
||||
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
|
@ -321,6 +331,9 @@ public class ClientRMService extends AbstractService implements
|
|||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
if (this.contextPreProcessor != null) {
|
||||
this.contextPreProcessor.stop();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -332,6 +345,11 @@ public class ClientRMService extends AbstractService implements
|
|||
YarnConfiguration.DEFAULT_RM_PORT);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
SubmissionContextPreProcessor getContextPreProcessor() {
|
||||
return this.contextPreProcessor;
|
||||
}
|
||||
|
||||
@Private
|
||||
public InetSocketAddress getBindAddress() {
|
||||
return clientBindAddress;
|
||||
|
@ -663,6 +681,11 @@ public class ClientRMService extends AbstractService implements
|
|||
checkReservationACLs(submissionContext.getQueue(), AuditConstants
|
||||
.SUBMIT_RESERVATION_REQUEST, reservationId);
|
||||
|
||||
if (this.contextPreProcessor != null) {
|
||||
this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
|
||||
applicationId, submissionContext);
|
||||
}
|
||||
|
||||
try {
|
||||
// call RMAppManager to submit application directly
|
||||
rmAppManager.submitApplication(submissionContext,
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* This is the interface providing functionality to process
|
||||
* application submission context.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface ContextProcessor {
|
||||
/**
|
||||
* It will enrich the application submission context with value provided.
|
||||
* @param host Address of the host from where application launched.
|
||||
* @param value Value to be filled in ApplicationSubmissionContext.
|
||||
* @param applicationId Application Id of the application.
|
||||
* @param submissionContext Context of the application.
|
||||
*/
|
||||
void process(String host, String value, ApplicationId applicationId,
|
||||
ApplicationSubmissionContext submissionContext);
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
||||
/**
|
||||
* Processor will add the node label to application submission context.
|
||||
*/
|
||||
class NodeLabelProcessor implements ContextProcessor {
|
||||
@Override
|
||||
public void process(String host, String value, ApplicationId applicationId,
|
||||
ApplicationSubmissionContext submissionContext) {
|
||||
submissionContext.setNodeLabelExpression(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
||||
|
||||
/**
|
||||
* Processor will add queue to application submission context.
|
||||
*/
|
||||
class QueueProcessor implements ContextProcessor {
|
||||
@Override
|
||||
public void process(String host, String value, ApplicationId applicationId,
|
||||
ApplicationSubmissionContext submissionContext) {
|
||||
submissionContext.setQueue(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,223 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Pre process the ApplicationSubmissionContext with server side info.
|
||||
*/
|
||||
public class SubmissionContextPreProcessor {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
SubmissionContextPreProcessor.class);
|
||||
private static final String DEFAULT_COMMANDS = "*";
|
||||
private static final int INITIAL_DELAY = 1000;
|
||||
|
||||
enum ContextProp {
|
||||
// Node label Expression
|
||||
NL(new NodeLabelProcessor()),
|
||||
// Queue
|
||||
Q(new QueueProcessor()),
|
||||
// Tag Add
|
||||
TA(new TagAddProcessor());
|
||||
|
||||
private ContextProcessor cp;
|
||||
ContextProp(ContextProcessor cp) {
|
||||
this.cp = cp;
|
||||
}
|
||||
}
|
||||
|
||||
private String hostsFilePath;
|
||||
private volatile long lastModified = -1;
|
||||
private volatile Map<String, Map<ContextProp, String>> hostCommands =
|
||||
new HashMap<>();
|
||||
private ScheduledExecutorService executorService;
|
||||
|
||||
public void start(Configuration conf) {
|
||||
this.hostsFilePath =
|
||||
conf.get(
|
||||
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
|
||||
YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
|
||||
int refreshPeriod =
|
||||
conf.getInt(
|
||||
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
|
||||
YarnConfiguration.
|
||||
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
|
||||
|
||||
LOG.info("Submission Context Preprocessor enabled: file=[{}], "
|
||||
+ "interval=[{}]", this.hostsFilePath, refreshPeriod);
|
||||
|
||||
executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
Runnable refreshConf = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
refresh();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error while refreshing Submission PreProcessor file [{}]",
|
||||
hostsFilePath, ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
if (refreshPeriod > 0) {
|
||||
executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY,
|
||||
refreshPeriod, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
executorService.schedule(refreshConf, INITIAL_DELAY,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (this.executorService != null) {
|
||||
this.executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public void preProcess(String host, ApplicationId applicationId,
|
||||
ApplicationSubmissionContext submissionContext) {
|
||||
Map<ContextProp, String> cMap = hostCommands.get(host);
|
||||
|
||||
// Try regex match
|
||||
if (cMap == null) {
|
||||
for (Map.Entry<String, Map<ContextProp, String>> entry :
|
||||
hostCommands.entrySet()) {
|
||||
if (entry.getKey().equals(DEFAULT_COMMANDS)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Pattern p = Pattern.compile(entry.getKey());
|
||||
Matcher m = p.matcher(host);
|
||||
if (m.find()) {
|
||||
cMap = hostCommands.get(entry.getKey());
|
||||
}
|
||||
} catch (PatternSyntaxException exception) {
|
||||
LOG.warn("Invalid regex pattern: " + entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Set to default value
|
||||
if (cMap == null) {
|
||||
cMap = hostCommands.get(DEFAULT_COMMANDS);
|
||||
}
|
||||
if (cMap != null) {
|
||||
for (Map.Entry<ContextProp, String> entry : cMap.entrySet()) {
|
||||
entry.getKey().cp.process(host, entry.getValue(),
|
||||
applicationId, submissionContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void refresh() throws Exception {
|
||||
if (null == hostsFilePath || hostsFilePath.isEmpty()) {
|
||||
LOG.warn("Host list file path [{}] is empty or does not exist !!",
|
||||
hostsFilePath);
|
||||
} else {
|
||||
File hostFile = new File(hostsFilePath);
|
||||
if (!hostFile.exists() || !hostFile.isFile()) {
|
||||
LOG.warn("Host list file [{}] does not exist or is not a file !!",
|
||||
hostFile);
|
||||
} else if (hostFile.lastModified() <= lastModified) {
|
||||
LOG.debug("Host list file [{}] has not been modified from last refresh",
|
||||
hostFile);
|
||||
} else {
|
||||
FileInputStream fileInputStream = new FileInputStream(hostFile);
|
||||
BufferedReader reader = null;
|
||||
Map<String, Map<ContextProp, String>> tempHostCommands =
|
||||
new HashMap<>();
|
||||
try {
|
||||
reader = new BufferedReader(new InputStreamReader(fileInputStream,
|
||||
StandardCharsets.UTF_8));
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
// Lines should start with hostname and be followed with commands.
|
||||
// Delimiter is any contiguous sequence of space or tab character.
|
||||
// Commands are of the form:
|
||||
// <KEY>=<VALUE>
|
||||
// where KEY can be 'NL', 'Q' or 'TA' (more can be added later)
|
||||
// (TA stands for 'Tag Add')
|
||||
// Sample lines:
|
||||
// ...
|
||||
// host1 NL=foo Q=b
|
||||
// host2 Q=c NL=bar
|
||||
// ...
|
||||
String[] commands = line.split("[ \t\n\f\r]+");
|
||||
if (commands != null && commands.length > 1) {
|
||||
String host = commands[0].trim();
|
||||
if (host.startsWith("#")) {
|
||||
// All lines starting with # is a comment
|
||||
continue;
|
||||
}
|
||||
Map<ContextProp, String> cMap = null;
|
||||
for (int i = 1; i < commands.length; i++) {
|
||||
String[] cSplit = commands[i].split("=");
|
||||
if (cSplit == null || cSplit.length != 2) {
|
||||
LOG.error("No commands found for line [{}]", commands[i]);
|
||||
continue;
|
||||
}
|
||||
if (cMap == null) {
|
||||
cMap = new HashMap<>();
|
||||
}
|
||||
cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
|
||||
}
|
||||
if (cMap != null && cMap.size() > 0) {
|
||||
tempHostCommands.put(host, cMap);
|
||||
LOG.info("Following commands registered for host[{}] : {}",
|
||||
host, cMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
lastModified = hostFile.lastModified();
|
||||
} catch (Exception ex) {
|
||||
// Do not commit the new map if we have an Exception..
|
||||
tempHostCommands = null;
|
||||
throw ex;
|
||||
} finally {
|
||||
if (tempHostCommands != null && tempHostCommands.size() > 0) {
|
||||
hostCommands = tempHostCommands;
|
||||
}
|
||||
IOUtils.cleanupWithLogger(LOG, reader, fileInputStream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* This processor will add the tag to application submission context.
|
||||
*/
|
||||
class TagAddProcessor implements ContextProcessor {
|
||||
@Override
|
||||
public void process(String host, String value, ApplicationId applicationId,
|
||||
ApplicationSubmissionContext submissionContext) {
|
||||
Set<String> applicationTags = submissionContext.getApplicationTags();
|
||||
if (applicationTags == null) {
|
||||
applicationTags = new HashSet<>();
|
||||
} else {
|
||||
applicationTags = new HashSet<>(applicationTags);
|
||||
}
|
||||
applicationTags.add(value);
|
||||
submissionContext.setApplicationTags(applicationTags);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package contains classes to pre process the application submission
|
||||
* context with server side configs.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -31,11 +31,15 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.AccessControlException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
|
@ -55,6 +59,7 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
|
@ -197,6 +202,7 @@ public class TestClientRMService {
|
|||
|
||||
private final static String QUEUE_1 = "Q-1";
|
||||
private final static String QUEUE_2 = "Q-2";
|
||||
private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
|
||||
private File resourceTypesFile = null;
|
||||
|
||||
@Test
|
||||
|
@ -975,6 +981,178 @@ public class TestClientRMService {
|
|||
Assert.assertEquals(0, applications1.size());
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
@SuppressWarnings ("rawtypes")
|
||||
public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
|
||||
ResourceScheduler scheduler = mockResourceScheduler();
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
mockRMContext(scheduler, rmContext);
|
||||
YarnConfiguration yConf = new YarnConfiguration();
|
||||
yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
|
||||
true);
|
||||
yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
// Override the YARN configuration.
|
||||
when(rmContext.getYarnConfiguration()).thenReturn(yConf);
|
||||
RMStateStore stateStore = mock(RMStateStore.class);
|
||||
when(rmContext.getStateStore()).thenReturn(stateStore);
|
||||
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
|
||||
null, mock(ApplicationACLsManager.class), new Configuration());
|
||||
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
|
||||
new EventHandler<Event>() {
|
||||
public void handle(Event event) {}
|
||||
});
|
||||
ApplicationId appId1 = getApplicationId(100);
|
||||
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||
when(
|
||||
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
|
||||
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
|
||||
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), any(RMApp.class), any(String.class),
|
||||
any()))
|
||||
.thenReturn(true);
|
||||
|
||||
ClientRMService rmService =
|
||||
new ClientRMService(rmContext, scheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
File rulesFile = File.createTempFile("submission_rules", ".tmp");
|
||||
rulesFile.deleteOnExit();
|
||||
rulesFile.createNewFile();
|
||||
|
||||
yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
|
||||
rulesFile.getAbsolutePath());
|
||||
rmService.serviceInit(yConf);
|
||||
rmService.serviceStart();
|
||||
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
|
||||
writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1");
|
||||
writer.newLine();
|
||||
writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2");
|
||||
writer.newLine();
|
||||
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
|
||||
writer.newLine();
|
||||
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
|
||||
writer.newLine();
|
||||
writer.write("* TA=cluster:other Q=default NL=barfoo");
|
||||
writer.newLine();
|
||||
writer.write("host.testcluster1.com Q=default");
|
||||
writer.flush();
|
||||
writer.close();
|
||||
rmService.getContextPreProcessor().refresh();
|
||||
setupCurrentCall("host.cluster1.com");
|
||||
SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
|
||||
appId1, null, null);
|
||||
try {
|
||||
rmService.submitApplication(submitRequest1);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
RMApp app1 = rmContext.getRMApps().get(appId1);
|
||||
Assert.assertNotNull("app doesn't exist", app1);
|
||||
Assert.assertEquals("app name doesn't match",
|
||||
YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
|
||||
Assert.assertTrue("custom tag not present",
|
||||
app1.getApplicationTags().contains("cluster:cluster1"));
|
||||
Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue());
|
||||
Assert.assertEquals("app node label doesn't match",
|
||||
"foo", app1.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||
setupCurrentCall("host.cluster2.com");
|
||||
ApplicationId appId2 = getApplicationId(101);
|
||||
SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
|
||||
appId2, null, null);
|
||||
submitRequest2.getApplicationSubmissionContext().setApplicationType(
|
||||
"matchType");
|
||||
Set<String> aTags = new HashSet<String>();
|
||||
aTags.add(APPLICATION_TAG_SC_PREPROCESSOR);
|
||||
submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
|
||||
try {
|
||||
rmService.submitApplication(submitRequest2);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
RMApp app2 = rmContext.getRMApps().get(appId2);
|
||||
Assert.assertNotNull("app doesn't exist", app2);
|
||||
Assert.assertEquals("app name doesn't match",
|
||||
YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName());
|
||||
Assert.assertTrue("client tag not present",
|
||||
app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
|
||||
Assert.assertTrue("custom tag not present",
|
||||
app2.getApplicationTags().contains("cluster:cluster2"));
|
||||
Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue());
|
||||
Assert.assertEquals("app node label doesn't match",
|
||||
"zuess",
|
||||
app2.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||
// Test Default commands
|
||||
setupCurrentCall("host2.cluster3.com");
|
||||
ApplicationId appId3 = getApplicationId(102);
|
||||
SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
|
||||
appId3, null, null);
|
||||
submitRequest3.getApplicationSubmissionContext().setApplicationType(
|
||||
"matchType");
|
||||
submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
|
||||
try {
|
||||
rmService.submitApplication(submitRequest3);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
RMApp app3 = rmContext.getRMApps().get(appId3);
|
||||
Assert.assertNotNull("app doesn't exist", app3);
|
||||
Assert.assertEquals("app name doesn't match",
|
||||
YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName());
|
||||
Assert.assertTrue("client tag not present",
|
||||
app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
|
||||
Assert.assertTrue("custom tag not present",
|
||||
app3.getApplicationTags().contains("cluster:other"));
|
||||
Assert.assertEquals("app queue doesn't match", "default", app3.getQueue());
|
||||
Assert.assertEquals("app node label doesn't match",
|
||||
"barfoo",
|
||||
app3.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||
// Test regex
|
||||
setupCurrentCall("host.cluster100.com");
|
||||
ApplicationId appId4 = getApplicationId(103);
|
||||
SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
|
||||
appId4, null, null);
|
||||
try {
|
||||
rmService.submitApplication(submitRequest4);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
RMApp app4 = rmContext.getRMApps().get(appId4);
|
||||
Assert.assertTrue("custom tag not present",
|
||||
app4.getApplicationTags().contains("cluster:reg"));
|
||||
Assert.assertEquals("app node label doesn't match",
|
||||
"reg", app4.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||
testSubmissionContextWithAbsentTAG(rmService, rmContext);
|
||||
rmService.serviceStop();
|
||||
}
|
||||
|
||||
private void testSubmissionContextWithAbsentTAG(ClientRMService rmService,
|
||||
RMContext rmContext) throws Exception {
|
||||
setupCurrentCall("host.testcluster1.com");
|
||||
ApplicationId appId5 = getApplicationId(104);
|
||||
SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest(
|
||||
appId5, null, null);
|
||||
try {
|
||||
rmService.submitApplication(submitRequest5);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
}
|
||||
RMApp app5 = rmContext.getRMApps().get(appId5);
|
||||
Assert.assertEquals("custom tag present",
|
||||
app5.getApplicationTags().size(), 0);
|
||||
Assert.assertNull("app node label present",
|
||||
app5.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||
Assert.assertEquals("Queue name is not present",
|
||||
app5.getQueue(), "default");
|
||||
}
|
||||
private void setupCurrentCall(String hostName) throws UnknownHostException {
|
||||
Server.Call mockCall = mock(Server.Call.class);
|
||||
when(mockCall.getHostInetAddress()).thenReturn(
|
||||
InetAddress.getByAddress(hostName,
|
||||
new byte[]{123, 123, 123, 123}));
|
||||
Server.getCurCall().set(mockCall);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
@SuppressWarnings ("rawtypes")
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
||||
/**
|
||||
* This class will test the functionality of all the three
|
||||
* processor(Node, Queue, Tag) together on same
|
||||
* ApplicationSubmissionContext.
|
||||
*/
|
||||
public class TestContextProcessor {
|
||||
@Test
|
||||
public void testContextProcessor() {
|
||||
Map<ContextProcessor, String> contextProcessorsAndValues =
|
||||
new HashMap<>();
|
||||
contextProcessorsAndValues.put(new NodeLabelProcessor(), "foo");
|
||||
contextProcessorsAndValues.put(new QueueProcessor(), "queue1");
|
||||
contextProcessorsAndValues.put(new TagAddProcessor(), "cluster:cluster1");
|
||||
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||
ApplicationSubmissionContext applicationSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
for(Map.Entry<ContextProcessor, String> entry :
|
||||
contextProcessorsAndValues.entrySet()){
|
||||
entry.getKey().process("host.cluster2.com", entry.getValue(),
|
||||
app, applicationSubmissionContext);
|
||||
}
|
||||
Set<String> applicationTags =new HashSet<String>();
|
||||
applicationTags.add("cluster:cluster1");
|
||||
verify(applicationSubmissionContext, times(1))
|
||||
.setNodeLabelExpression("foo");
|
||||
verify(applicationSubmissionContext, times(1))
|
||||
.setQueue("queue1");
|
||||
verify(applicationSubmissionContext, times(1))
|
||||
.setApplicationTags(applicationTags);
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* This class will test the functionality of NodeLabelProcessor.
|
||||
*/
|
||||
public class TestNodeLabelProcessor {
|
||||
|
||||
@Test
|
||||
public void testNodeLabelProcessor() {
|
||||
ContextProcessor nodeLabelProcessor = new NodeLabelProcessor();
|
||||
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||
ApplicationSubmissionContext applicationSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||
nodeLabelProcessor.process("host.cluster2.com", "foo", app,
|
||||
applicationSubmissionContext);
|
||||
verify(applicationSubmissionContext, times(1))
|
||||
.setNodeLabelExpression("foo");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
||||
/**
|
||||
* This class will test the functionality of QueueProcessor.
|
||||
*/
|
||||
public class TestQueueProcessor {
|
||||
@Test
|
||||
public void testQueueProcessor() {
|
||||
ContextProcessor queueProcessor = new QueueProcessor();
|
||||
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||
ApplicationSubmissionContext applicationSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||
queueProcessor.process("host.cluster2.com", "queue1",
|
||||
app, applicationSubmissionContext);
|
||||
verify(applicationSubmissionContext, times(1)).setQueue("queue1");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
||||
/**
|
||||
* This class will test the functionality of TagAddProcessor.
|
||||
*/
|
||||
public class TestTagAddProcessor {
|
||||
@Test
|
||||
public void testTagAddProcessor() {
|
||||
ContextProcessor tagAddProcessor = new TagAddProcessor();
|
||||
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||
ApplicationSubmissionContext applicationSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||
tagAddProcessor.process("host.cluster2.com",
|
||||
"cluster:cluster1", app, applicationSubmissionContext);
|
||||
Set<String> applicationTags = new HashSet<String>();
|
||||
applicationTags.add("cluster:cluster1");
|
||||
verify(applicationSubmissionContext, times(1))
|
||||
.setApplicationTags(applicationTags);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue