From 11f6e3bc415b6a4e344ad516aba95f2d74ffa8a9 Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Fri, 6 Sep 2019 10:02:18 -0700 Subject: [PATCH] YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar --- .../hadoop/yarn/conf/YarnConfiguration.java | 18 ++ .../src/main/resources/yarn-default.xml | 21 ++ .../resourcemanager/ClientRMService.java | 24 ++ .../preprocessor/ContextProcessor.java | 44 ++++ .../preprocessor/NodeLabelProcessor.java | 33 +++ .../preprocessor/QueueProcessor.java | 34 +++ .../SubmissionContextPreProcessor.java | 223 ++++++++++++++++++ .../preprocessor/TagAddProcessor.java | 44 ++++ .../preprocessor/package-info.java | 28 +++ .../resourcemanager/TestClientRMService.java | 178 ++++++++++++++ .../preprocessor/TestContextProcessor.java | 63 +++++ .../preprocessor/TestNodeLabelProcessor.java | 45 ++++ .../preprocessor/TestQueueProcessor.java | 43 ++++ .../preprocessor/TestTagAddProcessor.java | 47 ++++ 14 files changed, 845 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 124169cfcf4..4cd655eebf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -526,6 +526,24 @@ public class YarnConfiguration extends Configuration { public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; + + /** Enable submission pre-processor.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED = + RM_PREFIX + "submission-preprocessor.enabled"; + public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED = + false; + + /** Path to file with hosts for the submission processor to handle.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + RM_PREFIX + "submission-preprocessor.file-path"; + public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH = + ""; + + /** Submission processor refresh interval.*/ + public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = + RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; + public static final int + DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0; /** Path to file with nodes to exclude.*/ public static final String RM_NODES_EXCLUDE_FILE_PATH = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 748329ca24b..44b54ce2692 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3836,4 +3836,25 @@ yarn.nodemanager.containers-launcher.class org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher + + + + Enable the Pre processing of Application Submission context with server side configuration + + yarn.resourcemanager.submission-preprocessor.enabled + false + + + + Path to file with hosts for the submission processor to handle. + yarn.resourcemanager.submission-preprocessor.file-path + + + + + Submission processor refresh interval + yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms + 60000 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e0dead9d082..e81a37292aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -153,6 +153,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; @@ -217,6 +218,8 @@ public class ClientRMService extends AbstractService implements private ReservationSystem reservationSystem; private ReservationInputValidator rValidator; + private SubmissionContextPreProcessor contextPreProcessor; + private boolean filterAppsByUser = false; private static final EnumSet ACTIVE_APP_STATES = EnumSet.of( @@ -297,6 +300,14 @@ public class ClientRMService extends AbstractService implements server.getListenerAddress()); this.timelineServiceV2Enabled = YarnConfiguration. timelineServiceV2Enabled(conf); + + if (conf.getBoolean( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, + YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) { + this.contextPreProcessor = new SubmissionContextPreProcessor(); + this.contextPreProcessor.start(conf); + } + super.serviceStart(); } @@ -305,6 +316,9 @@ public class ClientRMService extends AbstractService implements if (this.server != null) { this.server.stop(); } + if (this.contextPreProcessor != null) { + this.contextPreProcessor.stop(); + } super.serviceStop(); } @@ -316,6 +330,11 @@ public class ClientRMService extends AbstractService implements YarnConfiguration.DEFAULT_RM_PORT); } + @VisibleForTesting + SubmissionContextPreProcessor getContextPreProcessor() { + return this.contextPreProcessor; + } + @Private public InetSocketAddress getBindAddress() { return clientBindAddress; @@ -647,6 +666,11 @@ public class ClientRMService extends AbstractService implements checkReservationACLs(submissionContext.getQueue(), AuditConstants .SUBMIT_RESERVATION_REQUEST, reservationId); + if (this.contextPreProcessor != null) { + this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(), + applicationId, submissionContext); + } + try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java new file mode 100644 index 00000000000..b33069cd8b1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + + + +/** + * This is the interface providing functionality to process + * application submission context. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface ContextProcessor { + /** + * It will enrich the application submission context with value provided. + * @param host Address of the host from where application launched. + * @param value Value to be filled in ApplicationSubmissionContext. + * @param applicationId Application Id of the application. + * @param submissionContext Context of the application. + */ + void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java new file mode 100644 index 00000000000..30ea4228893 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + +/** + * Processor will add the node label to application submission context. + */ +class NodeLabelProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + submissionContext.setNodeLabelExpression(value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java new file mode 100644 index 00000000000..a47dcfed9e6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + + +/** + * Processor will add queue to application submission context. + */ +class QueueProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + submissionContext.setQueue(value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java new file mode 100644 index 00000000000..68cc4cf1e20 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import com.google.common.annotations.VisibleForTesting; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Pre process the ApplicationSubmissionContext with server side info. + */ +public class SubmissionContextPreProcessor { + + private static final Logger LOG = LoggerFactory.getLogger( + SubmissionContextPreProcessor.class); + private static final String DEFAULT_COMMANDS = "*"; + private static final int INITIAL_DELAY = 1000; + + enum ContextProp { + // Node label Expression + NL(new NodeLabelProcessor()), + // Queue + Q(new QueueProcessor()), + // Tag Add + TA(new TagAddProcessor()); + + private ContextProcessor cp; + ContextProp(ContextProcessor cp) { + this.cp = cp; + } + } + + private String hostsFilePath; + private volatile long lastModified = -1; + private volatile Map> hostCommands = + new HashMap<>(); + private ScheduledExecutorService executorService; + + public void start(Configuration conf) { + this.hostsFilePath = + conf.get( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, + YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH); + int refreshPeriod = + conf.getInt( + YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS, + YarnConfiguration. + DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS); + + LOG.info("Submission Context Preprocessor enabled: file=[{}], " + + "interval=[{}]", this.hostsFilePath, refreshPeriod); + + executorService = Executors.newSingleThreadScheduledExecutor(); + Runnable refreshConf = new Runnable() { + @Override + public void run() { + try { + refresh(); + } catch (Exception ex) { + LOG.error("Error while refreshing Submission PreProcessor file [{}]", + hostsFilePath, ex); + } + } + }; + if (refreshPeriod > 0) { + executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY, + refreshPeriod, TimeUnit.MILLISECONDS); + } else { + executorService.schedule(refreshConf, INITIAL_DELAY, + TimeUnit.MILLISECONDS); + } + } + + public void stop() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + } + } + + public void preProcess(String host, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + Map cMap = hostCommands.get(host); + + // Try regex match + if (cMap == null) { + for (Map.Entry> entry : + hostCommands.entrySet()) { + if (entry.getKey().equals(DEFAULT_COMMANDS)) { + continue; + } + try { + Pattern p = Pattern.compile(entry.getKey()); + Matcher m = p.matcher(host); + if (m.find()) { + cMap = hostCommands.get(entry.getKey()); + } + } catch (PatternSyntaxException exception) { + LOG.warn("Invalid regex pattern: " + entry.getKey()); + } + } + } + // Set to default value + if (cMap == null) { + cMap = hostCommands.get(DEFAULT_COMMANDS); + } + if (cMap != null) { + for (Map.Entry entry : cMap.entrySet()) { + entry.getKey().cp.process(host, entry.getValue(), + applicationId, submissionContext); + } + } + } + + @VisibleForTesting + public void refresh() throws Exception { + if (null == hostsFilePath || hostsFilePath.isEmpty()) { + LOG.warn("Host list file path [{}] is empty or does not exist !!", + hostsFilePath); + } else { + File hostFile = new File(hostsFilePath); + if (!hostFile.exists() || !hostFile.isFile()) { + LOG.warn("Host list file [{}] does not exist or is not a file !!", + hostFile); + } else if (hostFile.lastModified() <= lastModified) { + LOG.debug("Host list file [{}] has not been modified from last refresh", + hostFile); + } else { + FileInputStream fileInputStream = new FileInputStream(hostFile); + BufferedReader reader = null; + Map> tempHostCommands = + new HashMap<>(); + try { + reader = new BufferedReader(new InputStreamReader(fileInputStream, + StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + // Lines should start with hostname and be followed with commands. + // Delimiter is any contiguous sequence of space or tab character. + // Commands are of the form: + // = + // where KEY can be 'NL', 'Q' or 'TA' (more can be added later) + // (TA stands for 'Tag Add') + // Sample lines: + // ... + // host1 NL=foo Q=b + // host2 Q=c NL=bar + // ... + String[] commands = line.split("[ \t\n\f\r]+"); + if (commands != null && commands.length > 1) { + String host = commands[0].trim(); + if (host.startsWith("#")) { + // All lines starting with # is a comment + continue; + } + Map cMap = null; + for (int i = 1; i < commands.length; i++) { + String[] cSplit = commands[i].split("="); + if (cSplit == null || cSplit.length != 2) { + LOG.error("No commands found for line [{}]", commands[i]); + continue; + } + if (cMap == null) { + cMap = new HashMap<>(); + } + cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]); + } + if (cMap != null && cMap.size() > 0) { + tempHostCommands.put(host, cMap); + LOG.info("Following commands registered for host[{}] : {}", + host, cMap); + } + } + } + lastModified = hostFile.lastModified(); + } catch (Exception ex) { + // Do not commit the new map if we have an Exception.. + tempHostCommands = null; + throw ex; + } finally { + if (tempHostCommands != null && tempHostCommands.size() > 0) { + hostCommands = tempHostCommands; + } + IOUtils.cleanupWithLogger(LOG, reader, fileInputStream); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java new file mode 100644 index 00000000000..22bf80544af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + + + +/** + * This processor will add the tag to application submission context. + */ +class TagAddProcessor implements ContextProcessor { + @Override + public void process(String host, String value, ApplicationId applicationId, + ApplicationSubmissionContext submissionContext) { + Set applicationTags = submissionContext.getApplicationTags(); + if (applicationTags == null) { + applicationTags = new HashSet<>(); + } else { + applicationTags = new HashSet<>(applicationTags); + } + applicationTags.add(value); + submissionContext.setApplicationTags(applicationTags); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java new file mode 100644 index 00000000000..70648b70bdc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to pre process the application submission + * context with server side configs. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 82e105b7653..b0e2502c054 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -30,10 +30,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.security.AccessControlException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -52,6 +56,7 @@ import java.util.concurrent.CyclicBarrier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -179,6 +184,7 @@ public class TestClientRMService { private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; + private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo"; @Test public void testGetDecommissioningClusterNodes() throws Exception { @@ -955,6 +961,178 @@ public class TestClientRMService { Assert.assertEquals(0, applications1.size()); } + @Test (timeout = 30000) + @SuppressWarnings ("rawtypes") + public void testAppSubmitWithSubmissionPreProcessor() throws Exception { + YarnScheduler scheduler = mockYarnScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(scheduler, rmContext); + YarnConfiguration yConf = new YarnConfiguration(); + yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, + true); + yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + // Override the YARN configuration. + when(rmContext.getYarnConfiguration()).thenReturn(yConf); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, scheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) {} + }); + ApplicationId appId1 = getApplicationId(100); + ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); + when( + mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), + ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); + + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), any(RMApp.class), any(String.class), + any())) + .thenReturn(true); + + ClientRMService rmService = + new ClientRMService(rmContext, scheduler, appManager, + mockAclsManager, mockQueueACLsManager, null); + File rulesFile = File.createTempFile("submission_rules", ".tmp"); + rulesFile.deleteOnExit(); + rulesFile.createNewFile(); + + yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, + rulesFile.getAbsolutePath()); + rmService.serviceInit(yConf); + rmService.serviceStart(); + + BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile)); + writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1"); + writer.newLine(); + writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); + writer.newLine(); + writer.write("* TA=cluster:other Q=default NL=barfoo"); + writer.newLine(); + writer.write("host.testcluster1.com Q=default"); + writer.flush(); + writer.close(); + rmService.getContextPreProcessor().refresh(); + setupCurrentCall("host.cluster1.com"); + SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( + appId1, null, null); + try { + rmService.submitApplication(submitRequest1); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app1 = rmContext.getRMApps().get(appId1); + Assert.assertNotNull("app doesn't exist", app1); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); + Assert.assertTrue("custom tag not present", + app1.getApplicationTags().contains("cluster:cluster1")); + Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue()); + Assert.assertEquals("app node label doesn't match", + "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression()); + setupCurrentCall("host.cluster2.com"); + ApplicationId appId2 = getApplicationId(101); + SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( + appId2, null, null); + submitRequest2.getApplicationSubmissionContext().setApplicationType( + "matchType"); + Set aTags = new HashSet(); + aTags.add(APPLICATION_TAG_SC_PREPROCESSOR); + submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest2); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app2 = rmContext.getRMApps().get(appId2); + Assert.assertNotNull("app doesn't exist", app2); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName()); + Assert.assertTrue("client tag not present", + app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR)); + Assert.assertTrue("custom tag not present", + app2.getApplicationTags().contains("cluster:cluster2")); + Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue()); + Assert.assertEquals("app node label doesn't match", + "zuess", + app2.getApplicationSubmissionContext().getNodeLabelExpression()); + // Test Default commands + setupCurrentCall("host2.cluster3.com"); + ApplicationId appId3 = getApplicationId(102); + SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest( + appId3, null, null); + submitRequest3.getApplicationSubmissionContext().setApplicationType( + "matchType"); + submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags); + try { + rmService.submitApplication(submitRequest3); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app3 = rmContext.getRMApps().get(appId3); + Assert.assertNotNull("app doesn't exist", app3); + Assert.assertEquals("app name doesn't match", + YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName()); + Assert.assertTrue("client tag not present", + app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR)); + Assert.assertTrue("custom tag not present", + app3.getApplicationTags().contains("cluster:other")); + Assert.assertEquals("app queue doesn't match", "default", app3.getQueue()); + Assert.assertEquals("app node label doesn't match", + "barfoo", + app3.getApplicationSubmissionContext().getNodeLabelExpression()); + // Test regex + setupCurrentCall("host.cluster100.com"); + ApplicationId appId4 = getApplicationId(103); + SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest( + appId4, null, null); + try { + rmService.submitApplication(submitRequest4); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app4 = rmContext.getRMApps().get(appId4); + Assert.assertTrue("custom tag not present", + app4.getApplicationTags().contains("cluster:reg")); + Assert.assertEquals("app node label doesn't match", + "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression()); + testSubmissionContextWithAbsentTAG(rmService, rmContext); + rmService.serviceStop(); + } + + private void testSubmissionContextWithAbsentTAG(ClientRMService rmService, + RMContext rmContext) throws Exception { + setupCurrentCall("host.testcluster1.com"); + ApplicationId appId5 = getApplicationId(104); + SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest( + appId5, null, null); + try { + rmService.submitApplication(submitRequest5); + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } + RMApp app5 = rmContext.getRMApps().get(appId5); + Assert.assertEquals("custom tag present", + app5.getApplicationTags().size(), 0); + Assert.assertNull("app node label present", + app5.getApplicationSubmissionContext().getNodeLabelExpression()); + Assert.assertEquals("Queue name is not present", + app5.getQueue(), "default"); + } + private void setupCurrentCall(String hostName) throws UnknownHostException { + Server.Call mockCall = mock(Server.Call.class); + when(mockCall.getHostInetAddress()).thenReturn( + InetAddress.getByAddress(hostName, + new byte[]{123, 123, 123, 123})); + Server.getCurCall().set(mockCall); + } @Test (timeout = 30000) @SuppressWarnings ("rawtypes") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java new file mode 100644 index 00000000000..039e794eddb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +/** + * This class will test the functionality of all the three + * processor(Node, Queue, Tag) together on same + * ApplicationSubmissionContext. + */ +public class TestContextProcessor { + @Test + public void testContextProcessor() { + Map contextProcessorsAndValues = + new HashMap<>(); + contextProcessorsAndValues.put(new NodeLabelProcessor(), "foo"); + contextProcessorsAndValues.put(new QueueProcessor(), "queue1"); + contextProcessorsAndValues.put(new TagAddProcessor(), "cluster:cluster1"); + ApplicationId app = ApplicationId.newInstance(123456, 111); + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + for(Map.Entry entry : + contextProcessorsAndValues.entrySet()){ + entry.getKey().process("host.cluster2.com", entry.getValue(), + app, applicationSubmissionContext); + } + Set applicationTags =new HashSet(); + applicationTags.add("cluster:cluster1"); + verify(applicationSubmissionContext, times(1)) + .setNodeLabelExpression("foo"); + verify(applicationSubmissionContext, times(1)) + .setQueue("queue1"); + verify(applicationSubmissionContext, times(1)) + .setApplicationTags(applicationTags); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java new file mode 100644 index 00000000000..bb55b72ce5b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +/** + * This class will test the functionality of NodeLabelProcessor. + */ +public class TestNodeLabelProcessor { + + @Test + public void testNodeLabelProcessor() { + ContextProcessor nodeLabelProcessor = new NodeLabelProcessor(); + ApplicationId app = ApplicationId.newInstance(123456, 111); + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(applicationSubmissionContext.getApplicationId()).thenReturn(app); + nodeLabelProcessor.process("host.cluster2.com", "foo", app, + applicationSubmissionContext); + verify(applicationSubmissionContext, times(1)) + .setNodeLabelExpression("foo"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java new file mode 100644 index 00000000000..fc032bb1497 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +/** + * This class will test the functionality of QueueProcessor. + */ +public class TestQueueProcessor { + @Test + public void testQueueProcessor() { + ContextProcessor queueProcessor = new QueueProcessor(); + ApplicationId app = ApplicationId.newInstance(123456, 111); + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(applicationSubmissionContext.getApplicationId()).thenReturn(app); + queueProcessor.process("host.cluster2.com", "queue1", + app, applicationSubmissionContext); + verify(applicationSubmissionContext, times(1)).setQueue("queue1"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java new file mode 100644 index 00000000000..abe06740450 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.preprocessor; + +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.junit.Test; +import static org.mockito.Mockito.*; + + +/** + * This class will test the functionality of TagAddProcessor. + */ +public class TestTagAddProcessor { + @Test + public void testTagAddProcessor() { + ContextProcessor tagAddProcessor = new TagAddProcessor(); + ApplicationId app = ApplicationId.newInstance(123456, 111); + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(applicationSubmissionContext.getApplicationId()).thenReturn(app); + tagAddProcessor.process("host.cluster2.com", + "cluster:cluster1", app, applicationSubmissionContext); + Set applicationTags = new HashSet(); + applicationTags.add("cluster:cluster1"); + verify(applicationSubmissionContext, times(1)) + .setApplicationTags(applicationTags); + } +}