YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)
(cherry picked from commit 81df7b586a)
parent 988749e6ea
commit 11e2fa151c
@@ -378,6 +378,8 @@ Release 2.8.0 - UNRELEASED
    YARN-4145. Make RMHATestBase abstract so its not run when running all
    tests under that namespace (adhoot via rkanter)

    YARN-2005. Blacklisting support for scheduling AMs. (Anubhav Dhoot via kasha)

  OPTIMIZATIONS

    YARN-3339. TestDockerContainerExecutor should pull a single image and not
@@ -2025,6 +2025,15 @@ public class YarnConfiguration extends Configuration {
  public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
      NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";

  public static final String AM_BLACKLISTING_ENABLED =
      YARN_PREFIX + "am.blacklisting.enabled";
  public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;

  public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
      YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
  public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;

  public YarnConfiguration() {
    super();
  }
@@ -2293,4 +2293,22 @@
    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
  </property>

  <property>
    <description>
      Enable/disable blacklisting of hosts for AM based on AM failures on those
      hosts.
    </description>
    <name>yarn.am.blacklisting.enabled</name>
    <value>true</value>
  </property>

  <property>
    <description>
      Threshold of ratio number of NodeManager hosts that are allowed to be
      blacklisted for AM. Beyond this ratio there is no blacklisting to avoid
      danger of blacklisting the entire cluster.
    </description>
    <name>yarn.am.blacklisting.disable-failure-threshold</name>
    <value>0.8f</value>
  </property>
</configuration>
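The two properties above correspond to the constants added to YarnConfiguration earlier in this commit. As an aside (not part of the commit), a minimal sketch of overriding the defaults programmatically; the values shown are arbitrary examples.

// Illustrative sketch only, assuming the usual org.apache.hadoop.conf.Configuration
// and org.apache.hadoop.yarn.conf.YarnConfiguration imports.
Configuration conf = new YarnConfiguration();
// AM blacklisting already defaults to true in this commit; shown for clarity.
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
// Stop blacklisting once 50% of NodeManager hosts would be blacklisted.
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);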
@@ -0,0 +1,47 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
 * Tracks blacklists based on failures reported on nodes.
 */
@Private
public interface BlacklistManager {

  /**
   * Report failure of a container on node.
   * @param node that has a container failure
   */
  void addNode(String node);

  /**
   * Get {@link BlacklistUpdates} that indicate which nodes should be
   * added to or removed from the blacklist.
   * @return {@link BlacklistUpdates}
   */
  BlacklistUpdates getBlacklistUpdates();

  /**
   * Refresh the number of nodemanager hosts available for scheduling.
   * @param nodeHostCount is the number of node hosts.
   */
  void refreshNodeHostCount(int nodeHostCount);
}
@@ -0,0 +1,47 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.apache.hadoop.classification.InterfaceAudience.Private;

import java.util.List;

/**
 * Class to track blacklist additions and removals.
 */
@Private
public class BlacklistUpdates {

  private List<String> additions;
  private List<String> removals;

  public BlacklistUpdates(List<String> additions,
      List<String> removals) {
    this.additions = additions;
    this.removals = removals;
  }

  public List<String> getAdditions() {
    return additions;
  }

  public List<String> getRemovals() {
    return removals;
  }
}
@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import java.util.ArrayList;

/**
 * A {@link BlacklistManager} that returns no blacklists.
 */
public class DisabledBlacklistManager implements BlacklistManager{

  private static final ArrayList<String> EMPTY_LIST = new ArrayList<String>();
  private BlacklistUpdates noBlacklist =
      new BlacklistUpdates(EMPTY_LIST, EMPTY_LIST);

  @Override
  public void addNode(String node) {
  }

  @Override
  public BlacklistUpdates getBlacklistUpdates() {
    return noBlacklist;
  }

  @Override
  public void refreshNodeHostCount(int nodeHostCount) {
    // Do nothing
  }
}
@@ -0,0 +1,84 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Maintains a list of failed nodes and returns that as long as number of
 * blacklisted nodes is below a threshold percentage of total nodes. If more
 * than threshold number of nodes are marked as failure they all are returned
 * as removal from blacklist so previous additions are reversed.
 */
public class SimpleBlacklistManager implements BlacklistManager {

  private int numberOfNodeManagerHosts;
  private final double blacklistDisableFailureThreshold;
  private final Set<String> blacklistNodes = new HashSet<>();
  private static final ArrayList<String> EMPTY_LIST = new ArrayList<>();

  private static final Log LOG = LogFactory.getLog(SimpleBlacklistManager.class);

  public SimpleBlacklistManager(int numberOfNodeManagerHosts,
      double blacklistDisableFailureThreshold) {
    this.numberOfNodeManagerHosts = numberOfNodeManagerHosts;
    this.blacklistDisableFailureThreshold = blacklistDisableFailureThreshold;
  }

  @Override
  public void addNode(String node) {
    blacklistNodes.add(node);
  }

  @Override
  public void refreshNodeHostCount(int nodeHostCount) {
    this.numberOfNodeManagerHosts = nodeHostCount;
  }

  @Override
  public BlacklistUpdates getBlacklistUpdates() {
    BlacklistUpdates ret;
    List<String> blacklist = new ArrayList<>(blacklistNodes);
    final int currentBlacklistSize = blacklist.size();
    final double failureThreshold = this.blacklistDisableFailureThreshold *
        numberOfNodeManagerHosts;
    if (currentBlacklistSize < failureThreshold) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("blacklist size " + currentBlacklistSize + " is less than " +
            "failure threshold ratio " + blacklistDisableFailureThreshold +
            " out of total usable nodes " + numberOfNodeManagerHosts);
      }
      ret = new BlacklistUpdates(blacklist, EMPTY_LIST);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("blacklist size " + currentBlacklistSize + " is more than " +
            "failure threshold ratio " + blacklistDisableFailureThreshold +
            " out of total usable nodes " + numberOfNodeManagerHosts);
      }
      ret = new BlacklistUpdates(EMPTY_LIST, blacklist);
    }
    return ret;
  }
}
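To make the threshold behavior described in the class javadoc above concrete, here is a small usage sketch (not part of the commit; the host names are made up). With 4 NodeManager hosts and a disable threshold of 0.5, the failure threshold is 0.5 * 4 = 2: updates keep reporting additions while fewer than two hosts have failed, and once the threshold is reached all previously blacklisted hosts are reported as removals instead.

// Illustrative sketch only.
BlacklistManager mgr = new SimpleBlacklistManager(4, 0.5);
mgr.addNode("host1");
BlacklistUpdates below = mgr.getBlacklistUpdates();
// below.getAdditions() -> [host1], below.getRemovals() -> []
mgr.addNode("host2");
BlacklistUpdates atThreshold = mgr.getBlacklistUpdates();
// 2 failures is not < 2.0, so blacklisting is backed out:
// atThreshold.getAdditions() -> [], atThreshold.getRemovals() -> [host1, host2]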
@@ -74,6 +74,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -133,6 +136,8 @@ public class RMAppImpl implements RMApp, Recoverable {
  private final Set<String> applicationTags;

  private final long attemptFailuresValidityInterval;
  private final boolean amBlacklistingEnabled;
  private final float blacklistDisableThreshold;

  private Clock systemClock;
@@ -456,6 +461,18 @@ public class RMAppImpl implements RMApp, Recoverable {
    maxLogAggregationDiagnosticsInMemory = conf.getInt(
        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);

    amBlacklistingEnabled = conf.getBoolean(
        YarnConfiguration.AM_BLACKLISTING_ENABLED,
        YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);

    if (amBlacklistingEnabled) {
      blacklistDisableThreshold = conf.getFloat(
          YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
          YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
    } else {
      blacklistDisableThreshold = 0.0f;
    }
  }

  @Override
@@ -797,6 +814,18 @@ public class RMAppImpl implements RMApp, Recoverable {
  private void createNewAttempt() {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);

    BlacklistManager currentAMBlacklist;
    if (currentAttempt != null) {
      currentAMBlacklist = currentAttempt.getAMBlacklist();
    } else {
      if (amBlacklistingEnabled) {
        currentAMBlacklist = new SimpleBlacklistManager(
            scheduler.getNumClusterNodes(), blacklistDisableThreshold);
      } else {
        currentAMBlacklist = new DisabledBlacklistManager();
      }
    }
    RMAppAttempt attempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
          submissionContext, conf,
@@ -804,7 +833,8 @@ public class RMAppImpl implements RMApp, Recoverable {
          // previously failed attempts(which should not include Preempted,
          // hardware error and NM resync) + 1) equal to the max-attempt
          // limit.
          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
          currentAMBlacklist);
    attempts.put(appAttemptId, attempt);
    currentAttempt = attempt;
  }
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;

/**
@@ -184,6 +185,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
   */
  ApplicationResourceUsageReport getApplicationResourceUsageReport();

  /**
   * Get the {@link BlacklistManager} that manages blacklists for AM failures
   * @return the {@link BlacklistManager} that tracks AM failures.
   */
  BlacklistManager getAMBlacklist();

  /**
   * the start time of the application.
   * @return the start time of the application.
@@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import javax.crypto.SecretKey;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -182,6 +184,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {

  private RMAppAttemptMetrics attemptMetrics = null;
  private ResourceRequest amReq = null;
  private BlacklistManager blacklistedNodesForAM = null;

  private static final StateMachineFactory<RMAppAttemptImpl,
      RMAppAttemptState,
@@ -434,6 +437,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
      ApplicationMasterService masterService,
      ApplicationSubmissionContext submissionContext,
      Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
    this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
        conf, maybeLastAttempt, amReq, new DisabledBlacklistManager());
  }

  public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
      RMContext rmContext, YarnScheduler scheduler,
      ApplicationMasterService masterService,
      ApplicationSubmissionContext submissionContext,
      Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
      BlacklistManager amBlacklist) {
    this.conf = conf;
    this.applicationAttemptId = appAttemptId;
    this.rmContext = rmContext;
@@ -454,6 +467,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
        new RMAppAttemptMetrics(applicationAttemptId, rmContext);

    this.amReq = amReq;
    this.blacklistedNodesForAM = amBlacklist;
  }

  @Override
@@ -939,12 +953,25 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
      appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
      appAttempt.amReq.setResourceName(ResourceRequest.ANY);
      appAttempt.amReq.setRelaxLocality(true);

      appAttempt.getAMBlacklist().refreshNodeHostCount(
          appAttempt.scheduler.getNumClusterNodes());

      BlacklistUpdates amBlacklist = appAttempt.getAMBlacklist()
          .getBlacklistUpdates();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Using blacklist for AM: additions(" +
            amBlacklist.getAdditions() + ") and removals(" +
            amBlacklist.getRemovals() + ")");
      }
      // AM resource has been checked when submission
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
          appAttempt.scheduler.allocate(
              appAttempt.applicationAttemptId,
              Collections.singletonList(appAttempt.amReq),
              EMPTY_CONTAINER_RELEASE_LIST, null, null);
              EMPTY_CONTAINER_RELEASE_LIST,
              amBlacklist.getAdditions(),
              amBlacklist.getRemovals());
      if (amContainerAllocation != null
          && amContainerAllocation.getContainers() != null) {
        assert (amContainerAllocation.getContainers().size() == 0);
@@ -1331,7 +1358,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
      }
    }

  private static final class UnmanagedAMAttemptSavedTransition
  private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
    return exitStatus == ContainerExitStatus.DISKS_FAILED;
  }

  private static final class UnmanagedAMAttemptSavedTransition
      extends AMLaunchedTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
@@ -1694,6 +1725,14 @@
  private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
      RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
    NodeId nodeId = containerFinishedEvent.getNodeId();
    if (containerFinishedEvent.getContainerStatus() != null) {
      if (shouldCountTowardsNodeBlacklisting(containerFinishedEvent
          .getContainerStatus().getExitStatus())) {
        appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId());
      }
    } else {
      LOG.warn("No ContainerStatus in containerFinishedEvent");
    }
    finishedContainersSentToAM.putIfAbsent(nodeId,
        new ArrayList<ContainerStatus>());
    appAttempt.finishedContainersSentToAM.get(nodeId).add(
@@ -1708,6 +1747,15 @@
    }
  }

  private void addAMNodeToBlackList(NodeId nodeId) {
    blacklistedNodesForAM.addNode(nodeId.getHost().toString());
  }

  @Override
  public BlacklistManager getAMBlacklist() {
    return blacklistedNodesForAM;
  }

  private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
      RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
    appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
@@ -65,7 +65,8 @@ public class AppSchedulingInfo {
      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
  final Map<Priority, Map<String, ResourceRequest>> requests =
      new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
  private Set<String> blacklist = new HashSet<String>();
  private Set<String> userBlacklist = new HashSet<>();
  private Set<String> amBlacklist = new HashSet<>();

  //private final ApplicationStore store;
  private ActiveUsersManager activeUsersManager;
@@ -217,21 +218,39 @@
  }

  /**
   * The ApplicationMaster is updating the blacklist
   * The ApplicationMaster is updating the userBlacklist used for containers
   * other than AMs.
   *
   * @param blacklistAdditions resources to be added to the blacklist
   * @param blacklistRemovals resources to be removed from the blacklist
   * @param blacklistAdditions resources to be added to the userBlacklist
   * @param blacklistRemovals resources to be removed from the userBlacklist
   */
  synchronized public void updateBlacklist(
  public void updateBlacklist(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    // Add to blacklist
    if (blacklistAdditions != null) {
      blacklist.addAll(blacklistAdditions);
    }
    updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
        blacklistRemovals);
  }

    // Remove from blacklist
    if (blacklistRemovals != null) {
      blacklist.removeAll(blacklistRemovals);
  /**
   * RM is updating blacklist for AM containers.
   * @param blacklistAdditions resources to be added to the amBlacklist
   * @param blacklistRemovals resources to be removed from the amBlacklist
   */
  public void updateAMBlacklist(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    updateUserOrAMBlacklist(amBlacklist, blacklistAdditions,
        blacklistRemovals);
  }

  void updateUserOrAMBlacklist(Set<String> blacklist,
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    synchronized (blacklist) {
      if (blacklistAdditions != null) {
        blacklist.addAll(blacklistAdditions);
      }

      if (blacklistRemovals != null) {
        blacklist.removeAll(blacklistRemovals);
      }
    }
  }
@@ -263,8 +282,23 @@
    return (request == null) ? null : request.getCapability();
  }

  public synchronized boolean isBlacklisted(String resourceName) {
    return blacklist.contains(resourceName);
  /**
   * Returns if the node is either blacklisted by the user or the system
   * @param resourceName the resource name
   * @param useAMBlacklist true if it should check amBlacklist
   * @return true if it is blacklisted
   */
  public boolean isBlacklisted(String resourceName,
      boolean useAMBlacklist) {
    if (useAMBlacklist){
      synchronized (amBlacklist) {
        return amBlacklist.contains(resourceName);
      }
    } else {
      synchronized (userBlacklist) {
        return userBlacklist.contains(resourceName);
      }
    }
  }

  /**
@@ -473,19 +507,25 @@
    this.queue = queue;
  }

  public synchronized Set<String> getBlackList() {
    return this.blacklist;
  public Set<String> getBlackList() {
    return this.userBlacklist;
  }

  public synchronized Set<String> getBlackListCopy() {
    return new HashSet<>(this.blacklist);
  public Set<String> getBlackListCopy() {
    synchronized (userBlacklist) {
      return new HashSet<>(this.userBlacklist);
    }
  }

  public synchronized void transferStateFromPreviousAppSchedulingInfo(
      AppSchedulingInfo appInfo) {
    //    this.priorities = appInfo.getPriorities();
    //    this.requests = appInfo.getRequests();
    this.blacklist = appInfo.getBlackList();
    // This should not require locking the userBlacklist since it will not be
    // used by this instance until after setCurrentAppAttempt.
    // Should cleanup this to avoid sharing between instances and can
    // then remove getBlacklist as well.
    this.userBlacklist = appInfo.getBlackList();
  }

  public synchronized void recoverContainer(RMContainer rmContainer) {
@@ -470,16 +470,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
      RMContainer rmContainer = i.next();
      Container container = rmContainer.getContainer();
      ContainerType containerType = ContainerType.TASK;
      // The working knowledge is that masterContainer for AM is null as it
      // itself is the master container.
      RMAppAttempt appAttempt =
          rmContext
              .getRMApps()
              .get(
                  container.getId().getApplicationAttemptId()
                      .getApplicationId()).getCurrentAppAttempt();
      if (appAttempt.getMasterContainer() == null
          && appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
      boolean isWaitingForAMContainer = isWaitingForAMContainer(
          container.getId().getApplicationAttemptId().getApplicationId());
      if (isWaitingForAMContainer) {
        containerType = ContainerType.APPLICATION_MASTER;
      }
      try {
@@ -509,6 +502,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
  }

  public boolean isWaitingForAMContainer(ApplicationId applicationId) {
    // The working knowledge is that masterContainer for AM is null as it
    // itself is the master container.
    RMAppAttempt appAttempt =
        rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
    return (appAttempt != null && appAttempt.getMasterContainer() == null
        && appAttempt.getSubmissionContext().getUnmanagedAM() == false);
  }

  // Blacklist used for user containers
  public synchronized void updateBlacklist(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    if (!isStopped) {
@@ -516,9 +519,19 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
          blacklistAdditions, blacklistRemovals);
    }
  }


  // Blacklist used for AM containers
  public synchronized void updateAMBlacklist(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    if (!isStopped) {
      this.appSchedulingInfo.updateAMBlacklist(
          blacklistAdditions, blacklistRemovals);
    }
  }

  public boolean isBlacklisted(String resourceName) {
    return this.appSchedulingInfo.isBlacklisted(resourceName);
    boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
    return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
  }

  public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -934,7 +933,13 @@ public class CapacityScheduler extends
        }
      }

      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      if (application.isWaitingForAMContainer(application.getApplicationId())) {
        // Allocate is for AM and update AM blacklist for this
        application.updateAMBlacklist(
            blacklistAdditions, blacklistRemovals);
      } else {
        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      }

      allocation = application.getAllocation(getResourceCalculator(),
          clusterResource, getMinimumResourceCapability());
@@ -1,48 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;

import org.apache.commons.logging.Log;

public class FiCaSchedulerUtils {

  public static boolean isBlacklisted(FiCaSchedulerApp application,
      FiCaSchedulerNode node, Log LOG) {
    if (application.isBlacklisted(node.getNodeName())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping 'host' " + node.getNodeName() +
            " for " + application.getApplicationId() +
            " since it has been blacklisted");
      }
      return true;
    }

    if (application.isBlacklisted(node.getRackName())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping 'rack' " + node.getRackName() +
            " for " + application.getApplicationId() +
            " since it has been blacklisted");
      }
      return true;
    }

    return false;
  }

}
@@ -955,7 +955,14 @@ public class FairScheduler extends
        preemptionContainerIds.add(container.getContainerId());
      }

      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      if (application.isWaitingForAMContainer(application.getApplicationId())) {
        // Allocate is for AM and update AM blacklist for this
        application.updateAMBlacklist(
            blacklistAdditions, blacklistRemovals);
      } else {
        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      }

      ContainersAndNMTokensAllocation allocation =
          application.pullNewlyAllocatedContainersAndNMTokens();
@@ -352,11 +352,18 @@ public class FifoScheduler extends
      application.showRequests();

      LOG.debug("allocate:" +
          " applicationId=" + applicationAttemptId +
          " applicationId=" + applicationAttemptId +
          " #ask=" + ask.size());
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    if (application.isWaitingForAMContainer(application.getApplicationId())) {
      // Allocate is for AM and update AM blacklist for this
      application.updateAMBlacklist(
          blacklistAdditions, blacklistRemovals);
    } else {
      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    }

    ContainersAndNMTokensAllocation allocation =
        application.pullNewlyAllocatedContainersAndNMTokens();
    Resource headroom = application.getHeadroom();
@@ -750,10 +750,7 @@ public class MockRM extends ResourceManager {

  public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
      throws Exception {
    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
    RMAppAttempt attempt = app.getCurrentAppAttempt();
    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
    RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
    System.out.println("Launch AM " + attempt.getAppAttemptId());
    nm.nodeHeartbeat(true);
    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
@@ -761,6 +758,15 @@
    return am;
  }

  public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
      throws Exception {
    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
    RMAppAttempt attempt = app.getCurrentAppAttempt();
    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
    return attempt;
  }

  public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
      throws Exception {
    MockAM am = launchAM(app, rm, nm);
@@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -49,11 +53,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -82,21 +89,7 @@ public class TestAMRestart {

    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    int NUM_CONTAINERS = 3;
    // allocate NUM_CONTAINERS containers
    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
        new ArrayList<ContainerId>());
    nm1.nodeHeartbeat(true);

    // wait for containers to be allocated.
    List<Container> containers =
        am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
    while (containers.size() != NUM_CONTAINERS) {
      nm1.nodeHeartbeat(true);
      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
          new ArrayList<ContainerId>()).getAllocatedContainers());
      Thread.sleep(200);
    }
    allocateContainers(nm1, am1, NUM_CONTAINERS);

    // launch the 2nd container, for testing running container transferred.
    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
@@ -244,6 +237,29 @@ public class TestAMRestart {
    rm1.stop();
  }

  private List<Container> allocateContainers(MockNM nm1, MockAM am1,
      int NUM_CONTAINERS) throws Exception {
    // allocate NUM_CONTAINERS containers
    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
        new ArrayList<ContainerId>());
    nm1.nodeHeartbeat(true);

    // wait for containers to be allocated.
    List<Container> containers =
        am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
    while (containers.size() != NUM_CONTAINERS) {
      nm1.nodeHeartbeat(true);
      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
          new ArrayList<ContainerId>()).getAllocatedContainers());
      Thread.sleep(200);
    }

    Assert.assertEquals("Did not get all containers allocated",
        NUM_CONTAINERS, containers.size());
    return containers;
  }

  private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
      throws InterruptedException {
    int count = 0;
@@ -258,6 +274,9 @@ public class TestAMRestart {
  public void testNMTokensRebindOnAMRestart() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
    // To prevent the test from blacklisting nm1 for the AM, we set the
    // threshold to half of the 2 nodes, which is 1
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);

    MockRM rm1 = new MockRM(conf);
    rm1.start();
@@ -355,6 +374,106 @@ public class TestAMRestart {
    rm1.stop();
  }

  @Test(timeout = 100000)
  public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    MemoryRMStateStore memStore = new MemoryRMStateStore();
    memStore.init(conf);
    final DrainDispatcher dispatcher = new DrainDispatcher();
    MockRM rm1 = new MockRM(conf, memStore) {
      @Override
      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
        return new SchedulerEventDispatcher(this.scheduler) {
          @Override
          public void handle(SchedulerEvent event) {
            scheduler.handle(event);
          }
        };
      }

      @Override
      protected Dispatcher createDispatcher() {
        return dispatcher;
      }
    };

    rm1.start();

    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
    nm1.registerNode();

    MockNM nm2 =
        new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
    nm2.registerNode();

    RMApp app1 = rm1.submitApp(200);

    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    CapacityScheduler scheduler =
        (CapacityScheduler) rm1.getResourceScheduler();
    ContainerId amContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    // Preempt the first attempt;
    RMContainer rmContainer = scheduler.getRMContainer(amContainer);
    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();

    MockNM currentNode, otherNode;
    if (nodeWhereAMRan == nm1.getNodeId()) {
      currentNode = nm1;
      otherNode = nm2;
    } else {
      currentNode = nm2;
      otherNode = nm1;
    }

    ContainerStatus containerStatus =
        BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
            "", ContainerExitStatus.DISKS_FAILED);
    currentNode.containerStatus(containerStatus);
    am1.waitForState(RMAppAttemptState.FAILED);
    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);

    // restart the am
    RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
    System.out.println("Launch AM " + attempt.getAppAttemptId());

    currentNode.nodeHeartbeat(true);
    dispatcher.await();
    Assert.assertEquals(
        "AppAttemptState should still be SCHEDULED if currentNode is " +
            "blacklisted correctly",
        RMAppAttemptState.SCHEDULED,
        attempt.getAppAttemptState());

    otherNode.nodeHeartbeat(true);
    dispatcher.await();

    MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
    rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);

    amContainer =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
    rmContainer = scheduler.getRMContainer(amContainer);
    nodeWhereAMRan = rmContainer.getAllocatedNode();
    Assert.assertEquals(
        "After blacklisting AM should have run on the other node",
        otherNode.getNodeId(), nodeWhereAMRan);

    am2.registerAppAttempt();
    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);

    List<Container> allocatedContainers =
        allocateContainers(currentNode, am2, 1);
    Assert.assertEquals(
        "Even though AM is blacklisted from the node, application can still " +
            "allocate containers there",
        currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
  }

  // AM container preempted, nm disk failure
  // should not be counted towards AM max retry count.
  @Test(timeout = 100000)
@@ -0,0 +1,118 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;


import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;

public class TestBlacklistManager {

  @Test
  public void testSimpleBlacklistBelowFailureThreshold() {
    final int numberOfNodeManagerHosts = 3;
    final double blacklistDisableFailureThreshold = 0.8;
    BlacklistManager manager = new SimpleBlacklistManager(
        numberOfNodeManagerHosts, blacklistDisableFailureThreshold);
    String anyNode = "foo";
    String anyNode2 = "bar";
    manager.addNode(anyNode);
    manager.addNode(anyNode2);
    BlacklistUpdates blacklist = manager
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    Collections.sort(blacklistAdditions);
    List<String> blacklistRemovals = blacklist.getRemovals();
    String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode};
    Assert.assertArrayEquals(
        "Blacklist additions was not as expected",
        expectedBlacklistAdditions,
        blacklistAdditions.toArray());
    Assert.assertTrue(
        "Blacklist removals should be empty but was " +
            blacklistRemovals,
        blacklistRemovals.isEmpty());
  }

  @Test
  public void testSimpleBlacklistAboveFailureThreshold() {
    // Create a threshold of 0.5 * 3 i.e at 1.5 node failures.
    BlacklistManager manager = new SimpleBlacklistManager(3, 0.5);
    String anyNode = "foo";
    String anyNode2 = "bar";
    manager.addNode(anyNode);
    BlacklistUpdates blacklist = manager
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    Collections.sort(blacklistAdditions);
    List<String> blacklistRemovals = blacklist.getRemovals();
    String[] expectedBlacklistAdditions = new String[]{anyNode};
    Assert.assertArrayEquals(
        "Blacklist additions was not as expected",
        expectedBlacklistAdditions,
        blacklistAdditions.toArray());
    Assert.assertTrue(
        "Blacklist removals should be empty but was " +
            blacklistRemovals,
        blacklistRemovals.isEmpty());

    manager.addNode(anyNode2);

    blacklist = manager
        .getBlacklistUpdates();
    blacklistAdditions = blacklist.getAdditions();
    Collections.sort(blacklistAdditions);
    blacklistRemovals = blacklist.getRemovals();
    Collections.sort(blacklistRemovals);
    String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode};
    Assert.assertTrue(
        "Blacklist additions should be empty but was " +
            blacklistAdditions,
        blacklistAdditions.isEmpty());
    Assert.assertArrayEquals(
        "Blacklist removals was not as expected",
        expectedBlacklistRemovals,
        blacklistRemovals.toArray());
  }

  @Test
  public void testDisabledBlacklist() {
    BlacklistManager disabled = new DisabledBlacklistManager();
    String anyNode = "foo";
    disabled.addNode(anyNode);
    BlacklistUpdates blacklist = disabled
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    List<String> blacklistRemovals = blacklist.getRemovals();
    Assert.assertTrue(
        "Blacklist additions should be empty but was " +
            blacklistAdditions,
        blacklistAdditions.isEmpty());
    Assert.assertTrue(
        "Blacklist removals should be empty but was " +
            blacklistRemovals,
        blacklistRemovals.isEmpty());
  }
}
@@ -489,7 +489,7 @@ public class TestRMAppLogAggregationStatus {
        2, Resource.newInstance(10, 2), "test");
    return new RMAppImpl(this.appId, this.rmContext,
        conf, "test", "test", "default", submissionContext,
        this.rmContext.getScheduler(),
        scheduler,
        this.rmContext.getApplicationMasterService(),
        System.currentTimeMillis(), "test",
        null, null);
@@ -970,7 +970,7 @@ public class TestRMAppTransitions {
        appState.getApplicationSubmissionContext().getApplicationId(),
        rmContext, conf,
        submissionContext.getApplicationName(), null,
        submissionContext.getQueue(), submissionContext, null, null,
        submissionContext.getQueue(), submissionContext, scheduler, null,
        appState.getSubmitTime(), submissionContext.getApplicationType(),
        submissionContext.getApplicationTags(),
        BuilderUtils.newResourceRequest(
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -655,6 +656,11 @@ public class TestCapacityScheduler {
    RMAppImpl app = mock(RMAppImpl.class);
    when(app.getApplicationId()).thenReturn(appId);
    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
    when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -715,6 +721,11 @@ public class TestCapacityScheduler {
    RMAppImpl app1 = mock(RMAppImpl.class);
    when(app1.getApplicationId()).thenReturn(appId1);
    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt1.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
@@ -739,6 +750,8 @@ public class TestCapacityScheduler {
    RMAppImpl app2 = mock(RMAppImpl.class);
    when(app2.getApplicationId()).thenReturn(appId2);
    RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
    when(attempt2.getMasterContainer()).thenReturn(container);
    when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
    when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
    when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
@@ -2876,6 +2889,11 @@ public class TestCapacityScheduler {
    RMAppImpl app = mock(RMAppImpl.class);
    when(app.getApplicationId()).thenReturn(appId);
    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
    when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2953,6 +2971,11 @@ public class TestCapacityScheduler {
    RMAppImpl app = mock(RMAppImpl.class);
    when(app.getApplicationId()).thenReturn(appId);
    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
    when(app.getCurrentAppAttempt()).thenReturn(attempt);
@@ -2976,6 +2999,8 @@ public class TestCapacityScheduler {
    RMAppImpl app2 = mock(RMAppImpl.class);
    when(app2.getApplicationId()).thenReturn(appId2);
    RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
    when(attempt2.getMasterContainer()).thenReturn(container);
    when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
    when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
    when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
@@ -220,7 +220,7 @@ public class FairSchedulerTestBase {
    ApplicationId appId = attId.getApplicationId();
    RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
        null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
        queue, null, null, false, false, 0, amResource, null), null, null,
        queue, null, null, false, false, 0, amResource, null), scheduler, null,
        0, null, null, null);
    rmContext.getRMApps().put(appId, rmApp);
    RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);