YARN-4389. Allow application to enable or disable am blacklisting. (Sunil G via rohithsharmaks)

(cherry picked from commit d3c08cde68)
This commit is contained in:
rohithsharmaks 2016-01-15 22:46:21 +05:30
parent de31bff67d
commit 9baeae9368
9 changed files with 362 additions and 12 deletions

View File

@ -1128,6 +1128,10 @@ Release 2.8.0 - UNRELEASED
YARN-4534. Remove the redundant symbol in yarn rmadmin help msg.
(Lin Yiqun via aajisaka)
YARN-4389. "yarn.am.blacklisting.enabled" and "yarn.am.blacklisting.disable-
failure-threshold" should be app specific rather than a setting for whole
YARN cluster. (Sunil G via rohithsharmaks)
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
(sandflee via junping_du)

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* Specific AMBlacklistingRequest from AM to enable/disable blacklisting.
*/
@Public
@Evolving
public abstract class AMBlackListingRequest {
@Private
@Unstable
public static AMBlackListingRequest newInstance(
boolean isAMBlackListingEnabled, float disableFailureThreshold) {
AMBlackListingRequest blackListRequest = Records
.newRecord(AMBlackListingRequest.class);
blackListRequest.setBlackListingEnabled(isAMBlackListingEnabled);
blackListRequest
.setBlackListingDisableFailureThreshold(disableFailureThreshold);
return blackListRequest;
}
/**
* @return AM Blacklisting is enabled.
*/
@Public
@Evolving
public abstract boolean isAMBlackListingEnabled();
/**
* @return AM Blacklisting disable failure threshold
*/
@Public
@Evolving
public abstract float getBlackListingDisableFailureThreshold();
@Private
@Unstable
public abstract void setBlackListingEnabled(boolean isAMBlackListingEnabled);
@Private
@Unstable
public abstract void setBlackListingDisableFailureThreshold(
float disableFailureThreshold);
}

View File

@ -535,4 +535,27 @@ public abstract void setLogAggregationContext(
@Public
@Unstable
public abstract void setReservationID(ReservationId reservationID);
/**
* Get AM Blacklisting request object to know whether application needs any
* specific blacklisting for AM Nodes.
*
* @return AMBlackListingRequest object which has blacklisting information.
*/
@Public
@Unstable
public abstract AMBlackListingRequest getAMBlackListRequest();
/**
* Get AM Blacklisting request object to know whether application needs any
* specific blacklisting for AM Nodes.
*
* @param blackListRequest
* object which has blacklisting information such as
* "enable/disable AM blacklisting" and "disable failure threshold".
*/
@Public
@Unstable
public abstract void setAMBlackListRequest(
AMBlackListingRequest blackListRequest);
}

View File

