diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 324694638b9..4a3a6666327 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -430,6 +430,8 @@ Release 2.8.0 - UNRELEASED
YARN-4145. Make RMHATestBase abstract so its not run when running all
tests under that namespace (adhoot via rkanter)
+ YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9ec25ae64de..cc4f5defdda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2025,6 +2025,15 @@ public class YarnConfiguration extends Configuration {
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
+ public static final String AM_BLACKLISTING_ENABLED =
+ YARN_PREFIX + "am.blacklisting.enabled";
+ public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
+
+ public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
+ YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
+ public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
+
+
public YarnConfiguration() {
super();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b76defb712d..bcd64c3e124 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2293,4 +2293,22 @@
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor
+
+
+ Enable/disable blacklisting of hosts for AM based on AM failures on those
+ hosts.
+
+ yarn.am.blacklisting.enabled
+ true
+
+
+
+
+ Threshold of ratio number of NodeManager hosts that are allowed to be
+ blacklisted for AM. Beyond this ratio there is no blacklisting to avoid
+ danger of blacklisting the entire cluster.
+
+ yarn.am.blacklisting.disable-failure-threshold
+ 0.8f
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
new file mode 100644
index 00000000000..f03b4217e24
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistManager.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Tracks blacklists based on failures reported on nodes.
+ */
+@Private
+public interface BlacklistManager {
+
+ /**
+ * Report failure of a container on node.
+ * @param node that has a container failure
+ */
+ void addNode(String node);
+
+ /**
+ * Get {@link BlacklistUpdates} that indicate which nodes should be
+ * added or to removed from the blacklist.
+ * @return {@link BlacklistUpdates}
+ */
+ BlacklistUpdates getBlacklistUpdates();
+
+ /**
+ * Refresh the number of nodemanager hosts available for scheduling.
+ * @param nodeHostCount is the number of node hosts.
+ */
+ void refreshNodeHostCount(int nodeHostCount);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java
new file mode 100644
index 00000000000..c76dfb4d538
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/BlacklistUpdates.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+import java.util.List;
+
+/**
+ * Class to track blacklist additions and removals.
+ */
+@Private
+public class BlacklistUpdates {
+
+ private List additions;
+ private List removals;
+
+ public BlacklistUpdates(List additions,
+ List removals) {
+ this.additions = additions;
+ this.removals = removals;
+ }
+
+ public List getAdditions() {
+ return additions;
+ }
+
+ public List getRemovals() {
+ return removals;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
new file mode 100644
index 00000000000..f155b45aa50
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/DisabledBlacklistManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import java.util.ArrayList;
+
+/**
+ * A {@link BlacklistManager} that returns no blacklists.
+ */
+public class DisabledBlacklistManager implements BlacklistManager{
+
+ private static final ArrayList EMPTY_LIST = new ArrayList();
+ private BlacklistUpdates noBlacklist =
+ new BlacklistUpdates(EMPTY_LIST, EMPTY_LIST);
+
+ @Override
+ public void addNode(String node) {
+ }
+
+ @Override
+ public BlacklistUpdates getBlacklistUpdates() {
+ return noBlacklist;
+ }
+
+ @Override
+ public void refreshNodeHostCount(int nodeHostCount) {
+ // Do nothing
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
new file mode 100644
index 00000000000..a544ab88e89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/SimpleBlacklistManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Maintains a list of failed nodes and returns that as long as number of
+ * blacklisted nodes is below a threshold percentage of total nodes. If more
+ * than threshold number of nodes are marked as failure they all are returned
+ * as removal from blacklist so previous additions are reversed.
+ */
+public class SimpleBlacklistManager implements BlacklistManager {
+
+ private int numberOfNodeManagerHosts;
+ private final double blacklistDisableFailureThreshold;
+ private final Set blacklistNodes = new HashSet<>();
+ private static final ArrayList EMPTY_LIST = new ArrayList<>();
+
+ private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);
+
+ public SimpleBlacklistManager(int numberOfNodeManagerHosts,
+ double blacklistDisableFailureThreshold) {
+ this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
+ this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
+ }
+
+ @Override
+ public void addNode(String node) {
+ blacklistNodes.add(node);
+ }
+
+ @Override
+ public void refreshNodeHostCount(int nodeHostCount) {
+ this.numberOfNodeManagerHosts = nodeHostCount;
+ }
+
+ @Override
+ public BlacklistUpdates getBlacklistUpdates() {
+ BlacklistUpdates ret;
+ List blacklist = new ArrayList<>(blacklistNodes);
+ final int currentBlacklistSize = blacklist.size();
+ final double failureThreshold = this.blacklistDisableFailureThreshold *
+ numberOfNodeManagerHosts;
+ if (currentBlacklistSize < failureThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
+ "failure threshold ratio " + blacklistDisableFailureThreshold +
+ " out of total usable nodes " + numberOfNodeManagerHosts);
+ }
+ ret = new BlacklistUpdates(blacklist, EMPTY_LIST);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
+ "failure threshold ratio " + blacklistDisableFailureThreshold +
+ " out of total usable nodes " + numberOfNodeManagerHosts);
+ }
+ ret = new BlacklistUpdates(EMPTY_LIST, blacklist);
+ }
+ return ret;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2eb74f7829a..7cf39b88855 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -133,6 +136,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Set applicationTags;
private final long attemptFailuresValidityInterval;
+ private final boolean amBlacklistingEnabled;
+ private final float blacklistDisableThreshold;
private Clock systemClock;
@@ -456,6 +461,18 @@ public class RMAppImpl implements RMApp, Recoverable {
maxLogAggregationDiagnosticsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+
+ amBlacklistingEnabled = conf.getBoolean(
+ YarnConfiguration.AM_BLACKLISTING_ENABLED,
+ YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
+
+ if (amBlacklistingEnabled) {
+ blacklistDisableThreshold = conf.getFloat(
+ YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+ YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
+ } else {
+ blacklistDisableThreshold = 0.0f;
+ }
}
@Override
@@ -797,6 +814,18 @@ public class RMAppImpl implements RMApp, Recoverable {
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
+
+ BlacklistManager currentAMBlacklist;
+ if (currentAttempt != null) {
+ currentAMBlacklist = currentAttempt.getAMBlacklist();
+ } else {
+ if (amBlacklistingEnabled) {
+ currentAMBlacklist = new SimpleBlacklistManager(
+ scheduler.getNumClusterNodes(), blacklistDisableThreshold);
+ } else {
+ currentAMBlacklist = new DisabledBlacklistManager();
+ }
+ }
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
@@ -804,7 +833,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
- maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
+ maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
+ currentAMBlacklist);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b85174efcf6..4dd834580b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@@ -184,6 +185,12 @@ public interface RMAppAttempt extends EventHandler {
*/
ApplicationResourceUsageReport getApplicationResourceUsageReport();
+ /**
+ * Get the {@link BlacklistManager} that manages blacklists for AM failures
+ * @return the {@link BlacklistManager} that tracks AM failures.
+ */
+ BlacklistManager getAMBlacklist();
+
/**
* the start time of the application.
* @return the start time of the application.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 74a4000f08d..629b2a3f9e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -182,6 +184,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private RMAppAttemptMetrics attemptMetrics = null;
private ResourceRequest amReq = null;
+ private BlacklistManager blacklistedNodesForAM = null;
private static final StateMachineFactory());
appAttempt.finishedContainersSentToAM.get(nodeId).add(
@@ -1708,6 +1747,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
}
+ private void addAMNodeToBlackList(NodeId nodeId) {
+ blacklistedNodesForAM.addNode(nodeId.getHost().toString());
+ }
+
+ @Override
+ public BlacklistManager getAMBlacklist() {
+ return blacklistedNodesForAM;
+ }
+
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 77ac5b3e640..e318d473df1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -65,7 +65,8 @@ public class AppSchedulingInfo {
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map> requests =
new ConcurrentHashMap>();
- private Set blacklist = new HashSet();
+ private Set userBlacklist = new HashSet<>();
+ private Set amBlacklist = new HashSet<>();
//private final ApplicationStore store;
private ActiveUsersManager activeUsersManager;
@@ -217,21 +218,39 @@ public class AppSchedulingInfo {
}
/**
- * The ApplicationMaster is updating the blacklist
+ * The ApplicationMaster is updating the userBlacklist used for containers
+ * other than AMs.
*
- * @param blacklistAdditions resources to be added to the blacklist
- * @param blacklistRemovals resources to be removed from the blacklist
+ * @param blacklistAdditions resources to be added to the userBlacklist
+ * @param blacklistRemovals resources to be removed from the userBlacklist
*/
- synchronized public void updateBlacklist(
+ public void updateBlacklist(
List blacklistAdditions, List blacklistRemovals) {
- // Add to blacklist
- if (blacklistAdditions != null) {
- blacklist.addAll(blacklistAdditions);
- }
+ updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
+ blacklistRemovals);
+ }
- // Remove from blacklist
- if (blacklistRemovals != null) {
- blacklist.removeAll(blacklistRemovals);
+ /**
+ * RM is updating blacklist for AM containers.
+ * @param blacklistAdditions resources to be added to the amBlacklist
+ * @param blacklistRemovals resources to be added to the amBlacklist
+ */
+ public void updateAMBlacklist(
+ List blacklistAdditions, List blacklistRemovals) {
+ updateUserOrAMBlacklist(amBlacklist, blacklistAdditions,
+ blacklistRemovals);
+ }
+
+ void updateUserOrAMBlacklist(Set blacklist,
+ List blacklistAdditions, List blacklistRemovals) {
+ synchronized (blacklist) {
+ if (blacklistAdditions != null) {
+ blacklist.addAll(blacklistAdditions);
+ }
+
+ if (blacklistRemovals != null) {
+ blacklist.removeAll(blacklistRemovals);
+ }
}
}
@@ -263,8 +282,23 @@ public class AppSchedulingInfo {
return (request == null) ? null : request.getCapability();
}
- public synchronized boolean isBlacklisted(String resourceName) {
- return blacklist.contains(resourceName);
+ /**
+ * Returns if the node is either blacklisted by the user or the system
+ * @param resourceName the resourcename
+ * @param useAMBlacklist true if it should check amBlacklist
+ * @return true if its blacklisted
+ */
+ public boolean isBlacklisted(String resourceName,
+ boolean useAMBlacklist) {
+ if (useAMBlacklist){
+ synchronized (amBlacklist) {
+ return amBlacklist.contains(resourceName);
+ }
+ } else {
+ synchronized (userBlacklist) {
+ return userBlacklist.contains(resourceName);
+ }
+ }
}
/**
@@ -473,19 +507,25 @@ public class AppSchedulingInfo {
this.queue = queue;
}
- public synchronized Set getBlackList() {
- return this.blacklist;
+ public Set getBlackList() {
+ return this.userBlacklist;
}
- public synchronized Set getBlackListCopy() {
- return new HashSet<>(this.blacklist);
+ public Set getBlackListCopy() {
+ synchronized (userBlacklist) {
+ return new HashSet<>(this.userBlacklist);
+ }
}
public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
// this.priorities = appInfo.getPriorities();
// this.requests = appInfo.getRequests();
- this.blacklist = appInfo.getBlackList();
+ // This should not require locking the userBlacklist since it will not be
+ // used by this instance until after setCurrentAppAttempt.
+ // Should cleanup this to avoid sharing between instances and can
+ // then remove getBlacklist as well.
+ this.userBlacklist = appInfo.getBlackList();
}
public synchronized void recoverContainer(RMContainer rmContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 48725435c09..b361d15362e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -470,16 +470,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
ContainerType containerType = ContainerType.TASK;
- // The working knowledge is that masterContainer for AM is null as it
- // itself is the master container.
- RMAppAttempt appAttempt =
- rmContext
- .getRMApps()
- .get(
- container.getId().getApplicationAttemptId()
- .getApplicationId()).getCurrentAppAttempt();
- if (appAttempt.getMasterContainer() == null
- && appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
+ boolean isWaitingForAMContainer = isWaitingForAMContainer(
+ container.getId().getApplicationAttemptId().getApplicationId());
+ if (isWaitingForAMContainer) {
containerType = ContainerType.APPLICATION_MASTER;
}
try {
@@ -509,6 +502,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
}
+ public boolean isWaitingForAMContainer(ApplicationId applicationId) {
+ // The working knowledge is that masterContainer for AM is null as it
+ // itself is the master container.
+ RMAppAttempt appAttempt =
+ rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
+ return (appAttempt != null && appAttempt.getMasterContainer() == null
+ && appAttempt.getSubmissionContext().getUnmanagedAM() == false);
+ }
+
+ // Blacklist used for user containers
public synchronized void updateBlacklist(
List blacklistAdditions, List blacklistRemovals) {
if (!isStopped) {
@@ -516,9 +519,19 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
blacklistAdditions, blacklistRemovals);
}
}
-
+
+ // Blacklist used for AM containers
+ public synchronized void updateAMBlacklist(
+ List blacklistAdditions, List blacklistRemovals) {
+ if (!isStopped) {
+ this.appSchedulingInfo.updateAMBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ }
+ }
+
public boolean isBlacklisted(String resourceName) {
- return this.appSchedulingInfo.isBlacklisted(resourceName);
+ boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
+ return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
}
public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a7e9d8cb1e0..dbaccaf3d07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -934,7 +933,13 @@ public class CapacityScheduler extends
}
}
- application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ if (application.isWaitingForAMContainer(application.getApplicationId())) {
+ // Allocate is for AM and update AM blacklist for this
+ application.updateAMBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ } else {
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
deleted file mode 100644
index 9bece9ba50e..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
-
-import org.apache.commons.logging.Log;
-
-public class FiCaSchedulerUtils {
-
- public static boolean isBlacklisted(FiCaSchedulerApp application,
- FiCaSchedulerNode node, Log LOG) {
- if (application.isBlacklisted(node.getNodeName())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping 'host' " + node.getNodeName() +
- " for " + application.getApplicationId() +
- " since it has been blacklisted");
- }
- return true;
- }
-
- if (application.isBlacklisted(node.getRackName())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping 'rack' " + node.getRackName() +
- " for " + application.getApplicationId() +
- " since it has been blacklisted");
- }
- return true;
- }
-
- return false;
- }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f7286..5243fb3a144 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -955,7 +955,14 @@ public class FairScheduler extends
preemptionContainerIds.add(container.getContainerId());
}
- application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ if (application.isWaitingForAMContainer(application.getApplicationId())) {
+ // Allocate is for AM and update AM blacklist for this
+ application.updateAMBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ } else {
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
+
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 6b77ceb8dc9..99760df671e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -352,11 +352,18 @@ public class FifoScheduler extends
application.showRequests();
LOG.debug("allocate:" +
- " applicationId=" + applicationAttemptId +
+ " applicationId=" + applicationAttemptId +
" #ask=" + ask.size());
}
- application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ if (application.isWaitingForAMContainer(application.getApplicationId())) {
+ // Allocate is for AM and update AM blacklist for this
+ application.updateAMBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ } else {
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
+
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
Resource headroom = application.getHeadroom();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 50803550af5..e464401387c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -750,10 +750,7 @@ public class MockRM extends ResourceManager {
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
- rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+ RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
@@ -761,6 +758,15 @@ public class MockRM extends ResourceManager {
return am;
}
+ public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
+ throws Exception {
+ rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
+ return attempt;
+ }
+
public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
MockAM am = launchAM(app, rm, nm);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index d579595113d..dc843b9ee98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -49,11 +53,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -82,21 +89,7 @@ public class TestAMRestart {
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 3;
- // allocate NUM_CONTAINERS containers
- am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
- new ArrayList());
- nm1.nodeHeartbeat(true);
-
- // wait for containers to be allocated.
- List containers =
- am1.allocate(new ArrayList(),
- new ArrayList()).getAllocatedContainers();
- while (containers.size() != NUM_CONTAINERS) {
- nm1.nodeHeartbeat(true);
- containers.addAll(am1.allocate(new ArrayList(),
- new ArrayList()).getAllocatedContainers());
- Thread.sleep(200);
- }
+ allocateContainers(nm1, am1, NUM_CONTAINERS);
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
@@ -244,6 +237,29 @@ public class TestAMRestart {
rm1.stop();
}
+ private List allocateContainers(MockNM nm1, MockAM am1,
+ int NUM_CONTAINERS) throws Exception {
+ // allocate NUM_CONTAINERS containers
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
+ new ArrayList());
+ nm1.nodeHeartbeat(true);
+
+ // wait for containers to be allocated.
+ List containers =
+ am1.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers();
+ while (containers.size() != NUM_CONTAINERS) {
+ nm1.nodeHeartbeat(true);
+ containers.addAll(am1.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers());
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals("Did not get all containers allocated",
+ NUM_CONTAINERS, containers.size());
+ return containers;
+ }
+
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
@@ -258,6 +274,9 @@ public class TestAMRestart {
public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+ // To prevent test from blacklisting nm1 for AM, we sit threshold to half
+ // of 2 nodes which is 1
+ conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);
MockRM rm1 = new MockRM(conf);
rm1.start();
@@ -355,6 +374,106 @@ public class TestAMRestart {
rm1.stop();
}
+ @Test(timeout = 100000)
+ public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ final DrainDispatcher dispatcher = new DrainDispatcher();
+ MockRM rm1 = new MockRM(conf, memStore) {
+ @Override
+ protected EventHandler createSchedulerEventDispatcher() {
+ return new SchedulerEventDispatcher(this.scheduler) {
+ @Override
+ public void handle(SchedulerEvent event) {
+ scheduler.handle(event);
+ }
+ };
+ }
+
+ @Override
+ protected Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+
+ rm1.start();
+
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 =
+ new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
+ nm2.registerNode();
+
+ RMApp app1 = rm1.submitApp(200);
+
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm1.getResourceScheduler();
+ ContainerId amContainer =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ // Preempt the first attempt;
+ RMContainer rmContainer = scheduler.getRMContainer(amContainer);
+ NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+ MockNM currentNode, otherNode;
+ if (nodeWhereAMRan == nm1.getNodeId()) {
+ currentNode = nm1;
+ otherNode = nm2;
+ } else {
+ currentNode = nm2;
+ otherNode = nm1;
+ }
+
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
+ "", ContainerExitStatus.DISKS_FAILED);
+ currentNode.containerStatus(containerStatus);
+ am1.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+ // restart the am
+ RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
+ System.out.println("Launch AM " + attempt.getAppAttemptId());
+
+
+
+ currentNode.nodeHeartbeat(true);
+ dispatcher.await();
+ Assert.assertEquals(
+ "AppAttemptState should still be SCHEDULED if currentNode is " +
+ "blacklisted correctly",
+ RMAppAttemptState.SCHEDULED,
+ attempt.getAppAttemptState());
+
+ otherNode.nodeHeartbeat(true);
+ dispatcher.await();
+
+ MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
+ rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+ amContainer =
+ ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+ rmContainer = scheduler.getRMContainer(amContainer);
+ nodeWhereAMRan = rmContainer.getAllocatedNode();
+ Assert.assertEquals(
+ "After blacklisting AM should have run on the other node",
+ otherNode.getNodeId(), nodeWhereAMRan);
+
+ am2.registerAppAttempt();
+ rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ List allocatedContainers =
+ allocateContainers(currentNode, am2, 1);
+ Assert.assertEquals(
+ "Even though AM is blacklisted from the node, application can still " +
+ "allocate containers there",
+ currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
+ }
+
// AM container preempted, nm disk failure
// should not be counted towards AM max retry count.
@Test(timeout = 100000)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
new file mode 100644
index 00000000000..96b373f98c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/blacklist/TestBlacklistManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TestBlacklistManager {
+
+ @Test
+ public void testSimpleBlacklistBelowFailureThreshold() {
+ final int numberOfNodeManagerHosts = 3;
+ final double blacklistDisableFailureThreshold = 0.8;
+ BlacklistManager manager = new SimpleBlacklistManager(
+ numberOfNodeManagerHosts, blacklistDisableFailureThreshold);
+ String anyNode = "foo";
+ String anyNode2 = "bar";
+ manager.addNode(anyNode);
+ manager.addNode(anyNode2);
+ BlacklistUpdates blacklist = manager
+ .getBlacklistUpdates();
+
+ List blacklistAdditions = blacklist.getAdditions();
+ Collections.sort(blacklistAdditions);
+ List blacklistRemovals = blacklist.getRemovals();
+ String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode};
+ Assert.assertArrayEquals(
+ "Blacklist additions was not as expected",
+ expectedBlacklistAdditions,
+ blacklistAdditions.toArray());
+ Assert.assertTrue(
+ "Blacklist removals should be empty but was " +
+ blacklistRemovals,
+ blacklistRemovals.isEmpty());
+ }
+
+ @Test
+ public void testSimpleBlacklistAboveFailureThreshold() {
+ // Create a threshold of 0.5 * 3 i.e at 1.5 node failures.
+ BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);
+ String anyNode = "foo";
+ String anyNode2 = "bar";
+ manager.addNode(anyNode);
+ BlacklistUpdates blacklist = manager
+ .getBlacklistUpdates();
+
+ List blacklistAdditions = blacklist.getAdditions();
+ Collections.sort(blacklistAdditions);
+ List blacklistRemovals = blacklist.getRemovals();
+ String[] expectedBlacklistAdditions = new String[]{anyNode};
+ Assert.assertArrayEquals(
+ "Blacklist additions was not as expected",
+ expectedBlacklistAdditions,
+ blacklistAdditions.toArray());
+ Assert.assertTrue(
+ "Blacklist removals should be empty but was " +
+ blacklistRemovals,
+ blacklistRemovals.isEmpty());
+
+ manager.addNode(anyNode2);
+
+ blacklist = manager
+ .getBlacklistUpdates();
+ blacklistAdditions = blacklist.getAdditions();
+ Collections.sort(blacklistAdditions);
+ blacklistRemovals = blacklist.getRemovals();
+ Collections.sort(blacklistRemovals);
+ String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode};
+ Assert.assertTrue(
+ "Blacklist additions should be empty but was " +
+ blacklistAdditions,
+ blacklistAdditions.isEmpty());
+ Assert.assertArrayEquals(
+ "Blacklist removals was not as expected",
+ expectedBlacklistRemovals,
+ blacklistRemovals.toArray());
+ }
+
+ @Test
+ public void testDisabledBlacklist() {
+ BlacklistManager disabled = new DisabledBlacklistManager();
+ String anyNode = "foo";
+ disabled.addNode(anyNode);
+ BlacklistUpdates blacklist = disabled
+ .getBlacklistUpdates();
+
+ List blacklistAdditions = blacklist.getAdditions();
+ List blacklistRemovals = blacklist.getRemovals();
+ Assert.assertTrue(
+ "Blacklist additions should be empty but was " +
+ blacklistAdditions,
+ blacklistAdditions.isEmpty());
+ Assert.assertTrue(
+ "Blacklist removals should be empty but was " +
+ blacklistRemovals,
+ blacklistRemovals.isEmpty());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index fccfa19e19f..484a1b627f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -489,7 +489,7 @@ public class TestRMAppLogAggregationStatus {
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(this.appId, this.rmContext,
conf, "test", "test", "default", submissionContext,
- this.rmContext.getScheduler(),
+ scheduler,
this.rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 2e64d61b08a..a5e3308f565 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -970,7 +970,7 @@ public class TestRMAppTransitions {
appState.getApplicationSubmissionContext().getApplicationId(),
rmContext, conf,
submissionContext.getApplicationName(), null,
- submissionContext.getQueue(), submissionContext, null, null,
+ submissionContext.getQueue(), submissionContext, scheduler, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
submissionContext.getApplicationTags(),
BuilderUtils.newResourceRequest(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 44773be8ce2..76a1351e56d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -655,6 +656,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -715,6 +721,11 @@ public class TestCapacityScheduler {
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt1.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
@@ -739,6 +750,8 @@ public class TestCapacityScheduler {
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+ when(attempt2.getMasterContainer()).thenReturn(container);
+ when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
@@ -2876,6 +2889,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2953,6 +2971,11 @@ public class TestCapacityScheduler {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2976,6 +2999,8 @@ public class TestCapacityScheduler {
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+ when(attempt2.getMasterContainer()).thenReturn(container);
+ when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 403c8ea313b..1c9801d7631 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -220,7 +220,7 @@ public class FairSchedulerTestBase {
ApplicationId appId = attId.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
- queue, null, null, false, false, 0, amResource, null), null, null,
+ queue, null, null, false, false, 0, amResource, null), scheduler, null,
0, null, null, null);
rmContext.getRMApps().put(appId, rmApp);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);