YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
This commit is contained in:
parent
ebb3b8add0
commit
1f685efc73
|
@ -532,6 +532,24 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String RM_NODES_INCLUDE_FILE_PATH =
|
public static final String RM_NODES_INCLUDE_FILE_PATH =
|
||||||
RM_PREFIX + "nodes.include-path";
|
RM_PREFIX + "nodes.include-path";
|
||||||
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
|
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
|
||||||
|
|
||||||
|
/** Enable submission pre-processor.*/
|
||||||
|
public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED =
|
||||||
|
RM_PREFIX + "submission-preprocessor.enabled";
|
||||||
|
public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED =
|
||||||
|
false;
|
||||||
|
|
||||||
|
/** Path to file with hosts for the submission processor to handle.*/
|
||||||
|
public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
|
||||||
|
RM_PREFIX + "submission-preprocessor.file-path";
|
||||||
|
public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
|
||||||
|
"";
|
||||||
|
|
||||||
|
/** Submission processor refresh interval.*/
|
||||||
|
public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
|
||||||
|
RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
|
||||||
|
public static final int
|
||||||
|
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
|
||||||
|
|
||||||
/** Path to file with nodes to exclude.*/
|
/** Path to file with nodes to exclude.*/
|
||||||
public static final String RM_NODES_EXCLUDE_FILE_PATH =
|
public static final String RM_NODES_EXCLUDE_FILE_PATH =
|
||||||
|
|
|
@ -4082,4 +4082,25 @@
|
||||||
<name>yarn.nodemanager.containers-launcher.class</name>
|
<name>yarn.nodemanager.containers-launcher.class</name>
|
||||||
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher</value>
|
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Enable the Pre processing of Application Submission context with server side configuration
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.submission-preprocessor.enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Path to file with hosts for the submission processor to handle.</description>
|
||||||
|
<name>yarn.resourcemanager.submission-preprocessor.file-path</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Submission processor refresh interval</description>
|
||||||
|
<name>yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms</name>
|
||||||
|
<value>60000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -167,6 +167,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||||
|
@ -231,6 +232,8 @@ public class ClientRMService extends AbstractService implements
|
||||||
private ReservationSystem reservationSystem;
|
private ReservationSystem reservationSystem;
|
||||||
private ReservationInputValidator rValidator;
|
private ReservationInputValidator rValidator;
|
||||||
|
|
||||||
|
private SubmissionContextPreProcessor contextPreProcessor;
|
||||||
|
|
||||||
private boolean filterAppsByUser = false;
|
private boolean filterAppsByUser = false;
|
||||||
|
|
||||||
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
||||||
|
@ -311,6 +314,14 @@ public class ClientRMService extends AbstractService implements
|
||||||
server.getListenerAddress());
|
server.getListenerAddress());
|
||||||
this.timelineServiceV2Enabled = YarnConfiguration.
|
this.timelineServiceV2Enabled = YarnConfiguration.
|
||||||
timelineServiceV2Enabled(conf);
|
timelineServiceV2Enabled(conf);
|
||||||
|
|
||||||
|
if (conf.getBoolean(
|
||||||
|
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) {
|
||||||
|
this.contextPreProcessor = new SubmissionContextPreProcessor();
|
||||||
|
this.contextPreProcessor.start(conf);
|
||||||
|
}
|
||||||
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,6 +330,9 @@ public class ClientRMService extends AbstractService implements
|
||||||
if (this.server != null) {
|
if (this.server != null) {
|
||||||
this.server.stop();
|
this.server.stop();
|
||||||
}
|
}
|
||||||
|
if (this.contextPreProcessor != null) {
|
||||||
|
this.contextPreProcessor.stop();
|
||||||
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,6 +344,11 @@ public class ClientRMService extends AbstractService implements
|
||||||
YarnConfiguration.DEFAULT_RM_PORT);
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
SubmissionContextPreProcessor getContextPreProcessor() {
|
||||||
|
return this.contextPreProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public InetSocketAddress getBindAddress() {
|
public InetSocketAddress getBindAddress() {
|
||||||
return clientBindAddress;
|
return clientBindAddress;
|
||||||
|
@ -661,6 +680,11 @@ public class ClientRMService extends AbstractService implements
|
||||||
checkReservationACLs(submissionContext.getQueue(), AuditConstants
|
checkReservationACLs(submissionContext.getQueue(), AuditConstants
|
||||||
.SUBMIT_RESERVATION_REQUEST, reservationId);
|
.SUBMIT_RESERVATION_REQUEST, reservationId);
|
||||||
|
|
||||||
|
if (this.contextPreProcessor != null) {
|
||||||
|
this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
|
||||||
|
applicationId, submissionContext);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// call RMAppManager to submit application directly
|
// call RMAppManager to submit application directly
|
||||||
rmAppManager.submitApplication(submissionContext,
|
rmAppManager.submitApplication(submissionContext,
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the interface providing functionality to process
|
||||||
|
* application submission context.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public interface ContextProcessor {
|
||||||
|
/**
|
||||||
|
* It will enrich the application submission context with value provided.
|
||||||
|
* @param host Address of the host from where application launched.
|
||||||
|
* @param value Value to be filled in ApplicationSubmissionContext.
|
||||||
|
* @param applicationId Application Id of the application.
|
||||||
|
* @param submissionContext Context of the application.
|
||||||
|
*/
|
||||||
|
void process(String host, String value, ApplicationId applicationId,
|
||||||
|
ApplicationSubmissionContext submissionContext);
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processor will add the node label to application submission context.
|
||||||
|
*/
|
||||||
|
class NodeLabelProcessor implements ContextProcessor {
|
||||||
|
@Override
|
||||||
|
public void process(String host, String value, ApplicationId applicationId,
|
||||||
|
ApplicationSubmissionContext submissionContext) {
|
||||||
|
submissionContext.setNodeLabelExpression(value);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processor will add queue to application submission context.
|
||||||
|
*/
|
||||||
|
class QueueProcessor implements ContextProcessor {
|
||||||
|
@Override
|
||||||
|
public void process(String host, String value, ApplicationId applicationId,
|
||||||
|
ApplicationSubmissionContext submissionContext) {
|
||||||
|
submissionContext.setQueue(value);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,223 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.regex.PatternSyntaxException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pre process the ApplicationSubmissionContext with server side info.
|
||||||
|
*/
|
||||||
|
public class SubmissionContextPreProcessor {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
SubmissionContextPreProcessor.class);
|
||||||
|
private static final String DEFAULT_COMMANDS = "*";
|
||||||
|
private static final int INITIAL_DELAY = 1000;
|
||||||
|
|
||||||
|
enum ContextProp {
|
||||||
|
// Node label Expression
|
||||||
|
NL(new NodeLabelProcessor()),
|
||||||
|
// Queue
|
||||||
|
Q(new QueueProcessor()),
|
||||||
|
// Tag Add
|
||||||
|
TA(new TagAddProcessor());
|
||||||
|
|
||||||
|
private ContextProcessor cp;
|
||||||
|
ContextProp(ContextProcessor cp) {
|
||||||
|
this.cp = cp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String hostsFilePath;
|
||||||
|
private volatile long lastModified = -1;
|
||||||
|
private volatile Map<String, Map<ContextProp, String>> hostCommands =
|
||||||
|
new HashMap<>();
|
||||||
|
private ScheduledExecutorService executorService;
|
||||||
|
|
||||||
|
public void start(Configuration conf) {
|
||||||
|
this.hostsFilePath =
|
||||||
|
conf.get(
|
||||||
|
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
|
||||||
|
int refreshPeriod =
|
||||||
|
conf.getInt(
|
||||||
|
YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
|
||||||
|
|
||||||
|
LOG.info("Submission Context Preprocessor enabled: file=[{}], "
|
||||||
|
+ "interval=[{}]", this.hostsFilePath, refreshPeriod);
|
||||||
|
|
||||||
|
executorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
Runnable refreshConf = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
refresh();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Error while refreshing Submission PreProcessor file [{}]",
|
||||||
|
hostsFilePath, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if (refreshPeriod > 0) {
|
||||||
|
executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY,
|
||||||
|
refreshPeriod, TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
executorService.schedule(refreshConf, INITIAL_DELAY,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
if (this.executorService != null) {
|
||||||
|
this.executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void preProcess(String host, ApplicationId applicationId,
|
||||||
|
ApplicationSubmissionContext submissionContext) {
|
||||||
|
Map<ContextProp, String> cMap = hostCommands.get(host);
|
||||||
|
|
||||||
|
// Try regex match
|
||||||
|
if (cMap == null) {
|
||||||
|
for (Map.Entry<String, Map<ContextProp, String>> entry :
|
||||||
|
hostCommands.entrySet()) {
|
||||||
|
if (entry.getKey().equals(DEFAULT_COMMANDS)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Pattern p = Pattern.compile(entry.getKey());
|
||||||
|
Matcher m = p.matcher(host);
|
||||||
|
if (m.find()) {
|
||||||
|
cMap = hostCommands.get(entry.getKey());
|
||||||
|
}
|
||||||
|
} catch (PatternSyntaxException exception) {
|
||||||
|
LOG.warn("Invalid regex pattern: " + entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Set to default value
|
||||||
|
if (cMap == null) {
|
||||||
|
cMap = hostCommands.get(DEFAULT_COMMANDS);
|
||||||
|
}
|
||||||
|
if (cMap != null) {
|
||||||
|
for (Map.Entry<ContextProp, String> entry : cMap.entrySet()) {
|
||||||
|
entry.getKey().cp.process(host, entry.getValue(),
|
||||||
|
applicationId, submissionContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void refresh() throws Exception {
|
||||||
|
if (null == hostsFilePath || hostsFilePath.isEmpty()) {
|
||||||
|
LOG.warn("Host list file path [{}] is empty or does not exist !!",
|
||||||
|
hostsFilePath);
|
||||||
|
} else {
|
||||||
|
File hostFile = new File(hostsFilePath);
|
||||||
|
if (!hostFile.exists() || !hostFile.isFile()) {
|
||||||
|
LOG.warn("Host list file [{}] does not exist or is not a file !!",
|
||||||
|
hostFile);
|
||||||
|
} else if (hostFile.lastModified() <= lastModified) {
|
||||||
|
LOG.debug("Host list file [{}] has not been modified from last refresh",
|
||||||
|
hostFile);
|
||||||
|
} else {
|
||||||
|
FileInputStream fileInputStream = new FileInputStream(hostFile);
|
||||||
|
BufferedReader reader = null;
|
||||||
|
Map<String, Map<ContextProp, String>> tempHostCommands =
|
||||||
|
new HashMap<>();
|
||||||
|
try {
|
||||||
|
reader = new BufferedReader(new InputStreamReader(fileInputStream,
|
||||||
|
StandardCharsets.UTF_8));
|
||||||
|
String line;
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
// Lines should start with hostname and be followed with commands.
|
||||||
|
// Delimiter is any contiguous sequence of space or tab character.
|
||||||
|
// Commands are of the form:
|
||||||
|
// <KEY>=<VALUE>
|
||||||
|
// where KEY can be 'NL', 'Q' or 'TA' (more can be added later)
|
||||||
|
// (TA stands for 'Tag Add')
|
||||||
|
// Sample lines:
|
||||||
|
// ...
|
||||||
|
// host1 NL=foo Q=b
|
||||||
|
// host2 Q=c NL=bar
|
||||||
|
// ...
|
||||||
|
String[] commands = line.split("[ \t\n\f\r]+");
|
||||||
|
if (commands != null && commands.length > 1) {
|
||||||
|
String host = commands[0].trim();
|
||||||
|
if (host.startsWith("#")) {
|
||||||
|
// All lines starting with # is a comment
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<ContextProp, String> cMap = null;
|
||||||
|
for (int i = 1; i < commands.length; i++) {
|
||||||
|
String[] cSplit = commands[i].split("=");
|
||||||
|
if (cSplit == null || cSplit.length != 2) {
|
||||||
|
LOG.error("No commands found for line [{}]", commands[i]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (cMap == null) {
|
||||||
|
cMap = new HashMap<>();
|
||||||
|
}
|
||||||
|
cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
|
||||||
|
}
|
||||||
|
if (cMap != null && cMap.size() > 0) {
|
||||||
|
tempHostCommands.put(host, cMap);
|
||||||
|
LOG.info("Following commands registered for host[{}] : {}",
|
||||||
|
host, cMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastModified = hostFile.lastModified();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// Do not commit the new map if we have an Exception..
|
||||||
|
tempHostCommands = null;
|
||||||
|
throw ex;
|
||||||
|
} finally {
|
||||||
|
if (tempHostCommands != null && tempHostCommands.size() > 0) {
|
||||||
|
hostCommands = tempHostCommands;
|
||||||
|
}
|
||||||
|
IOUtils.cleanupWithLogger(LOG, reader, fileInputStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This processor will add the tag to application submission context.
|
||||||
|
*/
|
||||||
|
class TagAddProcessor implements ContextProcessor {
|
||||||
|
@Override
|
||||||
|
public void process(String host, String value, ApplicationId applicationId,
|
||||||
|
ApplicationSubmissionContext submissionContext) {
|
||||||
|
Set<String> applicationTags = submissionContext.getApplicationTags();
|
||||||
|
if (applicationTags == null) {
|
||||||
|
applicationTags = new HashSet<>();
|
||||||
|
} else {
|
||||||
|
applicationTags = new HashSet<>(applicationTags);
|
||||||
|
}
|
||||||
|
applicationTags.add(value);
|
||||||
|
submissionContext.setApplicationTags(applicationTags);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This package contains classes to pre process the application submission
|
||||||
|
* context with server side configs.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -30,11 +30,15 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.security.AccessControlException;
|
import java.security.AccessControlException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -54,6 +58,7 @@ import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.MockApps;
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
@ -195,6 +200,7 @@ public class TestClientRMService {
|
||||||
|
|
||||||
private final static String QUEUE_1 = "Q-1";
|
private final static String QUEUE_1 = "Q-1";
|
||||||
private final static String QUEUE_2 = "Q-2";
|
private final static String QUEUE_2 = "Q-2";
|
||||||
|
private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
|
||||||
private File resourceTypesFile = null;
|
private File resourceTypesFile = null;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -972,6 +978,178 @@ public class TestClientRMService {
|
||||||
Assert.assertEquals(0, applications1.size());
|
Assert.assertEquals(0, applications1.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
@SuppressWarnings ("rawtypes")
|
||||||
|
public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
|
||||||
|
ResourceScheduler scheduler = mockResourceScheduler();
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
mockRMContext(scheduler, rmContext);
|
||||||
|
YarnConfiguration yConf = new YarnConfiguration();
|
||||||
|
yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
|
||||||
|
true);
|
||||||
|
yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||||
|
// Override the YARN configuration.
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(yConf);
|
||||||
|
RMStateStore stateStore = mock(RMStateStore.class);
|
||||||
|
when(rmContext.getStateStore()).thenReturn(stateStore);
|
||||||
|
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
|
||||||
|
null, mock(ApplicationACLsManager.class), new Configuration());
|
||||||
|
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
|
||||||
|
new EventHandler<Event>() {
|
||||||
|
public void handle(Event event) {}
|
||||||
|
});
|
||||||
|
ApplicationId appId1 = getApplicationId(100);
|
||||||
|
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||||
|
when(
|
||||||
|
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
|
||||||
|
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
|
||||||
|
|
||||||
|
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||||
|
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||||
|
any(QueueACL.class), any(RMApp.class), any(String.class),
|
||||||
|
any()))
|
||||||
|
.thenReturn(true);
|
||||||
|
|
||||||
|
ClientRMService rmService =
|
||||||
|
new ClientRMService(rmContext, scheduler, appManager,
|
||||||
|
mockAclsManager, mockQueueACLsManager, null);
|
||||||
|
File rulesFile = File.createTempFile("submission_rules", ".tmp");
|
||||||
|
rulesFile.deleteOnExit();
|
||||||
|
rulesFile.createNewFile();
|
||||||
|
|
||||||
|
yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
|
||||||
|
rulesFile.getAbsolutePath());
|
||||||
|
rmService.serviceInit(yConf);
|
||||||
|
rmService.serviceStart();
|
||||||
|
|
||||||
|
BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
|
||||||
|
writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("* TA=cluster:other Q=default NL=barfoo");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("host.testcluster1.com Q=default");
|
||||||
|
writer.flush();
|
||||||
|
writer.close();
|
||||||
|
rmService.getContextPreProcessor().refresh();
|
||||||
|
setupCurrentCall("host.cluster1.com");
|
||||||
|
SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
|
||||||
|
appId1, null, null);
|
||||||
|
try {
|
||||||
|
rmService.submitApplication(submitRequest1);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected.");
|
||||||
|
}
|
||||||
|
RMApp app1 = rmContext.getRMApps().get(appId1);
|
||||||
|
Assert.assertNotNull("app doesn't exist", app1);
|
||||||
|
Assert.assertEquals("app name doesn't match",
|
||||||
|
YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
|
||||||
|
Assert.assertTrue("custom tag not present",
|
||||||
|
app1.getApplicationTags().contains("cluster:cluster1"));
|
||||||
|
Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue());
|
||||||
|
Assert.assertEquals("app node label doesn't match",
|
||||||
|
"foo", app1.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||||
|
setupCurrentCall("host.cluster2.com");
|
||||||
|
ApplicationId appId2 = getApplicationId(101);
|
||||||
|
SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
|
||||||
|
appId2, null, null);
|
||||||
|
submitRequest2.getApplicationSubmissionContext().setApplicationType(
|
||||||
|
"matchType");
|
||||||
|
Set<String> aTags = new HashSet<String>();
|
||||||
|
aTags.add(APPLICATION_TAG_SC_PREPROCESSOR);
|
||||||
|
submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
|
||||||
|
try {
|
||||||
|
rmService.submitApplication(submitRequest2);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected.");
|
||||||
|
}
|
||||||
|
RMApp app2 = rmContext.getRMApps().get(appId2);
|
||||||
|
Assert.assertNotNull("app doesn't exist", app2);
|
||||||
|
Assert.assertEquals("app name doesn't match",
|
||||||
|
YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName());
|
||||||
|
Assert.assertTrue("client tag not present",
|
||||||
|
app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
|
||||||
|
Assert.assertTrue("custom tag not present",
|
||||||
|
app2.getApplicationTags().contains("cluster:cluster2"));
|
||||||
|
Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue());
|
||||||
|
Assert.assertEquals("app node label doesn't match",
|
||||||
|
"zuess",
|
||||||
|
app2.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||||
|
// Test Default commands
|
||||||
|
setupCurrentCall("host2.cluster3.com");
|
||||||
|
ApplicationId appId3 = getApplicationId(102);
|
||||||
|
SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
|
||||||
|
appId3, null, null);
|
||||||
|
submitRequest3.getApplicationSubmissionContext().setApplicationType(
|
||||||
|
"matchType");
|
||||||
|
submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
|
||||||
|
try {
|
||||||
|
rmService.submitApplication(submitRequest3);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected.");
|
||||||
|
}
|
||||||
|
RMApp app3 = rmContext.getRMApps().get(appId3);
|
||||||
|
Assert.assertNotNull("app doesn't exist", app3);
|
||||||
|
Assert.assertEquals("app name doesn't match",
|
||||||
|
YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName());
|
||||||
|
Assert.assertTrue("client tag not present",
|
||||||
|
app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
|
||||||
|
Assert.assertTrue("custom tag not present",
|
||||||
|
app3.getApplicationTags().contains("cluster:other"));
|
||||||
|
Assert.assertEquals("app queue doesn't match", "default", app3.getQueue());
|
||||||
|
Assert.assertEquals("app node label doesn't match",
|
||||||
|
"barfoo",
|
||||||
|
app3.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||||
|
// Test regex
|
||||||
|
setupCurrentCall("host.cluster100.com");
|
||||||
|
ApplicationId appId4 = getApplicationId(103);
|
||||||
|
SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
|
||||||
|
appId4, null, null);
|
||||||
|
try {
|
||||||
|
rmService.submitApplication(submitRequest4);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected.");
|
||||||
|
}
|
||||||
|
RMApp app4 = rmContext.getRMApps().get(appId4);
|
||||||
|
Assert.assertTrue("custom tag not present",
|
||||||
|
app4.getApplicationTags().contains("cluster:reg"));
|
||||||
|
Assert.assertEquals("app node label doesn't match",
|
||||||
|
"reg", app4.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||||
|
testSubmissionContextWithAbsentTAG(rmService, rmContext);
|
||||||
|
rmService.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSubmissionContextWithAbsentTAG(ClientRMService rmService,
|
||||||
|
RMContext rmContext) throws Exception {
|
||||||
|
setupCurrentCall("host.testcluster1.com");
|
||||||
|
ApplicationId appId5 = getApplicationId(104);
|
||||||
|
SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest(
|
||||||
|
appId5, null, null);
|
||||||
|
try {
|
||||||
|
rmService.submitApplication(submitRequest5);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected.");
|
||||||
|
}
|
||||||
|
RMApp app5 = rmContext.getRMApps().get(appId5);
|
||||||
|
Assert.assertEquals("custom tag present",
|
||||||
|
app5.getApplicationTags().size(), 0);
|
||||||
|
Assert.assertNull("app node label present",
|
||||||
|
app5.getApplicationSubmissionContext().getNodeLabelExpression());
|
||||||
|
Assert.assertEquals("Queue name is not present",
|
||||||
|
app5.getQueue(), "default");
|
||||||
|
}
|
||||||
|
private void setupCurrentCall(String hostName) throws UnknownHostException {
|
||||||
|
Server.Call mockCall = mock(Server.Call.class);
|
||||||
|
when(mockCall.getHostInetAddress()).thenReturn(
|
||||||
|
InetAddress.getByAddress(hostName,
|
||||||
|
new byte[]{123, 123, 123, 123}));
|
||||||
|
Server.getCurCall().set(mockCall);
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@SuppressWarnings ("rawtypes")
|
@SuppressWarnings ("rawtypes")
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class will test the functionality of all the three
|
||||||
|
* processor(Node, Queue, Tag) together on same
|
||||||
|
* ApplicationSubmissionContext.
|
||||||
|
*/
|
||||||
|
public class TestContextProcessor {
|
||||||
|
@Test
|
||||||
|
public void testContextProcessor() {
|
||||||
|
Map<ContextProcessor, String> contextProcessorsAndValues =
|
||||||
|
new HashMap<>();
|
||||||
|
contextProcessorsAndValues.put(new NodeLabelProcessor(), "foo");
|
||||||
|
contextProcessorsAndValues.put(new QueueProcessor(), "queue1");
|
||||||
|
contextProcessorsAndValues.put(new TagAddProcessor(), "cluster:cluster1");
|
||||||
|
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||||
|
ApplicationSubmissionContext applicationSubmissionContext =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
for(Map.Entry<ContextProcessor, String> entry :
|
||||||
|
contextProcessorsAndValues.entrySet()){
|
||||||
|
entry.getKey().process("host.cluster2.com", entry.getValue(),
|
||||||
|
app, applicationSubmissionContext);
|
||||||
|
}
|
||||||
|
Set<String> applicationTags =new HashSet<String>();
|
||||||
|
applicationTags.add("cluster:cluster1");
|
||||||
|
verify(applicationSubmissionContext, times(1))
|
||||||
|
.setNodeLabelExpression("foo");
|
||||||
|
verify(applicationSubmissionContext, times(1))
|
||||||
|
.setQueue("queue1");
|
||||||
|
verify(applicationSubmissionContext, times(1))
|
||||||
|
.setApplicationTags(applicationTags);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class will test the functionality of NodeLabelProcessor.
|
||||||
|
*/
|
||||||
|
public class TestNodeLabelProcessor {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeLabelProcessor() {
|
||||||
|
ContextProcessor nodeLabelProcessor = new NodeLabelProcessor();
|
||||||
|
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||||
|
ApplicationSubmissionContext applicationSubmissionContext =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||||
|
nodeLabelProcessor.process("host.cluster2.com", "foo", app,
|
||||||
|
applicationSubmissionContext);
|
||||||
|
verify(applicationSubmissionContext, times(1))
|
||||||
|
.setNodeLabelExpression("foo");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class will test the functionality of QueueProcessor.
|
||||||
|
*/
|
||||||
|
public class TestQueueProcessor {
|
||||||
|
@Test
|
||||||
|
public void testQueueProcessor() {
|
||||||
|
ContextProcessor queueProcessor = new QueueProcessor();
|
||||||
|
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||||
|
ApplicationSubmissionContext applicationSubmissionContext =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||||
|
queueProcessor.process("host.cluster2.com", "queue1",
|
||||||
|
app, applicationSubmissionContext);
|
||||||
|
verify(applicationSubmissionContext, times(1)).setQueue("queue1");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.junit.Test;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class will test the functionality of TagAddProcessor.
|
||||||
|
*/
|
||||||
|
public class TestTagAddProcessor {
|
||||||
|
@Test
|
||||||
|
public void testTagAddProcessor() {
|
||||||
|
ContextProcessor tagAddProcessor = new TagAddProcessor();
|
||||||
|
ApplicationId app = ApplicationId.newInstance(123456, 111);
|
||||||
|
ApplicationSubmissionContext applicationSubmissionContext =
|
||||||
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
|
||||||
|
tagAddProcessor.process("host.cluster2.com",
|
||||||
|
"cluster:cluster1", app, applicationSubmissionContext);
|
||||||
|
Set<String> applicationTags = new HashSet<String>();
|
||||||
|
applicationTags.add("cluster:cluster1");
|
||||||
|
verify(applicationSubmissionContext, times(1))
|
||||||
|
.setApplicationTags(applicationTags);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue