YARN-4837. User facing aspects of 'AM blacklisting' feature need fixing. (vinodkv via wangda)
parent 3bf2e16f76
commit 9bf420b3bc

@ -1,67 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* Specific AMBlacklistingRequest from AM to enable/disable blacklisting.
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract class AMBlackListingRequest {
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static AMBlackListingRequest newInstance(
|
||||
boolean isAMBlackListingEnabled, float disableFailureThreshold) {
|
||||
AMBlackListingRequest blackListRequest = Records
|
||||
.newRecord(AMBlackListingRequest.class);
|
||||
blackListRequest.setBlackListingEnabled(isAMBlackListingEnabled);
|
||||
blackListRequest
|
||||
.setBlackListingDisableFailureThreshold(disableFailureThreshold);
|
||||
return blackListRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return AM Blacklisting is enabled.
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract boolean isAMBlackListingEnabled();
|
||||
|
||||
/**
|
||||
* @return AM Blacklisting disable failure threshold
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract float getBlackListingDisableFailureThreshold();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setBlackListingEnabled(boolean isAMBlackListingEnabled);
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setBlackListingDisableFailureThreshold(
|
||||
float disableFailureThreshold);
|
||||
}
|
|
@ -535,27 +535,4 @@ public abstract class ApplicationSubmissionContext {
|
|||
@Public
|
||||
@Unstable
|
||||
public abstract void setReservationID(ReservationId reservationID);
|
||||
|
||||
/**
|
||||
* Get AM Blacklisting request object to know whether application needs any
|
||||
* specific blacklisting for AM Nodes.
|
||||
*
|
||||
* @return AMBlackListingRequest object which has blacklisting information.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract AMBlackListingRequest getAMBlackListRequest();
|
||||
|
||||
/**
|
||||
* Get AM Blacklisting request object to know whether application needs any
|
||||
* specific blacklisting for AM Nodes.
|
||||
*
|
||||
* @param blackListRequest
|
||||
* object which has blacklisting information such as
|
||||
* "enable/disable AM blacklisting" and "disable failure threshold".
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void setAMBlackListRequest(
|
||||
AMBlackListingRequest blackListRequest);
|
||||
}
|
||||
|
|
|
@@ -2348,14 +2348,25 @@ public class YarnConfiguration extends Configuration {
  public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
      30 * 60 * 1000;

  public static final String AM_BLACKLISTING_ENABLED =
      YARN_PREFIX + "am.blacklisting.enabled";
  public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;

  public static final String AM_BLACKLISTING_DISABLE_THRESHOLD =
      YARN_PREFIX + "am.blacklisting.disable-failure-threshold";
  public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
  @Private
  /**
   * This is a private feature that isn't supposed to be used by end-users.
   */
  public static final String AM_SCHEDULING_NODE_BLACKLISTING_ENABLED =
      RM_PREFIX + "am-scheduling.node-blacklisting-enabled";
  @Private
  public static final boolean DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED =
      true;

  @Private
  /**
   * This is a private feature that isn't supposed to be used by end-users.
   */
  public static final String AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD =
      RM_PREFIX + "am-scheduling.node-blacklisting-disable-threshold";
  @Private
  public static final float
      DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;

  private static final String NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PREFIX =
      NM_NODE_LABELS_PROVIDER_PREFIX + "script.";

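Editor's note: the hunk above removes the public yarn.am.blacklisting.* keys and introduces RM-internal keys that resolve to yarn.resourcemanager.am-scheduling.node-blacklisting-enabled and yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold (RM_PREFIX is "yarn.resourcemanager."). Below is a minimal sketch of how the RM side reads these values with their defaults, mirroring the lookup pattern used later in RMAppImpl; the class name NodeBlacklistingConfigDemo is made up for illustration and is not part of this commit.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class NodeBlacklistingConfigDemo {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();

    // Internal (non user-facing) switch for AM-scheduling node blacklisting.
    boolean enabled = conf.getBoolean(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);

    // Fraction of NodeManager hosts beyond which blacklisting is backed off.
    float disableThreshold = conf.getFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);

    System.out.println("node-blacklisting enabled = " + enabled
        + ", disable-threshold = " + disableThreshold);
  }
}
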
@@ -353,7 +353,6 @@ message ApplicationSubmissionContextProto {
  optional ReservationIdProto reservation_id = 15;
  optional string node_label_expression = 16;
  optional ResourceRequestProto am_container_resource_request = 17;
  optional AMBlackListingRequestProto am_blacklisting_request = 18;
}

message LogAggregationContextProto {

@@ -91,6 +91,13 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
    configurationPropsToSkipCompare
        .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);

    // Ignore blacklisting nodes for AM failures feature since it is still a
    // "work in progress"
    configurationPropsToSkipCompare.add(YarnConfiguration.
        AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
    configurationPropsToSkipCompare.add(YarnConfiguration.
        AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);

    // Ignore all YARN Application Timeline Service (version 1) properties
    configurationPrefixToSkipCompare.add("yarn.timeline-service.");

@ -1,104 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProtoOrBuilder;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class AMBlackListingRequestPBImpl extends AMBlackListingRequest {
|
||||
AMBlackListingRequestProto proto = AMBlackListingRequestProto
|
||||
.getDefaultInstance();
|
||||
AMBlackListingRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public AMBlackListingRequestPBImpl() {
|
||||
builder = AMBlackListingRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public AMBlackListingRequestPBImpl(AMBlackListingRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public AMBlackListingRequestProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = AMBlackListingRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAMBlackListingEnabled() {
|
||||
AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getBlacklistingEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getBlackListingDisableFailureThreshold() {
|
||||
AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getBlacklistingFailureThreshold();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBlackListingEnabled(boolean isAMBlackListingEnabled) {
|
||||
maybeInitBuilder();
|
||||
builder.setBlacklistingEnabled(isAMBlackListingEnabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBlackListingDisableFailureThreshold(
|
||||
float disableFailureThreshold) {
|
||||
maybeInitBuilder();
|
||||
builder.setBlacklistingFailureThreshold(disableFailureThreshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -24,7 +24,6 @@ import java.util.Set;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
|
@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
|
||||
|
@ -65,7 +63,6 @@ extends ApplicationSubmissionContext {
|
|||
private ResourceRequest amResourceRequest = null;
|
||||
private LogAggregationContext logAggregationContext = null;
|
||||
private ReservationId reservationId = null;
|
||||
private AMBlackListingRequest amBlackListRequest = null;
|
||||
|
||||
public ApplicationSubmissionContextPBImpl() {
|
||||
builder = ApplicationSubmissionContextProto.newBuilder();
|
||||
|
@ -134,10 +131,6 @@ extends ApplicationSubmissionContext {
|
|||
if (this.reservationId != null) {
|
||||
builder.setReservationId(convertToProtoFormat(this.reservationId));
|
||||
}
|
||||
if (this.amBlackListRequest != null) {
|
||||
builder.setAmBlacklistingRequest(
|
||||
convertToProtoFormat(this.amBlackListRequest));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -420,29 +413,6 @@ extends ApplicationSubmissionContext {
|
|||
return p.getKeepContainersAcrossApplicationAttempts();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AMBlackListingRequest getAMBlackListRequest() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (amBlackListRequest != null) {
|
||||
return amBlackListRequest;
|
||||
}
|
||||
if (!p.hasAmBlacklistingRequest()) {
|
||||
return null;
|
||||
}
|
||||
amBlackListRequest = convertFromProtoFormat(p.getAmBlacklistingRequest());
|
||||
return amBlackListRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAMBlackListRequest(AMBlackListingRequest amBlackListRequest) {
|
||||
maybeInitBuilder();
|
||||
if (amBlackListRequest == null) {
|
||||
builder.clearAmBlacklistingRequest();
|
||||
return;
|
||||
}
|
||||
this.amBlackListRequest = amBlackListRequest;
|
||||
}
|
||||
|
||||
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||
return new PriorityPBImpl(p);
|
||||
}
|
||||
|
@ -485,16 +455,6 @@ extends ApplicationSubmissionContext {
|
|||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private AMBlackListingRequestPBImpl convertFromProtoFormat(
|
||||
AMBlackListingRequestProto a) {
|
||||
return new AMBlackListingRequestPBImpl(a);
|
||||
}
|
||||
|
||||
private AMBlackListingRequestProto convertToProtoFormat(
|
||||
AMBlackListingRequest a) {
|
||||
return ((AMBlackListingRequestPBImpl) a).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeLabelExpression() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@@ -2604,25 +2604,6 @@
    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
  </property>

  <property>
    <description>
      Enable/disable blacklisting of hosts for AM based on AM failures on those
      hosts.
    </description>
    <name>yarn.am.blacklisting.enabled</name>
    <value>true</value>
  </property>

  <property>
    <description>
      Threshold of ratio number of NodeManager hosts that are allowed to be
      blacklisted for AM. Beyond this ratio there is no blacklisting to avoid
      danger of blacklisting the entire cluster.
    </description>
    <name>yarn.am.blacklisting.disable-failure-threshold</name>
    <value>0.8f</value>
  </property>

  <property>
    <description>
      Choose different implementation of node label's storage

@ -109,7 +109,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersReso
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -153,7 +152,6 @@ import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
|||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.AMBlackListingRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
|
@ -187,7 +185,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl
|
|||
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
|
@ -509,7 +506,6 @@ public class TestPBImplRecords {
|
|||
generateByNewInstance(ResourceAllocationRequest.class);
|
||||
generateByNewInstance(ReservationAllocationState.class);
|
||||
generateByNewInstance(ResourceUtilization.class);
|
||||
generateByNewInstance(AMBlackListingRequest.class);
|
||||
}
|
||||
|
||||
private class GetSetPair {
|
||||
|
@ -1343,10 +1339,4 @@ public class TestPBImplRecords {
|
|||
validatePBImplRecord(CheckForDecommissioningNodesResponsePBImpl.class,
|
||||
CheckForDecommissioningNodesResponseProto.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMBlackListingRequestPBImpl() throws Exception {
|
||||
validatePBImplRecord(AMBlackListingRequestPBImpl.class,
|
||||
AMBlackListingRequestProto.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;

/**
 * Tracks blacklists based on failures reported on nodes.

@@ -33,14 +34,14 @@ public interface BlacklistManager {
  void addNode(String node);

  /**
   * Get {@link BlacklistUpdates} that indicate which nodes should be
   * Get {@link ResourceBlacklistRequest} that indicate which nodes should be
   * added or to removed from the blacklist.
   * @return {@link BlacklistUpdates}
   * @return {@link ResourceBlacklistRequest}
   */
  BlacklistUpdates getBlacklistUpdates();
  ResourceBlacklistRequest getBlacklistUpdates();

  /**
   * Refresh the number of nodemanager hosts available for scheduling.
   * Refresh the number of NodeManagers available for scheduling.
   * @param nodeHostCount is the number of node hosts.
   */
  void refreshNodeHostCount(int nodeHostCount);

@ -1,47 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class to track blacklist additions and removals.
|
||||
*/
|
||||
@Private
|
||||
public class BlacklistUpdates {
|
||||
|
||||
private List<String> additions;
|
||||
private List<String> removals;
|
||||
|
||||
public BlacklistUpdates(List<String> additions,
|
||||
List<String> removals) {
|
||||
this.additions = additions;
|
||||
this.removals = removals;
|
||||
}
|
||||
|
||||
public List<String> getAdditions() {
|
||||
return additions;
|
||||
}
|
||||
|
||||
public List<String> getRemovals() {
|
||||
return removals;
|
||||
}
|
||||
}
|
|
@@ -20,21 +20,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import java.util.ArrayList;

import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;

/**
 * A {@link BlacklistManager} that returns no blacklists.
 */
public class DisabledBlacklistManager implements BlacklistManager{
public class DisabledBlacklistManager implements BlacklistManager {

  private static final ArrayList<String> EMPTY_LIST = new ArrayList<String>();
  private BlacklistUpdates noBlacklist =
      new BlacklistUpdates(EMPTY_LIST, EMPTY_LIST);
  private ResourceBlacklistRequest noBlacklist =
      ResourceBlacklistRequest.newInstance(EMPTY_LIST, EMPTY_LIST);

  @Override
  public void addNode(String node) {
  }

  @Override
  public BlacklistUpdates getBlacklistUpdates() {
  public ResourceBlacklistRequest getBlacklistUpdates() {
    return noBlacklist;
  }

@@ -18,14 +18,15 @@

package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;

/**
 * Maintains a list of failed nodes and returns that as long as number of
 * blacklisted nodes is below a threshold percentage of total nodes. If more

@@ -58,8 +59,8 @@ public class SimpleBlacklistManager implements BlacklistManager {
  }

  @Override
  public BlacklistUpdates getBlacklistUpdates() {
    BlacklistUpdates ret;
  public ResourceBlacklistRequest getBlacklistUpdates() {
    ResourceBlacklistRequest ret;
    List<String> blacklist = new ArrayList<>(blacklistNodes);
    final int currentBlacklistSize = blacklist.size();
    final double failureThreshold = this.blacklistDisableFailureThreshold *

@@ -70,13 +71,15 @@ public class SimpleBlacklistManager implements BlacklistManager {
            "failure threshold ratio " + blacklistDisableFailureThreshold +
            " out of total usable nodes " + numberOfNodeManagerHosts);
      }
      ret = new BlacklistUpdates(blacklist, EMPTY_LIST);
      ret = ResourceBlacklistRequest.newInstance(blacklist, EMPTY_LIST);
    } else {
      LOG.warn("Ignoring Blacklists, blacklist size " + currentBlacklistSize
          + " is more than failure threshold ratio "
          + blacklistDisableFailureThreshold + " out of total usable nodes "
          + numberOfNodeManagerHosts);
      ret = new BlacklistUpdates(EMPTY_LIST, blacklist);
      // TODO: After the threshold hits, we will keep sending a long list
      // every time a new AM is to be scheduled.
      ret = ResourceBlacklistRequest.newInstance(EMPTY_LIST, blacklist);
    }
    return ret;
  }

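Editor's note: a minimal usage sketch of the reworked manager, assuming the (hostCount, threshold) constructor that RMAppImpl uses elsewhere in this commit; the class name BlacklistManagerDemo and the host names are made up. With 10 usable hosts and a 0.8 threshold, up to 8 failed hosts are still reported as blacklist additions, after which the manager reports removals instead.

import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;

public class BlacklistManagerDemo {
  public static void main(String[] args) {
    // 10 NodeManager hosts, back off once 80% of them would be blacklisted.
    BlacklistManager manager = new SimpleBlacklistManager(10, 0.8f);

    manager.addNode("nm-host-1");   // AM container failed here
    manager.addNode("nm-host-2");   // and here

    ResourceBlacklistRequest request = manager.getBlacklistUpdates();
    System.out.println("additions = " + request.getBlacklistAdditions());
    System.out.println("removals  = " + request.getBlacklistRemovals());

    // If the cluster shrinks, refresh the host count before the next attempt.
    manager.refreshNodeHostCount(5);
  }
}
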
@ -391,8 +391,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
stateMachine;
|
||||
|
||||
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
|
||||
private static final float MINIMUM_THRESHOLD_VALUE = 0.0f;
|
||||
private static final float MAXIMUM_THRESHOLD_VALUE = 1.0f;
|
||||
private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f;
|
||||
private static final float MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 1.0f;
|
||||
|
||||
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||
Configuration config, String name, String user, String queue,
|
||||
|
@@ -471,42 +471,24 @@ public class RMAppImpl implements RMApp, Recoverable {
        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);

    // amBlacklistingEnabled can be configured globally and by each
    // application.
    // Case 1: If AMBlackListRequest is available in submission context, we
    // will consider only app level request (RM level configuration will be
    // skipped).
    // Case 2: AMBlackListRequest is available in submission context and
    // amBlacklisting is disabled. In this case, AM blacklisting wont be
    // enabled for this app even if this feature is enabled in RM level.
    // Case 3: AMBlackListRequest is not available through submission context.
    // RM level AM black listing configuration will be considered.
    if (null != submissionContext.getAMBlackListRequest()) {
      amBlacklistingEnabled = submissionContext.getAMBlackListRequest()
          .isAMBlackListingEnabled();
      blacklistDisableThreshold = 0.0f;
      if (amBlacklistingEnabled) {
        blacklistDisableThreshold = submissionContext.getAMBlackListRequest()
            .getBlackListingDisableFailureThreshold();

        // Verify whether blacklistDisableThreshold is valid. And for invalid
        // threshold, reset to global level blacklistDisableThreshold
        // configured.
        if (blacklistDisableThreshold < MINIMUM_THRESHOLD_VALUE
            || blacklistDisableThreshold > MAXIMUM_THRESHOLD_VALUE) {
          blacklistDisableThreshold = conf.getFloat(
              YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
              YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
        }
      }
    } else {
      amBlacklistingEnabled = conf.getBoolean(
          YarnConfiguration.AM_BLACKLISTING_ENABLED,
          YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
      if (amBlacklistingEnabled) {
        blacklistDisableThreshold = conf.getFloat(
            YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
            YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
    // amBlacklistingEnabled can be configured globally
    // Just use the global values
    amBlacklistingEnabled =
        conf.getBoolean(
            YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
            YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
    if (amBlacklistingEnabled) {
      blacklistDisableThreshold = conf.getFloat(
          YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
          YarnConfiguration.
              DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
      // Verify whether blacklistDisableThreshold is valid. And for invalid
      // threshold, reset to global level blacklistDisableThreshold
      // configured.
      if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE ||
          blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) {
        blacklistDisableThreshold = YarnConfiguration.
            DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
      }
    }
  }

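Editor's note: the replacement logic above consults only the global RM keys and resets out-of-range thresholds to the default. A small sketch of that validation in isolation, assuming the 0.0 to 1.0 bounds and the 0.8 default shown earlier in this commit; the class name ThresholdClampDemo is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ThresholdClampDemo {
  static float effectiveDisableThreshold(Configuration conf) {
    float threshold = conf.getFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
    // Same bounds check as RMAppImpl: invalid values fall back to the default.
    if (threshold < 0.0f || threshold > 1.0f) {
      threshold = YarnConfiguration
          .DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
    }
    return threshold;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, 1.5f);
    // Prints 0.8: the out-of-range 1.5 is rejected and the default is used.
    System.out.println(effectiveDisableThreshold(conf));
  }
}
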
@ -858,15 +840,16 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
|
||||
|
||||
BlacklistManager currentAMBlacklist;
|
||||
BlacklistManager currentAMBlacklistManager;
|
||||
if (currentAttempt != null) {
|
||||
currentAMBlacklist = currentAttempt.getAMBlacklist();
|
||||
// Transfer over the blacklist from the previous app-attempt.
|
||||
currentAMBlacklistManager = currentAttempt.getAMBlacklistManager();
|
||||
} else {
|
||||
if (amBlacklistingEnabled) {
|
||||
currentAMBlacklist = new SimpleBlacklistManager(
|
||||
currentAMBlacklistManager = new SimpleBlacklistManager(
|
||||
scheduler.getNumClusterNodes(), blacklistDisableThreshold);
|
||||
} else {
|
||||
currentAMBlacklist = new DisabledBlacklistManager();
|
||||
currentAMBlacklistManager = new DisabledBlacklistManager();
|
||||
}
|
||||
}
|
||||
RMAppAttempt attempt =
|
||||
|
@ -877,7 +860,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// hardware error and NM resync) + 1) equal to the max-attempt
|
||||
// limit.
|
||||
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
|
||||
currentAMBlacklist);
|
||||
currentAMBlacklistManager);
|
||||
attempts.put(appAttemptId, attempt);
|
||||
currentAttempt = attempt;
|
||||
}
|
||||
|
@ -1800,14 +1783,4 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
|
||||
rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public boolean isAmBlacklistingEnabled() {
|
||||
return amBlacklistingEnabled;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public float getAmBlacklistingDisableThreshold() {
|
||||
return blacklistDisableThreshold;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
|||
* Get the {@link BlacklistManager} that manages blacklists for AM failures
|
||||
* @return the {@link BlacklistManager} that tracks AM failures.
|
||||
*/
|
||||
BlacklistManager getAMBlacklist();
|
||||
BlacklistManager getAMBlacklistManager();
|
||||
|
||||
/**
|
||||
* the start time of the application.
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -74,7 +75,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
|
@ -491,7 +491,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
ApplicationMasterService masterService,
|
||||
ApplicationSubmissionContext submissionContext,
|
||||
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
|
||||
BlacklistManager amBlacklist) {
|
||||
BlacklistManager amBlacklistManager) {
|
||||
this.conf = conf;
|
||||
this.applicationAttemptId = appAttemptId;
|
||||
this.rmContext = rmContext;
|
||||
|
@ -512,7 +512,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
new RMAppAttemptMetrics(applicationAttemptId, rmContext);
|
||||
|
||||
this.amReq = amReq;
|
||||
this.blacklistedNodesForAM = amBlacklist;
|
||||
this.blacklistedNodesForAM = amBlacklistManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1033,15 +1033,15 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
|
||||
appAttempt.amReq.setRelaxLocality(true);
|
||||
|
||||
appAttempt.getAMBlacklist().refreshNodeHostCount(
|
||||
appAttempt.getAMBlacklistManager().refreshNodeHostCount(
|
||||
appAttempt.scheduler.getNumClusterNodes());
|
||||
|
||||
BlacklistUpdates amBlacklist = appAttempt.getAMBlacklist()
|
||||
.getBlacklistUpdates();
|
||||
ResourceBlacklistRequest amBlacklist =
|
||||
appAttempt.getAMBlacklistManager().getBlacklistUpdates();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Using blacklist for AM: additions(" +
|
||||
amBlacklist.getAdditions() + ") and removals(" +
|
||||
amBlacklist.getRemovals() + ")");
|
||||
amBlacklist.getBlacklistAdditions() + ") and removals(" +
|
||||
amBlacklist.getBlacklistRemovals() + ")");
|
||||
}
|
||||
// AM resource has been checked when submission
|
||||
Allocation amContainerAllocation =
|
||||
|
@ -1049,8 +1049,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.applicationAttemptId,
|
||||
Collections.singletonList(appAttempt.amReq),
|
||||
EMPTY_CONTAINER_RELEASE_LIST,
|
||||
amBlacklist.getAdditions(),
|
||||
amBlacklist.getRemovals(), null, null);
|
||||
amBlacklist.getBlacklistAdditions(),
|
||||
amBlacklist.getBlacklistRemovals(), null, null);
|
||||
if (amContainerAllocation != null
|
||||
&& amContainerAllocation.getContainers() != null) {
|
||||
assert (amContainerAllocation.getContainers().size() == 0);
|
||||
|
@@ -1483,9 +1483,36 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
      }
    }

  private boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
    return !(exitStatus == ContainerExitStatus.SUCCESS
        || exitStatus == ContainerExitStatus.PREEMPTED);
  private static boolean shouldCountTowardsNodeBlacklisting(int exitStatus) {
    switch (exitStatus) {
    case ContainerExitStatus.PREEMPTED:
    case ContainerExitStatus.KILLED_BY_RESOURCEMANAGER:
    case ContainerExitStatus.KILLED_BY_APPMASTER:
    case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
    case ContainerExitStatus.ABORTED:
      // Neither the app's fault nor the system's fault. This happens by design,
      // so no need for skipping nodes
      return false;
    case ContainerExitStatus.DISKS_FAILED:
      // This container is marked with this exit-status means that the node is
      // already marked as unhealthy given that most of the disks failed. So, no
      // need for any explicit skipping of nodes.
      return false;
    case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
    case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
      // No point in skipping the node as it's not the system's fault
      return false;
    case ContainerExitStatus.SUCCESS:
      return false;
    case ContainerExitStatus.INVALID:
      // Ideally, this shouldn't be considered for skipping a node. But in
      // reality, it seems like there are cases where we are not setting
      // exit-code correctly and so it's better to be conservative. See
      // YARN-4284.
      return true;
    default:
      return true;
    }
  }

  private static final class UnmanagedAMAttemptSavedTransition

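Editor's note: shouldCountTowardsNodeBlacklisting is private to RMAppAttemptImpl, so the sketch below mirrors the same classification in a standalone form purely for illustration; the helper and class name are hypothetical, while the ContainerExitStatus constants are the ones referenced in the switch above. Preemption, RM/AM kills, disk failures and memory-limit kills do not blacklist the node, whereas INVALID and genuine failures do.

import org.apache.hadoop.yarn.api.records.ContainerExitStatus;

public class ExitStatusClassificationDemo {
  // Mirrors the switch in RMAppAttemptImpl; illustration only.
  static boolean countsTowardsNodeBlacklisting(int exitStatus) {
    switch (exitStatus) {
    case ContainerExitStatus.SUCCESS:
    case ContainerExitStatus.PREEMPTED:
    case ContainerExitStatus.ABORTED:
    case ContainerExitStatus.KILLED_BY_RESOURCEMANAGER:
    case ContainerExitStatus.KILLED_BY_APPMASTER:
    case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
    case ContainerExitStatus.DISKS_FAILED:
    case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
    case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
      return false;  // not the node's fault, or node already marked unhealthy
    default:
      return true;   // includes INVALID, which is treated conservatively
    }
  }

  public static void main(String[] args) {
    System.out.println(countsTowardsNodeBlacklisting(ContainerExitStatus.PREEMPTED)); // false
    System.out.println(countsTowardsNodeBlacklisting(ContainerExitStatus.INVALID));   // true
    System.out.println(countsTowardsNodeBlacklisting(137));                           // true
  }
}
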
@ -1805,7 +1832,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
if (appAttempt.masterContainer != null
|
||||
&& appAttempt.masterContainer.getId().equals(
|
||||
containerStatus.getContainerId())) {
|
||||
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||
appAttempt.amContainerFinished(appAttempt, containerFinishedEvent);
|
||||
|
||||
// Remember the follow up transition and save the final attempt state.
|
||||
appAttempt.rememberTargetTransitionsAndStoreState(event,
|
||||
|
@ -1851,13 +1878,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
|
||||
// Add am container to the list so that am container instance will be
|
||||
// removed from NMContext.
|
||||
private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
|
||||
private static void amContainerFinished(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
|
||||
|
||||
NodeId nodeId = containerFinishedEvent.getNodeId();
|
||||
if (containerFinishedEvent.getContainerStatus() != null) {
|
||||
if (shouldCountTowardsNodeBlacklisting(containerFinishedEvent
|
||||
.getContainerStatus().getExitStatus())) {
|
||||
appAttempt.addAMNodeToBlackList(containerFinishedEvent.getNodeId());
|
||||
|
||||
ContainerStatus containerStatus =
|
||||
containerFinishedEvent.getContainerStatus();
|
||||
if (containerStatus != null) {
|
||||
int exitStatus = containerStatus.getExitStatus();
|
||||
if (shouldCountTowardsNodeBlacklisting(exitStatus)) {
|
||||
appAttempt.addAMNodeToBlackList(nodeId);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("No ContainerStatus in containerFinishedEvent");
|
||||
|
@ -1865,14 +1896,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
|
||||
if (!appAttempt.getSubmissionContext()
|
||||
.getKeepContainersAcrossApplicationAttempts()) {
|
||||
finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||
new ArrayList<ContainerStatus>());
|
||||
appAttempt.finishedContainersSentToAM.get(nodeId).add(
|
||||
containerFinishedEvent.getContainerStatus());
|
||||
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
|
||||
appAttempt.sendFinishedContainersToNM();
|
||||
} else {
|
||||
appAttempt.sendFinishedAMContainerToNM(nodeId,
|
||||
containerFinishedEvent.getContainerStatus().getContainerId());
|
||||
containerStatus.getContainerId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1887,7 +1917,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public BlacklistManager getAMBlacklist() {
|
||||
public BlacklistManager getAMBlacklistManager() {
|
||||
return blacklistedNodesForAM;
|
||||
}
|
||||
|
||||
|
@ -1946,7 +1976,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
containerStatus.getContainerId())) {
|
||||
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||
appAttempt, containerFinishedEvent);
|
||||
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||
appAttempt.amContainerFinished(appAttempt, containerFinishedEvent);
|
||||
return RMAppAttemptState.FINISHED;
|
||||
}
|
||||
// Add all finished containers so that they can be acked to NM.
|
||||
|
@ -1971,7 +2001,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
// Thus, we still return FINAL_SAVING state here.
|
||||
if (appAttempt.masterContainer.getId().equals(
|
||||
containerStatus.getContainerId())) {
|
||||
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||
|
||||
appAttempt.amContainerFinished(appAttempt, containerFinishedEvent);
|
||||
|
||||
if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|
||||
|| appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
|
||||
|
|
|
@ -70,15 +70,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||
.RMContainerNMDoneChangeResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerNMDoneChangeResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
||||
.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
|
@ -189,7 +188,8 @@ public abstract class AbstractYarnScheduler
|
|||
public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
|
||||
List<NodeId> blacklistNodeIdList) {
|
||||
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
|
||||
if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
|
||||
if (SchedulerAppUtils.isPlaceBlacklisted(app, nodeEntry.getValue(),
|
||||
LOG)) {
|
||||
blacklistNodeIdList.add(nodeEntry.getKey());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,9 +73,13 @@ public class AppSchedulingInfo {
|
|||
private ActiveUsersManager activeUsersManager;
|
||||
private boolean pending = true; // whether accepted/allocated by scheduler
|
||||
private ResourceUsage appResourceUsage;
|
||||
|
||||
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
|
||||
private final Set<String> amBlacklist = new HashSet<>();
|
||||
private Set<String> userBlacklist = new HashSet<>();
|
||||
// Set of places (nodes / racks) blacklisted by the system. Today, this only
|
||||
// has places blacklisted for AM containers.
|
||||
private final Set<String> placesBlacklistedBySystem = new HashSet<>();
|
||||
private Set<String> placesBlacklistedByApp = new HashSet<>();
|
||||
|
||||
private Set<String> requestedPartitions = new HashSet<>();
|
||||
|
||||
final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
|
||||
|
@ -446,32 +450,38 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
/**
|
||||
* The ApplicationMaster is updating the userBlacklist used for containers
|
||||
* other than AMs.
|
||||
* The ApplicationMaster is updating the placesBlacklistedByApp used for
|
||||
* containers other than AMs.
|
||||
*
|
||||
* @param blacklistAdditions resources to be added to the userBlacklist
|
||||
* @param blacklistRemovals resources to be removed from the userBlacklist
|
||||
* @param blacklistAdditions
|
||||
* resources to be added to the userBlacklist
|
||||
* @param blacklistRemovals
|
||||
* resources to be removed from the userBlacklist
|
||||
*/
|
||||
public void updateBlacklist(
|
||||
public void updatePlacesBlacklistedByApp(
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
if (updateUserOrAMBlacklist(userBlacklist, blacklistAdditions,
|
||||
if (updateBlacklistedPlaces(placesBlacklistedByApp, blacklistAdditions,
|
||||
blacklistRemovals)) {
|
||||
userBlacklistChanged.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* RM is updating blacklist for AM containers.
|
||||
* @param blacklistAdditions resources to be added to the amBlacklist
|
||||
* @param blacklistRemovals resources to be added to the amBlacklist
|
||||
* Update the list of places that are blacklisted by the system. Today the
|
||||
* system only blacklists places when it sees that AMs failed there
|
||||
*
|
||||
* @param blacklistAdditions
|
||||
* resources to be added to placesBlacklistedBySystem
|
||||
* @param blacklistRemovals
|
||||
* resources to be removed from placesBlacklistedBySystem
|
||||
*/
|
||||
public void updateAMBlacklist(
|
||||
public void updatePlacesBlacklistedBySystem(
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
updateUserOrAMBlacklist(amBlacklist, blacklistAdditions,
|
||||
updateBlacklistedPlaces(placesBlacklistedBySystem, blacklistAdditions,
|
||||
blacklistRemovals);
|
||||
}
|
||||
|
||||
boolean updateUserOrAMBlacklist(Set<String> blacklist,
|
||||
private static boolean updateBlacklistedPlaces(Set<String> blacklist,
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
boolean changed = false;
|
||||
synchronized (blacklist) {
|
||||
|
@ -480,9 +490,7 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
if (blacklistRemovals != null) {
|
||||
if (blacklist.removeAll(blacklistRemovals)) {
|
||||
changed = true;
|
||||
}
|
||||
changed = blacklist.removeAll(blacklistRemovals) || changed;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
|
@ -521,20 +529,24 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns if the node is either blacklisted by the user or the system
|
||||
* @param resourceName the resourcename
|
||||
* @param useAMBlacklist true if it should check amBlacklist
|
||||
* Returns if the place (node/rack today) is either blacklisted by the
|
||||
* application (user) or the system
|
||||
*
|
||||
* @param resourceName
|
||||
* the resourcename
|
||||
* @param blacklistedBySystem
|
||||
* true if it should check amBlacklist
|
||||
* @return true if its blacklisted
|
||||
*/
|
||||
public boolean isBlacklisted(String resourceName,
|
||||
boolean useAMBlacklist) {
|
||||
if (useAMBlacklist){
|
||||
synchronized (amBlacklist) {
|
||||
return amBlacklist.contains(resourceName);
|
||||
public boolean isPlaceBlacklisted(String resourceName,
|
||||
boolean blacklistedBySystem) {
|
||||
if (blacklistedBySystem){
|
||||
synchronized (placesBlacklistedBySystem) {
|
||||
return placesBlacklistedBySystem.contains(resourceName);
|
||||
}
|
||||
} else {
|
||||
synchronized (userBlacklist) {
|
||||
return userBlacklist.contains(resourceName);
|
||||
synchronized (placesBlacklistedByApp) {
|
||||
return placesBlacklistedByApp.contains(resourceName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -772,12 +784,12 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
public Set<String> getBlackList() {
|
||||
return this.userBlacklist;
|
||||
return this.placesBlacklistedByApp;
|
||||
}
|
||||
|
||||
public Set<String> getBlackListCopy() {
|
||||
synchronized (userBlacklist) {
|
||||
return new HashSet<>(this.userBlacklist);
|
||||
synchronized (placesBlacklistedByApp) {
|
||||
return new HashSet<>(this.placesBlacklistedByApp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -785,7 +797,7 @@ public class AppSchedulingInfo {
|
|||
AppSchedulingInfo appInfo) {
|
||||
// This should not require locking the userBlacklist since it will not be
|
||||
// used by this instance until after setCurrentAppAttempt.
|
||||
this.userBlacklist = appInfo.getBlackList();
|
||||
this.placesBlacklistedByApp = appInfo.getBlackList();
|
||||
}
|
||||
|
||||
public synchronized void recoverContainer(RMContainer rmContainer) {
|
||||
|
|
|
@ -22,20 +22,20 @@ import org.apache.commons.logging.Log;
|
|||
|
||||
public class SchedulerAppUtils {
|
||||
|
||||
public static boolean isBlacklisted(SchedulerApplicationAttempt application,
|
||||
SchedulerNode node, Log LOG) {
|
||||
if (application.isBlacklisted(node.getNodeName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping 'host' " + node.getNodeName() +
|
||||
public static boolean isPlaceBlacklisted(
|
||||
SchedulerApplicationAttempt application, SchedulerNode node, Log log) {
|
||||
if (application.isPlaceBlacklisted(node.getNodeName())) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Skipping 'host' " + node.getNodeName() +
|
||||
" for " + application.getApplicationId() +
|
||||
" since it has been blacklisted");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if (application.isBlacklisted(node.getRackName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping 'rack' " + node.getRackName() +
|
||||
if (application.isPlaceBlacklisted(node.getRackName())) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Skipping 'rack' " + node.getRackName() +
|
||||
" for " + application.getApplicationId() +
|
||||
" since it has been blacklisted");
|
||||
}
|
||||
|
|
|
@ -590,27 +590,26 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
return (!unmanagedAM && appAttempt.getMasterContainer() == null);
|
||||
}
|
||||
|
||||
// Blacklist used for user containers
|
||||
public synchronized void updateBlacklist(
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
public synchronized void updateBlacklist(List<String> blacklistAdditions,
|
||||
List<String> blacklistRemovals) {
|
||||
if (!isStopped) {
|
||||
this.appSchedulingInfo.updateBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
if (isWaitingForAMContainer()) {
|
||||
// The request is for the AM-container, and the AM-container is launched
|
||||
// by the system. So, update the places that are blacklisted by system
|
||||
// (as opposed to those blacklisted by the application).
|
||||
this.appSchedulingInfo.updatePlacesBlacklistedBySystem(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
} else {
|
||||
this.appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions,
|
||||
blacklistRemovals);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Blacklist used for AM containers
|
||||
public synchronized void updateAMBlacklist(
|
||||
List<String> blacklistAdditions, List<String> blacklistRemovals) {
|
||||
if (!isStopped) {
|
||||
this.appSchedulingInfo.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isBlacklisted(String resourceName) {
|
||||
boolean useAMBlacklist = isWaitingForAMContainer();
|
||||
return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
|
||||
public boolean isPlaceBlacklisted(String resourceName) {
|
||||
boolean forAMContainer = isWaitingForAMContainer();
|
||||
return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
|
||||
forAMContainer);
|
||||
}
|
||||
|
||||
public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
|
||||
|
|
|
@ -982,14 +982,7 @@ public class CapacityScheduler extends
|
|||
}
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
} else {
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
|
||||
allocation = application.getAllocation(getResourceCalculator(),
|
||||
clusterResource, getMinimumResourceCapability());
|
||||
|
|
|
@ -79,7 +79,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
|
||||
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||
ResourceLimits resourceLimits, Priority priority) {
|
||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
|
||||
application.updateAppSkipNodeDiagnostics(
|
||||
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
|
||||
return ContainerAllocation.APP_SKIPPED;
|
||||
|
|
|
@ -331,7 +331,7 @@ public class FSLeafQueue extends FSQueue {
|
|||
readLock.unlock();
|
||||
}
|
||||
for (FSAppAttempt sched : pendingForResourceApps) {
|
||||
if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
|
||||
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
|
||||
continue;
|
||||
}
|
||||
assigned = sched.assignContainer(node);
|
||||
|
|
|
@ -1007,13 +1007,7 @@ public class FairScheduler extends
|
|||
preemptionContainerIds.add(container.getContainerId());
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
} else {
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
|
||||
List<Container> newlyAllocatedContainers =
|
||||
application.pullNewlyAllocatedContainers();
|
||||
|
|
|
@ -361,13 +361,7 @@ public class FifoScheduler extends
|
|||
" #ask=" + ask.size());
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
} else {
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
|
||||
Resource headroom = application.getHeadroom();
|
||||
application.setApplicationHeadroomForMetrics(headroom);
|
||||
|
@ -502,7 +496,7 @@ public class FifoScheduler extends
|
|||
application.showRequests();
|
||||
synchronized (application) {
|
||||
// Check if this resource is on the blacklist
|
||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -209,8 +209,9 @@ public class RMAppAttemptBlock extends AppAttemptBlock{
|
|||
String appBlacklistedNodes =
|
||||
getNodeString(rmAppAttempt.getBlacklistedNodes());
|
||||
// nodes which are blacklisted by the RM for AM launches
|
||||
String rmBlackListedNodes = getNodeString(
|
||||
rmAppAttempt.getAMBlacklist().getBlacklistUpdates().getAdditions());
|
||||
String rmBlackListedNodes =
|
||||
getNodeString(rmAppAttempt.getAMBlacklistManager()
|
||||
.getBlacklistUpdates().getBlacklistAdditions());
|
||||
|
||||
info("Application Attempt Overview")
|
||||
._(
|
||||
|
@ -245,8 +246,8 @@ public class RMAppAttemptBlock extends AppAttemptBlock{
|
|||
"Diagnostics Info:",
|
||||
appAttempt.getDiagnosticsInfo() == null ? "" : appAttempt
|
||||
.getDiagnosticsInfo())
|
||||
._("Application Blacklisted Nodes:", appBlacklistedNodes)
|
||||
._("RM Blacklisted Nodes(for AM launches)", rmBlackListedNodes);
|
||||
._("Nodes blacklisted by the application:", appBlacklistedNodes)
|
||||
._("Nodes blacklisted by the system:", rmBlackListedNodes);
|
||||
}
|
||||
|
||||
private String getNodeString(Collection<String> nodes) {
|
||||
|
|
|
@ -113,10 +113,10 @@ public class RMAppBlock extends AppBlock{
|
|||
Hamlet.TBODY<Hamlet.TABLE<Hamlet>> tbody =
|
||||
html.table("#attempts").thead().tr().th(".id", "Attempt ID")
|
||||
.th(".started", "Started").th(".node", "Node").th(".logs", "Logs")
|
||||
.th(".appBlacklistednodes", "Nodes black listed by the application",
|
||||
"App Blacklisted Nodes")
|
||||
.th(".rmBlacklistednodes", "Nodes black listed by the RM for the"
|
||||
+ " app", "RM Blacklisted Nodes")._()._().tbody();
|
||||
.th(".appBlacklistednodes", "Nodes blacklisted by the application",
|
||||
"Nodes blacklisted by the app")
|
||||
.th(".rmBlacklistednodes", "Nodes blacklisted by the RM for the"
|
||||
+ " app", "Nodes blacklisted by the system")._()._().tbody();
|
||||
|
||||
RMApp rmApp = this.rm.getRMContext().getRMApps().get(this.appID);
|
||||
if (rmApp == null) {
|
||||
|
@ -136,8 +136,9 @@ public class RMAppBlock extends AppBlock{
|
|||
// nodes which are blacklisted by the application
|
||||
String appBlacklistedNodesCount = String.valueOf(nodes.size());
|
||||
// nodes which are blacklisted by the RM for AM launches
|
||||
String rmBlacklistedNodesCount = String.valueOf(rmAppAttempt
|
||||
.getAMBlacklist().getBlacklistUpdates().getAdditions().size());
|
||||
String rmBlacklistedNodesCount =
|
||||
String.valueOf(rmAppAttempt.getAMBlacklistManager()
|
||||
.getBlacklistUpdates().getBlacklistAdditions().size());
|
||||
String nodeLink = attemptInfo.getNodeHttpAddress();
|
||||
if (nodeLink != null) {
|
||||
nodeLink = WebAppUtils.getHttpSchemePrefix(conf) + nodeLink;
|
||||
|
|
|
@@ -86,8 +86,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@@ -166,10 +170,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationReque
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
@@ -40,7 +40,7 @@ public class AppAttemptInfo {
  protected String nodeId;
  protected String logsLink;
  protected String blacklistedNodes;
  protected String rmBlacklistedNodesForAMLaunches;
  protected String nodesBlacklistedBySystem;

  public AppAttemptInfo() {
  }
@@ -65,9 +65,9 @@ public class AppAttemptInfo {
            + masterContainer.getNodeHttpAddress(),
        masterContainer.getId().toString(), user);

    rmBlacklistedNodesForAMLaunches = StringUtils.join(
        attempt.getAMBlacklist().getBlacklistUpdates().getAdditions(),
        ", ");
    nodesBlacklistedBySystem =
        StringUtils.join(attempt.getAMBlacklistManager()
            .getBlacklistUpdates().getBlacklistAdditions(), ", ");
    if (rm.getResourceScheduler() instanceof AbstractYarnScheduler) {
      AbstractYarnScheduler ayScheduler =
          (AbstractYarnScheduler) rm.getResourceScheduler();
@@ -0,0 +1,252 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;

/**
 * Validate system behavior when the am-scheduling logic 'blacklists' a node for
 * an application because of AM failures.
 */
public class TestNodeBlacklistingOnAMFailures {

  @Test(timeout = 100000)
  public void testNodeBlacklistingOnAMFailure() throws Exception {

    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);

    DrainDispatcher dispatcher = new DrainDispatcher();
    MockRM rm = startRM(conf, dispatcher);
    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
    nm1.registerNode();

    MockNM nm2 =
        new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
    nm2.registerNode();

    RMApp app = rm.submitApp(200);

    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
    ContainerId amContainerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();

    MockNM currentNode, otherNode;
    if (nodeWhereAMRan.equals(nm1.getNodeId())) {
      currentNode = nm1;
      otherNode = nm2;
    } else {
      currentNode = nm2;
      otherNode = nm1;
    }

    // Set the exit status to INVALID so that we can verify that the system
    // automatically blacklists the node
    makeAMContainerExit(rm, amContainerId, currentNode,
        ContainerExitStatus.INVALID);

    // restart the am
    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
    System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());

    // Try the current node a few times
    for (int i = 0; i <= 2; i++) {
      currentNode.nodeHeartbeat(true);
      dispatcher.await();

      Assert.assertEquals(
          "AppAttemptState should still be SCHEDULED if currentNode is "
              + "blacklisted correctly", RMAppAttemptState.SCHEDULED,
          attempt.getAppAttemptState());
    }

    // Now try the other node
    otherNode.nodeHeartbeat(true);
    dispatcher.await();

    // Now the AM container should be allocated
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED,
        20000);

    MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
    amContainerId =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
    rmContainer = scheduler.getRMContainer(amContainerId);
    nodeWhereAMRan = rmContainer.getAllocatedNode();

    // The other node should now receive the assignment
    Assert.assertEquals(
        "After blacklisting, AM should have run on the other node",
        otherNode.getNodeId(), nodeWhereAMRan);

    am2.registerAppAttempt();
    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);

    List<Container> allocatedContainers =
        TestAMRestart.allocateContainers(currentNode, am2, 1);
    Assert.assertEquals(
        "Even though AM is blacklisted from the node, application can "
            + "still allocate non-AM containers there",
        currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
  }

  @Test(timeout = 100000)
  public void testNoBlacklistingForNonSystemErrors() throws Exception {

    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);
    // disable the float so it is possible to blacklist the entire cluster
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        1.5f);
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100);

    DrainDispatcher dispatcher = new DrainDispatcher();
    MockRM rm = startRM(conf, dispatcher);

    MockNM node =
        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
    node.registerNode();

    RMApp app = rm.submitApp(200);
    ApplicationId appId = app.getApplicationId();

    int numAppAttempts = 1;

    // Now the AM container should be allocated
    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
    node.nodeHeartbeat(true);
    dispatcher.await();
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED,
        20000);
    rm.sendAMLaunched(attempt.getAppAttemptId());
    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, numAppAttempts);
    ContainerId amContainerId = ContainerId.newContainerId(appAttemptId, 1);

    for (int containerExitStatus : new int[] {
        ContainerExitStatus.PREEMPTED,
        ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
        // ContainerExitStatus.KILLED_BY_APPMASTER,
        ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
        ContainerExitStatus.ABORTED, ContainerExitStatus.DISKS_FAILED,
        ContainerExitStatus.KILLED_EXCEEDED_VMEM,
        ContainerExitStatus.KILLED_EXCEEDED_PMEM }) {

      // Set the exit status to containerExitStatus so that we can verify
      // that the system does not automatically blacklist the node
      makeAMContainerExit(rm, amContainerId, node, containerExitStatus);

      // restart the am
      attempt = MockRM.waitForAttemptScheduled(app, rm);
      System.out
          .println("New AppAttempt launched " + attempt.getAppAttemptId());

      node.nodeHeartbeat(true);
      dispatcher.await();

      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED,
          20000);
      rm.sendAMLaunched(attempt.getAppAttemptId());
      rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);

      numAppAttempts++;
      appAttemptId = ApplicationAttemptId.newInstance(appId, numAppAttempts);
      amContainerId = ContainerId.newContainerId(appAttemptId, 1);
      rm.waitForState(node, amContainerId, RMContainerState.ACQUIRED);
    }
  }

  private void makeAMContainerExit(MockRM rm, ContainerId amContainer,
      MockNM node, int exitStatus) throws Exception, InterruptedException {
    ContainerStatus containerStatus =
        BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
            "", exitStatus, Resources.createResource(200));
    node.containerStatus(containerStatus);
    ApplicationAttemptId amAttemptID = amContainer.getApplicationAttemptId();
    rm.waitForState(amAttemptID, RMAppAttemptState.FAILED);
    rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED);
  }

  private MockRM startRM(YarnConfiguration conf,
      final DrainDispatcher dispatcher) {

    MemoryRMStateStore memStore = new MemoryRMStateStore();
    memStore.init(conf);

    MockRM rm1 = new MockRM(conf, memStore) {
      @Override
      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
        return new SchedulerEventDispatcher(this.scheduler) {
          @Override
          public void handle(SchedulerEvent event) {
            super.handle(event);
          }
        };
      }

      @Override
      protected Dispatcher createDispatcher() {
        return dispatcher;
      }
    };

    rm1.start();
    return rm1;
  }
}
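Editor's note: the new test above drives the renamed, system-level blacklisting knobs through YarnConfiguration. The sketch below is a minimal illustration of that configuration pattern and is not part of the patch; it only uses the two constants already exercised by the test, and the 0.8f threshold is an arbitrary example value, not a default taken from this change.

// Minimal sketch (not part of the patch): enabling RM-side node blacklisting
// for AM scheduling, using the constants shown in the test above.
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmNodeBlacklistingConfExample {
  public static YarnConfiguration create() {
    YarnConfiguration conf = new YarnConfiguration();
    // Let the RM blacklist nodes for AM container placement after AM failures.
    conf.setBoolean(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true);
    // Stop adding nodes once this fraction of the cluster is blacklisted.
    // 0.8f is an illustrative value only.
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        0.8f);
    return conf;
  }
}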
@@ -35,12 +35,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -53,18 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;

@@ -237,7 +229,7 @@ public class TestAMRestart {
    rm1.stop();
  }

  private List<Container> allocateContainers(MockNM nm1, MockAM am1,
  public static List<Container> allocateContainers(MockNM nm1, MockAM am1,
      int NUM_CONTAINERS) throws Exception {
    // allocate NUM_CONTAINERS containers
    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
@@ -276,7 +268,9 @@ public class TestAMRestart {
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
    // To prevent the test from blacklisting nm1 for AM, we set the threshold
    // to half of the 2 nodes, which is 1
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 0.5f);
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        0.5f);

    MockRM rm1 = new MockRM(conf);
    rm1.start();
@@ -378,165 +372,6 @@ public class TestAMRestart {
    rm1.stop();
  }

  @Test(timeout = 100000)
  public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    testAMBlacklistPreventRestartOnSameNode(false, conf);
  }

  @Test(timeout = 100000)
  public void testAMBlacklistPreventsRestartOnSameNodeForMinicluster()
      throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
        true);
    testAMBlacklistPreventRestartOnSameNode(false, conf);
  }

  @Test(timeout = 100000)
  public void testAMBlacklistPreemption() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    // disable the float so it is possible to blacklist the entire cluster
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 1.5f);
    // since the exit status is PREEMPTED, it should not lead to the node being
    // blacklisted
    testAMBlacklistPreventRestartOnSameNode(true, conf);
  }

  /**
   * Tests AM blacklisting. In the multi-node mode (i.e. singleNode = false),
   * it tests the blacklisting behavior so that the AM container gets allocated
   * on the node that is not blacklisted. In the single-node mode, it tests the
   * PREEMPTED status to see if the AM container can continue to be scheduled.
   */
  private void testAMBlacklistPreventRestartOnSameNode(boolean singleNode,
      YarnConfiguration conf) throws Exception {
    MemoryRMStateStore memStore = new MemoryRMStateStore();
    memStore.init(conf);
    final DrainDispatcher dispatcher = new DrainDispatcher();
    MockRM rm1 = new MockRM(conf, memStore) {
      @Override
      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
        return new SchedulerEventDispatcher(this.scheduler) {
          @Override
          public void handle(SchedulerEvent event) {
            scheduler.handle(event);
          }
        };
      }

      @Override
      protected Dispatcher createDispatcher() {
        return dispatcher;
      }
    };

    rm1.start();

    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
    nm1.registerNode();

    MockNM nm2 = null;
    if (!singleNode) {
      nm2 =
          new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
      nm2.registerNode();
    }

    RMApp app1 = rm1.submitApp(200);

    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    CapacityScheduler scheduler =
        (CapacityScheduler) rm1.getResourceScheduler();
    ContainerId amContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    // Preempt the first attempt;
    RMContainer rmContainer = scheduler.getRMContainer(amContainer);
    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();

    MockNM currentNode, otherNode;
    if (singleNode) {
      Assert.assertEquals(nm1.getNodeId(), nodeWhereAMRan);
      currentNode = nm1;
      otherNode = null; // not applicable
    } else {
      if (nodeWhereAMRan == nm1.getNodeId()) {
        currentNode = nm1;
        otherNode = nm2;
      } else {
        currentNode = nm2;
        otherNode = nm1;
      }
    }

    // set the exist status to test
    // any status other than SUCCESS and PREEMPTED should cause the node to be
    // blacklisted
    int exitStatus = singleNode ?
        ContainerExitStatus.PREEMPTED :
        ContainerExitStatus.INVALID;
    ContainerStatus containerStatus =
        BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
            "", exitStatus, Resources.createResource(200));
    currentNode.containerStatus(containerStatus);
    am1.waitForState(RMAppAttemptState.FAILED);
    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);

    // restart the am
    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app1, rm1);
    System.out.println("Launch AM " + attempt.getAppAttemptId());



    currentNode.nodeHeartbeat(true);
    dispatcher.await();

    if (!singleNode) {
      Assert.assertEquals(
          "AppAttemptState should still be SCHEDULED if currentNode is " +
              "blacklisted correctly",
          RMAppAttemptState.SCHEDULED,
          attempt.getAppAttemptState());

      otherNode.nodeHeartbeat(true);
      dispatcher.await();
    }

    MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
    rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
    amContainer =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
    rmContainer = scheduler.getRMContainer(amContainer);
    nodeWhereAMRan = rmContainer.getAllocatedNode();
    if (singleNode) {
      // with preemption, the node should not be blacklisted and should get the
      // assignment (with a single node)
      Assert.assertEquals(
          "AM should still have been able to run on the same node",
          currentNode.getNodeId(), nodeWhereAMRan);
    } else {
      // with a failed status, the other node should receive the assignment
      Assert.assertEquals(
          "After blacklisting AM should have run on the other node",
          otherNode.getNodeId(), nodeWhereAMRan);

      am2.registerAppAttempt();
      rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);

      List<Container> allocatedContainers =
          allocateContainers(currentNode, am2, 1);
      Assert.assertEquals(
          "Even though AM is blacklisted from the node, application can " +
              "still allocate containers there",
          currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
    }
  }


  // AM container preempted, nm disk failure
  // should not be counted towards AM max retry count.
  @Test(timeout = 100000)
@@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.blacklist;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.junit.Assert;
import org.junit.Test;

public class TestBlacklistManager {

  @Test
@@ -37,12 +38,12 @@ public class TestBlacklistManager {
    String anyNode2 = "bar";
    manager.addNode(anyNode);
    manager.addNode(anyNode2);
    BlacklistUpdates blacklist = manager
    ResourceBlacklistRequest blacklist = manager
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    List<String> blacklistAdditions = blacklist.getBlacklistAdditions();
    Collections.sort(blacklistAdditions);
    List<String> blacklistRemovals = blacklist.getRemovals();
    List<String> blacklistRemovals = blacklist.getBlacklistRemovals();
    String[] expectedBlacklistAdditions = new String[]{anyNode2, anyNode};
    Assert.assertArrayEquals(
        "Blacklist additions was not as expected",
@@ -61,12 +62,12 @@ public class TestBlacklistManager {
    String anyNode = "foo";
    String anyNode2 = "bar";
    manager.addNode(anyNode);
    BlacklistUpdates blacklist = manager
    ResourceBlacklistRequest blacklist = manager
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    List<String> blacklistAdditions = blacklist.getBlacklistAdditions();
    Collections.sort(blacklistAdditions);
    List<String> blacklistRemovals = blacklist.getRemovals();
    List<String> blacklistRemovals = blacklist.getBlacklistRemovals();
    String[] expectedBlacklistAdditions = new String[]{anyNode};
    Assert.assertArrayEquals(
        "Blacklist additions was not as expected",
@@ -81,9 +82,9 @@ public class TestBlacklistManager {

    blacklist = manager
        .getBlacklistUpdates();
    blacklistAdditions = blacklist.getAdditions();
    blacklistAdditions = blacklist.getBlacklistAdditions();
    Collections.sort(blacklistAdditions);
    blacklistRemovals = blacklist.getRemovals();
    blacklistRemovals = blacklist.getBlacklistRemovals();
    Collections.sort(blacklistRemovals);
    String[] expectedBlacklistRemovals = new String[] {anyNode2, anyNode};
    Assert.assertTrue(
@@ -101,11 +102,11 @@ public class TestBlacklistManager {
    BlacklistManager disabled = new DisabledBlacklistManager();
    String anyNode = "foo";
    disabled.addNode(anyNode);
    BlacklistUpdates blacklist = disabled
    ResourceBlacklistRequest blacklist = disabled
        .getBlacklistUpdates();

    List<String> blacklistAdditions = blacklist.getAdditions();
    List<String> blacklistRemovals = blacklist.getRemovals();
    List<String> blacklistAdditions = blacklist.getBlacklistAdditions();
    List<String> blacklistRemovals = blacklist.getBlacklistRemovals();
    Assert.assertTrue(
        "Blacklist additions should be empty but was " +
            blacklistAdditions,
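Editor's note: a minimal sketch, not part of the patch, of the API shape TestBlacklistManager now exercises: getBlacklistUpdates() returns the public ResourceBlacklistRequest record, whose getBlacklistAdditions()/getBlacklistRemovals() replace the old BlacklistUpdates getters. DisabledBlacklistManager is used only because it needs no constructor arguments (so the lists come back empty); the node name is a placeholder, and it assumes these resourcemanager-internal types are visible on the classpath.

// Minimal sketch (not part of the patch): consuming blacklist updates through
// the renamed accessors shown in the test hunks above.
import java.util.List;

import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;

public class BlacklistUpdatesExample {
  public static void main(String[] args) {
    BlacklistManager manager = new DisabledBlacklistManager();
    manager.addNode("node1.example.com");   // placeholder node name

    // Pull the pending additions/removals the RM would hand to the scheduler.
    ResourceBlacklistRequest updates = manager.getBlacklistUpdates();
    List<String> additions = updates.getBlacklistAdditions();
    List<String> removals = updates.getBlacklistRemovals();
    System.out.println("additions=" + additions + ", removals=" + removals);
  }
}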
@@ -44,7 +44,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -1063,63 +1062,6 @@ public class TestRMAppTransitions {
        + "/"));
  }

  @Test
  public void testAMBlackListConfigFromApp() {
    // Scenario 1: Application enables AM blacklisting
    float disableThreshold = 0.9f;
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, false);
    ApplicationSubmissionContext submissionContext =
        new ApplicationSubmissionContextPBImpl();
    submissionContext.setAMBlackListRequest(AMBlackListingRequest.newInstance(
        true, disableThreshold));
    RMAppImpl application = (RMAppImpl) createNewTestApp(submissionContext);

    Assert.assertTrue(application.isAmBlacklistingEnabled());
    Assert.assertEquals(disableThreshold,
        application.getAmBlacklistingDisableThreshold(), 1e-8);

    // Scenario 2: Application disables AM blacklisting
    float globalThreshold = 0.9f;
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
        globalThreshold);
    ApplicationSubmissionContext submissionContext2 =
        new ApplicationSubmissionContextPBImpl();
    submissionContext2.setAMBlackListRequest(AMBlackListingRequest.newInstance(
        false, disableThreshold));
    RMAppImpl application2 = (RMAppImpl) createNewTestApp(submissionContext2);

    // Am blacklisting will be disabled eventhough its enabled in RM.
    Assert.assertFalse(application2.isAmBlacklistingEnabled());

    // Scenario 3: Application updates invalid AM threshold
    float invalidDisableThreshold = -0.5f;
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
        globalThreshold);
    ApplicationSubmissionContext submissionContext3 =
        new ApplicationSubmissionContextPBImpl();
    submissionContext3.setAMBlackListRequest(AMBlackListingRequest.newInstance(
        true, invalidDisableThreshold));
    RMAppImpl application3 = (RMAppImpl) createNewTestApp(submissionContext3);

    Assert.assertTrue(application3.isAmBlacklistingEnabled());
    Assert.assertEquals(globalThreshold,
        application3.getAmBlacklistingDisableThreshold(), 1e-8);

    // Scenario 4: Empty AMBlackListingRequest in Submission Context
    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
        globalThreshold);
    ApplicationSubmissionContext submissionContext4 =
        new ApplicationSubmissionContextPBImpl();
    RMAppImpl application4 = (RMAppImpl) createNewTestApp(submissionContext4);

    Assert.assertTrue(application4.isAmBlacklistingEnabled());
    Assert.assertEquals(globalThreshold,
        application4.getAmBlacklistingDisableThreshold(), 1e-8);
  }

  private void verifyApplicationFinished(RMAppState state) {
    ArgumentCaptor<RMAppState> finalState =
        ArgumentCaptor.forClass(RMAppState.class);
@@ -41,32 +41,32 @@ public class TestAppSchedulingInfo {
    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
        appAttemptId, "test", queue, null, 0, new ResourceUsage());

    appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
    appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
        new ArrayList<String>());
    Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());

    ArrayList<String> blacklistAdditions = new ArrayList<String>();
    blacklistAdditions.add("node1");
    blacklistAdditions.add("node2");
    appSchedulingInfo.updateBlacklist(blacklistAdditions,
    appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions,
        new ArrayList<String>());
    Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());

    blacklistAdditions.clear();
    blacklistAdditions.add("node1");
    appSchedulingInfo.updateBlacklist(blacklistAdditions,
    appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions,
        new ArrayList<String>());
    Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());

    ArrayList<String> blacklistRemovals = new ArrayList<String>();
    blacklistRemovals.add("node1");
    appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
    appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
        blacklistRemovals);
    appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
    appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
        blacklistRemovals);
    Assert.assertTrue(appSchedulingInfo.getAndResetBlacklistChanged());

    appSchedulingInfo.updateBlacklist(new ArrayList<String>(),
    appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
        blacklistRemovals);
    Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
  }
@@ -737,11 +737,13 @@ public class TestCapacityScheduler {
    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
        Collections.<ContainerId>emptyList(),
        Collections.singletonList(host), null, null, null);
    Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
    Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
        .isPlaceBlacklisted(host));
    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
        Collections.<ContainerId>emptyList(), null,
        Collections.singletonList(host), null, null);
    Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
    Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
        .isPlaceBlacklisted(host));
    rm.stop();
  }

@@ -313,24 +313,24 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
    FSAppAttempt spyApp = spy(app);
    doReturn(false)
        .when(spyApp).isWaitingForAMContainer();
    assertTrue(spyApp.isBlacklisted(n1.getNodeName()));
    assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
    assertTrue(spyApp.isPlaceBlacklisted(n1.getNodeName()));
    assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName()));
    assertEquals(n2.getAvailableResource(), spyApp.getHeadroom());

    blacklistAdditions.clear();
    blacklistAdditions.add(n2.getNodeName());
    blacklistRemovals.add(n1.getNodeName());
    app.updateBlacklist(blacklistAdditions, blacklistRemovals);
    assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
    assertTrue(spyApp.isBlacklisted(n2.getNodeName()));
    assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName()));
    assertTrue(spyApp.isPlaceBlacklisted(n2.getNodeName()));
    assertEquals(n1.getAvailableResource(), spyApp.getHeadroom());

    blacklistAdditions.clear();
    blacklistRemovals.clear();
    blacklistRemovals.add(n2.getNodeName());
    app.updateBlacklist(blacklistAdditions, blacklistRemovals);
    assertFalse(spyApp.isBlacklisted(n1.getNodeName()));
    assertFalse(spyApp.isBlacklisted(n2.getNodeName()));
    assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName()));
    assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName()));
    assertEquals(clusterResource, spyApp.getHeadroom());
  }

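Editor's note: the scheduler-side query used by the tests above is now isPlaceBlacklisted(...) on the application attempt, replacing isBlacklisted(...). The sketch below is a minimal illustration, not part of the patch; the helper class, method, and variable names are illustrative, and it assumes isPlaceBlacklisted is exposed on SchedulerApplicationAttempt as the capacity and fair scheduler tests above suggest.

// Minimal sketch (not part of the patch): filtering candidate nodes with the
// renamed isPlaceBlacklisted(...) query, mirroring the assertions above.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;

public final class BlacklistQueryExample {
  private BlacklistQueryExample() {
  }

  /** Return the subset of candidate nodes the attempt may not be placed on. */
  public static List<String> blacklistedSubset(
      SchedulerApplicationAttempt attempt, Collection<String> candidateNodes) {
    List<String> blacklisted = new ArrayList<String>();
    for (String node : candidateNodes) {
      if (attempt.isPlaceBlacklisted(node)) {
        blacklisted.add(node);
      }
    }
    return blacklisted;
  }
}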
@@ -4907,11 +4907,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
    scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
        Collections.<ContainerId>emptyList(),
        Collections.singletonList(host), null, null, null);
    assertTrue(app.isBlacklisted(host));
    assertTrue(app.isPlaceBlacklisted(host));
    scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
        Collections.<ContainerId>emptyList(), null,
        Collections.singletonList(host), null, null);
    assertFalse(scheduler.getSchedulerApp(appAttemptId).isBlacklisted(host));
    assertFalse(scheduler.getSchedulerApp(appAttemptId)
        .isPlaceBlacklisted(host));

    List<ResourceRequest> update = Arrays.asList(
        createResourceRequest(GB, node.getHostName(), 1, 0, true));
@@ -4920,7 +4921,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
    scheduler.allocate(appAttemptId, update,
        Collections.<ContainerId>emptyList(),
        Collections.singletonList(host), null, null, null);
    assertTrue(app.isBlacklisted(host));
    assertTrue(app.isPlaceBlacklisted(host));
    scheduler.update();
    scheduler.handle(updateEvent);
    assertEquals("Incorrect number of containers allocated", 0, app
@@ -4930,7 +4931,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
    scheduler.allocate(appAttemptId, update,
        Collections.<ContainerId>emptyList(), null,
        Collections.singletonList(host), null, null);
    assertFalse(app.isBlacklisted(host));
    assertFalse(app.isPlaceBlacklisted(host));
    createSchedulingRequest(GB, "root.default", "user", 1);
    scheduler.update();
    scheduler.handle(updateEvent);
@@ -18,21 +18,38 @@

package org.apache.hadoop.yarn.server.resourcemanager.webapp;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.LoggingFilter;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -82,36 +99,21 @@ import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.LoggingFilter;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;

@RunWith(Parameterized.class)
public class TestRMWebServicesAppsModification extends JerseyTestBase {