@ -284,6 +284,10 @@ enum ContainerTypeProto {
TASK = 2;
}
message AMBlackListingRequestProto {
optional bool blacklisting_enabled = 1 [default = false];
optional float blacklisting_failure_threshold = 2;
}
////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
@ -349,6 +353,7 @@ message ApplicationSubmissionContextProto {
optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
optional AMBlackListingRequestProto am_blacklisting_request = 18;
}
message LogAggregationContextProto {

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class AMBlackListingRequestPBImpl extends AMBlackListingRequest {
AMBlackListingRequestProto proto = AMBlackListingRequestProto
.getDefaultInstance();
AMBlackListingRequestProto.Builder builder = null;
boolean viaProto = false;
public AMBlackListingRequestPBImpl() {
builder = AMBlackListingRequestProto.newBuilder();
}
public AMBlackListingRequestPBImpl(AMBlackListingRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public AMBlackListingRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMBlackListingRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public boolean isAMBlackListingEnabled() {
AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.getBlacklistingEnabled();
}
@Override
public float getBlackListingDisableFailureThreshold() {
AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.getBlacklistingFailureThreshold();
}
@Override
public void setBlackListingEnabled(boolean isAMBlackListingEnabled) {
maybeInitBuilder();
builder.setBlacklistingEnabled(isAMBlackListingEnabled);
}
@Override
public void setBlackListingDisableFailureThreshold(
float disableFailureThreshold) {
maybeInitBuilder();
builder.setBlacklistingFailureThreshold(disableFailureThreshold);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
@ -63,6 +65,7 @@ public class ApplicationSubmissionContextPBImpl
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
private AMBlackListingRequest amBlackListRequest = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@ -131,6 +134,10 @@ private void mergeLocalToBuilder() {
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
if (this.amBlackListRequest != null) {
builder.setAmBlacklistingRequest(
convertToProtoFormat(this.amBlackListRequest));
}
}
private void mergeLocalToProto() {
@ -413,6 +420,29 @@ public boolean getKeepContainersAcrossApplicationAttempts() {
return p.getKeepContainersAcrossApplicationAttempts();
}
@Override
public AMBlackListingRequest getAMBlackListRequest() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (amBlackListRequest != null) {
return amBlackListRequest;
}
if (!p.hasAmBlacklistingRequest()) {
return null;
}
amBlackListRequest = convertFromProtoFormat(p.getAmBlacklistingRequest());
return amBlackListRequest;
}
@Override
public void setAMBlackListRequest(AMBlackListingRequest amBlackListRequest) {
maybeInitBuilder();
if (amBlackListRequest == null) {
builder.clearAmBlacklistingRequest();
return;
}
this.amBlackListRequest = amBlackListRequest;
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
@ -455,6 +485,16 @@ private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
private AMBlackListingRequestPBImpl convertFromProtoFormat(
AMBlackListingRequestProto a) {
return new AMBlackListingRequestPBImpl(a);
}
private AMBlackListingRequestProto convertToProtoFormat(
AMBlackListingRequest a) {
return ((AMBlackListingRequestPBImpl) a).getProto();
}
@Override
public String getNodeLabelExpression() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -107,6 +107,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -148,6 +149,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.impl.pb.AMBlackListingRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
@ -181,6 +183,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
@ -498,6 +501,7 @@ public static void setup() throws Exception {
generateByNewInstance(ReservationRequests.class);
generateByNewInstance(ReservationDefinition.class);
generateByNewInstance(ResourceUtilization.class);
generateByNewInstance(AMBlackListingRequest.class);
}
private class GetSetPair {
@ -1319,4 +1323,10 @@ public void testCheckForDecommissioningNodesResponsePBImpl() throws Exception {
validatePBImplRecord(CheckForDecommissioningNodesResponsePBImpl.class,
CheckForDecommissioningNodesResponseProto.class);
}
@Test
public void testAMBlackListingRequestPBImpl() throws Exception {
validatePBImplRecord(AMBlackListingRequestPBImpl.class,
AMBlackListingRequestProto.class);
}
}

View File

@ -138,8 +138,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Set<String> applicationTags;
private final long attemptFailuresValidityInterval;
private final boolean amBlacklistingEnabled;
private final float blacklistDisableThreshold;
private boolean amBlacklistingEnabled = false;
private float blacklistDisableThreshold;
private Clock systemClock;
@ -387,7 +387,9 @@ RMAppEventType.KILL, new KillAttemptTransition())
stateMachine;
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
private static final float MINIMUM_THRESHOLD_VALUE = 0.0f;
private static final float MAXIMUM_THRESHOLD_VALUE = 1.0f;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
@ -465,16 +467,43 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
amBlacklistingEnabled = conf.getBoolean(
YarnConfiguration.AM_BLACKLISTING_ENABLED,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
if (amBlacklistingEnabled) {
blacklistDisableThreshold = conf.getFloat(
YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
} else {
// amBlacklistingEnabled can be configured globally and by each
// application.
// Case 1: If AMBlackListRequest is available in submission context, we
// will consider only app level request (RM level configuration will be
// skipped).
// Case 2: AMBlackListRequest is available in submission context and
// amBlacklisting is disabled. In this case, AM blacklisting wont be
// enabled for this app even if this feature is enabled in RM level.
// Case 3: AMBlackListRequest is not available through submission context.
// RM level AM black listing configuration will be considered.
if (null != submissionContext.getAMBlackListRequest()) {
amBlacklistingEnabled = submissionContext.getAMBlackListRequest()
.isAMBlackListingEnabled();
blacklistDisableThreshold = 0.0f;
if (amBlacklistingEnabled) {
blacklistDisableThreshold = submissionContext.getAMBlackListRequest()
.getBlackListingDisableFailureThreshold();
// Verify whether blacklistDisableThreshold is valid. And for invalid
// threshold, reset to global level blacklistDisableThreshold
// configured.
if (blacklistDisableThreshold < MINIMUM_THRESHOLD_VALUE
|| blacklistDisableThreshold > MAXIMUM_THRESHOLD_VALUE) {
blacklistDisableThreshold = conf.getFloat(
YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
}
}
} else {
amBlacklistingEnabled = conf.getBoolean(
YarnConfiguration.AM_BLACKLISTING_ENABLED,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
if (amBlacklistingEnabled) {
blacklistDisableThreshold = conf.getFloat(
YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
}
}
}
@ -1732,4 +1761,14 @@ private void sendATSCreateEvent(RMApp app, long startTime) {
rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
}
@VisibleForTesting
public boolean isAmBlacklistingEnabled() {
return amBlacklistingEnabled;
}
@VisibleForTesting
public float getAmBlacklistingDisableThreshold() {
return blacklistDisableThreshold;
}
}

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -1027,6 +1028,63 @@ public void testGetAppReport() {
+ "/"));
}
@Test
public void testAMBlackListConfigFromApp() {
// Scenario 1: Application enables AM blacklisting
float disableThreshold = 0.9f;
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, false);
ApplicationSubmissionContext submissionContext =
new ApplicationSubmissionContextPBImpl();
submissionContext.setAMBlackListRequest(AMBlackListingRequest.newInstance(
true, disableThreshold));
RMAppImpl application = (RMAppImpl) createNewTestApp(submissionContext);
Assert.assertTrue(application.isAmBlacklistingEnabled());
Assert.assertEquals(disableThreshold,
application.getAmBlacklistingDisableThreshold(), 1e-8);
// Scenario 2: Application disables AM blacklisting
float globalThreshold = 0.9f;
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
globalThreshold);
ApplicationSubmissionContext submissionContext2 =
new ApplicationSubmissionContextPBImpl();
submissionContext2.setAMBlackListRequest(AMBlackListingRequest.newInstance(
false, disableThreshold));
RMAppImpl application2 = (RMAppImpl) createNewTestApp(submissionContext2);
// Am blacklisting will be disabled eventhough its enabled in RM.
Assert.assertFalse(application2.isAmBlacklistingEnabled());
// Scenario 3: Application updates invalid AM threshold
float invalidDisableThreshold = -0.5f;
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
globalThreshold);
ApplicationSubmissionContext submissionContext3 =
new ApplicationSubmissionContextPBImpl();
submissionContext3.setAMBlackListRequest(AMBlackListingRequest.newInstance(
true, invalidDisableThreshold));
RMAppImpl application3 = (RMAppImpl) createNewTestApp(submissionContext3);
Assert.assertTrue(application3.isAmBlacklistingEnabled());
Assert.assertEquals(globalThreshold,
application3.getAmBlacklistingDisableThreshold(), 1e-8);
// Scenario 4: Empty AMBlackListingRequest in Submission Context
conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
globalThreshold);
ApplicationSubmissionContext submissionContext4 =
new ApplicationSubmissionContextPBImpl();
RMAppImpl application4 = (RMAppImpl) createNewTestApp(submissionContext4);
Assert.assertTrue(application4.isAmBlacklistingEnabled());
Assert.assertEquals(globalThreshold,
application4.getAmBlacklistingDisableThreshold(), 1e-8);
}
private void verifyApplicationFinished(RMAppState state) {
ArgumentCaptor<RMAppState> finalState =
ArgumentCaptor.forClass(RMAppState.class);