YARN-261. Ability to fail AM attempts. Contributed by Andrey Klochkov and Rohith Sharma K S

This commit is contained in:
Jason Lowe 2015-10-09 14:17:38 +00:00
parent 8f195387a4
commit a0bca2b5ad
27 changed files with 815 additions and 6 deletions

View File

@ -293,6 +293,12 @@ public class ResourceMgrDelegate extends YarnClient {
return client.submitApplication(appContext);
}
/**
 * Fails the given application attempt by passing the call straight through
 * to {@code client}; whatever that call throws propagates to the caller.
 */
@Override
public void failApplicationAttempt(ApplicationAttemptId attemptId)
throws YarnException, IOException {
client.failApplicationAttempt(attemptId);
}
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {

View File

@ -70,6 +70,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -327,6 +329,12 @@ public class TestClientRedirect {
throw new IOException("Test");
}
// Test stub: ignores the request and returns a new
// FailApplicationAttemptResponse record obtained from recordFactory.
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws IOException {
return recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws IOException {

View File

@ -226,6 +226,9 @@ Release 2.8.0 - UNRELEASED
YARN-1897. CLI and core support for signal container functionality.
(Ming Ma via xgong)
YARN-261. Ability to fail AM attempts (Andrey Klochkov and
Rohith Sharma K S via jlowe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@ -150,6 +152,32 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
/**
 * <p>The interface used by clients to request the
 * <code>ResourceManager</code> to fail an application attempt.</p>
 *
 * <p>The client, via {@link FailApplicationAttemptRequest}, provides the
 * {@link ApplicationAttemptId} of the attempt to be failed.</p>
 *
 * <p>In secure mode, the <code>ResourceManager</code> verifies access to the
 * application, queue etc. before failing the attempt.</p>
 *
 * <p>Currently, the <code>ResourceManager</code> returns an empty response
 * on success and throws an exception on rejecting the request.</p>
 *
 * @param request request to fail an attempt
 * @return an empty response on success
 * @throws YarnException if the <code>ResourceManager</code> rejects the
 * request
 * @throws IOException
 * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
 */
@Public
@Unstable
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request)
throws YarnException, IOException;
/**
* <p>The interface used by clients to request the
* <code>ResourceManager</code> to abort submitted application.</p>
*
* <p>The client, via {@link KillApplicationRequest} provides the

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.Records;
/**
 * <p>Request record a client sends to the <code>ResourceManager</code> to ask
 * that one application attempt be failed.</p>
 *
 * <p>Its only payload is the {@link ApplicationAttemptId} that names the
 * attempt to fail.</p>
 *
 * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#failApplicationAttempt(FailApplicationAttemptRequest)
 */
@Public
@Stable
public abstract class FailApplicationAttemptRequest {

  /**
   * Creates a new request record and stores the given attempt id in it.
   *
   * @param attemptId id of the attempt to be failed
   * @return the request carrying <code>attemptId</code>
   */
  @Public
  @Stable
  public static FailApplicationAttemptRequest newInstance(
      ApplicationAttemptId attemptId) {
    FailApplicationAttemptRequest failRequest =
        Records.newRecord(FailApplicationAttemptRequest.class);
    failRequest.setApplicationAttemptId(attemptId);
    return failRequest;
  }

  /**
   * Get the <code>ApplicationAttemptId</code> of the attempt to be failed.
   *
   * @return <code>ApplicationAttemptId</code> of the attempt
   */
  @Public
  @Stable
  public abstract ApplicationAttemptId getApplicationAttemptId();

  /**
   * Set the <code>ApplicationAttemptId</code> of the attempt to be failed.
   *
   * @param applicationAttemptId <code>ApplicationAttemptId</code> of the
   *          attempt
   */
  @Public
  @Stable
  public abstract void setApplicationAttemptId(
      ApplicationAttemptId applicationAttemptId);
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
 * <p>Response record the <code>ResourceManager</code> returns to a client
 * that asked for an application attempt to be failed.</p>
 *
 * <p>It currently declares no fields.</p>
 *
 * @see org.apache.hadoop.yarn.api.ApplicationClientProtocol#failApplicationAttempt(FailApplicationAttemptRequest)
 */
@Public
@Stable
public abstract class FailApplicationAttemptResponse {

  /**
   * Creates a new response record.
   *
   * @return a record obtained from {@link Records#newRecord(Class)}
   */
  @Private
  @Unstable
  public static FailApplicationAttemptResponse newInstance() {
    return Records.newRecord(FailApplicationAttemptResponse.class);
  }
}

View File

@ -35,6 +35,7 @@ service ApplicationClientProtocolService {
rpc getNewApplication (GetNewApplicationRequestProto) returns (GetNewApplicationResponseProto);
rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
rpc failApplicationAttempt (FailApplicationAttemptRequestProto) returns (FailApplicationAttemptResponseProto);
rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
rpc getApplications (GetApplicationsRequestProto) returns (GetApplicationsResponseProto);

View File

@ -122,6 +122,13 @@ message SubmitApplicationRequestProto {
message SubmitApplicationResponseProto {
}
// Request to fail one application attempt; its single optional field is the
// id of that attempt.
message FailApplicationAttemptRequestProto {
optional ApplicationAttemptIdProto application_attempt_id = 1;
}
// Response to a fail-attempt request; declares no fields.
message FailApplicationAttemptResponseProto {
}
message KillApplicationRequestProto {
optional ApplicationIdProto application_id = 1;
}

View File

@ -136,6 +136,23 @@ public abstract class YarnClient extends AbstractService {
ApplicationSubmissionContext appContext) throws YarnException,
IOException;
/**
 * <p>
 * Fail an application attempt identified by given ID.
 * </p>
 *
 * <p>
 * The call returns no value; a rejected or failed request is reported only
 * through the declared exceptions.
 * </p>
 *
 * @param applicationAttemptId
 * {@link ApplicationAttemptId} of the attempt to fail.
 * @throws YarnException
 * in case of errors or if YARN rejects the request due to
 * access-control restrictions.
 * @throws IOException
 * @see #getQueueAclsInfo()
 */
public abstract void failApplicationAttempt(
ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException;
/**
* <p>
* Kill an application identified by given ID.

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -382,6 +383,16 @@ public class YarnClientImpl extends YarnClient {
return UserGroupInformation.isSecurityEnabled();
}
/**
 * Asks the ResourceManager to fail the given application attempt.
 *
 * <p>Logs the attempt id at INFO level, wraps it in a
 * {@link FailApplicationAttemptRequest} and sends that through
 * {@code rmClient}. The response is discarded.</p>
 *
 * @param attemptId id of the attempt to fail
 * @throws YarnException propagated from {@code rmClient}
 * @throws IOException propagated from {@code rmClient}
 */
@Override
public void failApplicationAttempt(ApplicationAttemptId attemptId)
    throws YarnException, IOException {
  LOG.info("Failing application attempt " + attemptId);
  // newInstance creates the record and sets the attempt id on it.
  rmClient.failApplicationAttempt(
      FailApplicationAttemptRequest.newInstance(attemptId));
}
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -138,9 +139,11 @@ public class ApplicationCLI extends YarnCLI {
"Prints the status of the application attempt.");
opts.addOption(LIST_CMD, true,
"List application attempts for aplication.");
opts.addOption(FAIL_CMD, true, "Fails application attempt.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(STATUS_CMD).setArgName("Application Attempt ID");
opts.getOption(LIST_CMD).setArgName("Application ID");
opts.getOption(FAIL_CMD).setArgName("Application Attempt ID");
} else if (args.length > 0 && args[0].equalsIgnoreCase(CONTAINER)) {
title = CONTAINER;
opts.addOption(STATUS_CMD, true,
@ -252,6 +255,12 @@ public class ApplicationCLI extends YarnCLI {
}
moveApplicationAcrossQueues(cliParser.getOptionValue(MOVE_TO_QUEUE_CMD),
cliParser.getOptionValue(QUEUE_CMD));
} else if (cliParser.hasOption(FAIL_CMD)) {
if (!args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) {
printUsage(title, opts);
return exitCode;
}
failApplicationAttempt(cliParser.getOptionValue(FAIL_CMD));
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(title, opts);
return 0;
@ -518,6 +527,25 @@ public class ApplicationCLI extends YarnCLI {
}
}
/**
 * Fails a single application attempt.
 *
 * <p>Parses the attempt id, prints a line naming the attempt and its owning
 * application to {@code sysout}, then asks {@code client} to fail the
 * attempt.</p>
 *
 * @param attemptId string form of the id of the attempt to fail
 * @throws YarnException
 * @throws IOException
 */
private void failApplicationAttempt(String attemptId) throws YarnException,
    IOException {
  ApplicationAttemptId attId =
      ConverterUtils.toApplicationAttemptId(attemptId);
  sysout.println("Failing attempt " + attId + " of application "
      + attId.getApplicationId());
  client.failApplicationAttempt(attId);
}
/**
* Prints the application report for an application id.
*

View File

@ -33,6 +33,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public static final String STATUS_CMD = "status";
public static final String LIST_CMD = "list";
public static final String KILL_CMD = "kill";
public static final String FAIL_CMD = "fail";
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
public static final String HELP_CMD = "help";
public static final String SIGNAL_CMD = "signal";
@ -42,11 +43,15 @@ public abstract class YarnCLI extends Configured implements Tool {
/**
 * Builds the CLI on a new {@link YarnConfiguration}, obtains the client
 * from {@link #createYarnClient()}, then initializes it with this object's
 * configuration and starts it.
 */
public YarnCLI() {
  super(new YarnConfiguration());
  // Obtain the client only through the overridable factory. An additional
  // direct YarnClient.createYarnClient() call here would build a client that
  // is overwritten at once and never initialized, started or stopped.
  client = createYarnClient();
  client.init(getConf());
  client.start();
}
/**
 * Factory hook for the {@link YarnClient} used by this CLI. This default
 * returns {@link YarnClient#createYarnClient()}; the method is protected and
 * non-final, so a subclass can override it to supply a different client.
 *
 * <p>It is called from the constructor, so an override runs before the
 * subclass's own constructor body and field initializers have executed.</p>
 *
 * @return the client this CLI should use
 */
protected YarnClient createYarnClient() {
return YarnClient.createYarnClient();
}
public void setSysOutPrintStream(PrintStream sysout) {
this.sysout = sysout;
}

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
@ -1443,6 +1444,18 @@ public class TestYarnCLI {
}
/**
 * Runs "applicationattempt -fail &lt;attempt id&gt;" through the CLI and
 * checks that it exits with 0, that failApplicationAttempt was invoked on
 * the mocked client, and that the mock saw no other interaction.
 */
@Test
public void testFailApplicationAttempt() throws Exception {
  ApplicationCLI cli = createAndGetAppCLI();
  String[] failArgs = new String[] { "applicationattempt", "-fail",
      "appattempt_1444199730803_0003_000001" };
  int exitCode = cli.run(failArgs);
  Assert.assertEquals(0, exitCode);
  verify(client).failApplicationAttempt(any(ApplicationAttemptId.class));
  verifyNoMoreInteractions(client);
}
private void verifyUsageInfo(YarnCLI cli) throws Exception {
cli.setSysErrPrintStream(sysErr);
cli.run(new String[] { "application" });
@ -1527,6 +1540,7 @@ public class TestYarnCLI {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: applicationattempt");
pw.println(" -fail <Application Attempt ID> Fails application attempt.");
pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application ID> List application attempts for");
pw.println(" aplication.");

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -83,6 +85,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FailApplicationAttemptRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FailApplicationAttemptResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsRequestPBImpl;
@ -134,6 +138,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRespo
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@ -177,6 +182,20 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
}
}
/**
 * Client-side RPC stub: extracts the protobuf message from the request
 * record, sends it through {@code proxy}, and wraps the returned protobuf
 * message in a {@link FailApplicationAttemptResponsePBImpl}.
 *
 * @param request must be a {@link FailApplicationAttemptRequestPBImpl}; it
 *          is cast to that type
 * @return the wrapped response
 * @throws YarnException
 * @throws IOException
 */
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
    FailApplicationAttemptRequest request) throws YarnException, IOException {
  FailApplicationAttemptRequestProto wireRequest =
      ((FailApplicationAttemptRequestPBImpl) request).getProto();
  try {
    return new FailApplicationAttemptResponsePBImpl(
        proxy.failApplicationAttempt(null, wireRequest));
  } catch (ServiceException rpcFailure) {
    RPCUtil.unwrapAndThrowException(rpcFailure);
    // Presumably never reached because the helper is expected to throw;
    // the return is needed for the method to compile. TODO confirm.
    return null;
  }
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRespo
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FailApplicationAttemptRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FailApplicationAttemptResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsRequestPBImpl;
@ -104,6 +107,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationReque
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
@ -162,6 +167,20 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
this.real = impl;
}
/**
 * Server-side RPC adapter: wraps the incoming protobuf message in a request
 * record, passes it to {@code real}, and returns the protobuf message behind
 * the response record.
 *
 * @param arg0 RPC controller; not used here
 * @param proto the wire-format request
 * @return the wire-format response
 * @throws ServiceException wrapping any {@link YarnException} or
 *           {@link IOException} thrown by {@code real}
 */
@Override
public FailApplicationAttemptResponseProto failApplicationAttempt(RpcController arg0,
    FailApplicationAttemptRequestProto proto) throws ServiceException {
  FailApplicationAttemptRequestPBImpl request =
      new FailApplicationAttemptRequestPBImpl(proto);
  FailApplicationAttemptResponse response;
  try {
    response = real.failApplicationAttempt(request);
  } catch (YarnException e) {
    throw new ServiceException(e);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  // The response is expected to be the PB-backed implementation.
  return ((FailApplicationAttemptResponsePBImpl) response).getProto();
}
@Override
public KillApplicationResponseProto forceKillApplication(RpcController arg0,
KillApplicationRequestProto proto) throws ServiceException {

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
/**
 * Protocol-buffer backed implementation of
 * {@link FailApplicationAttemptRequest}.
 *
 * <p>The record's state is held in three places: the immutable
 * {@code proto}, the mutable {@code builder}, and a locally cached
 * {@code applicationAttemptId}. {@code viaProto} tells which of
 * {@code proto} (true) or {@code builder} (false) is current. The cached id
 * is written into the message whenever {@link #getProto()} is called.</p>
 */
@Private
@Unstable
public class FailApplicationAttemptRequestPBImpl extends FailApplicationAttemptRequest {
  FailApplicationAttemptRequestProto proto = FailApplicationAttemptRequestProto.getDefaultInstance();
  FailApplicationAttemptRequestProto.Builder builder = null;
  boolean viaProto = false;

  // Record-form copy of the attempt id; null until set or first read.
  private ApplicationAttemptId applicationAttemptId = null;

  /** Creates an empty request backed by a new builder. */
  public FailApplicationAttemptRequestPBImpl() {
    builder = FailApplicationAttemptRequestProto.newBuilder();
  }

  /**
   * Creates a request that wraps an existing protobuf message.
   *
   * @param proto the message to wrap
   */
  public FailApplicationAttemptRequestPBImpl(FailApplicationAttemptRequestProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  /**
   * Folds the cached attempt id into the message and returns the built
   * protobuf form. Afterwards {@code viaProto} is true.
   *
   * @return the protobuf message for this request
   */
  public FailApplicationAttemptRequestProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  @Override
  public int hashCode() {
    return getProto().hashCode();
  }

  /**
   * Two requests are equal when they are of exactly the same class and
   * their protobuf messages are equal.
   *
   * <p>The class comparison is exact. Accepting any {@code other} whose
   * class is merely assignable from this one would let a superclass
   * instance (for example a plain {@code Object}) through to the cast, which
   * would then throw {@code ClassCastException} instead of returning
   * false.</p>
   */
  @Override
  public boolean equals(Object other) {
    if (other == null || other.getClass() != this.getClass()) {
      return false;
    }
    return this.getProto().equals(
        ((FailApplicationAttemptRequestPBImpl) other).getProto());
  }

  @Override
  public String toString() {
    return TextFormat.shortDebugString(getProto());
  }

  /** Copies the cached attempt id, when present, into the builder. */
  private void mergeLocalToBuilder() {
    if (this.applicationAttemptId != null) {
      builder.setApplicationAttemptId(
          convertToProtoFormat(this.applicationAttemptId));
    }
  }

  /** Rebuilds {@code proto} from the builder plus the cached attempt id. */
  private void mergeLocalToProto() {
    if (viaProto) {
      // Need a builder seeded from the current proto before merging.
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  /**
   * Makes the builder the current state holder, creating it from
   * {@code proto} when the proto was current or no builder exists yet.
   */
  private void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = FailApplicationAttemptRequestProto.newBuilder(proto);
    }
    viaProto = false;
  }

  /**
   * Returns the cached attempt id, or converts and caches the one stored in
   * the message.
   *
   * @return the attempt id, or null when none is cached and the message has
   *         none
   */
  @Override
  public ApplicationAttemptId getApplicationAttemptId() {
    if (this.applicationAttemptId != null) {
      return this.applicationAttemptId;
    }
    FailApplicationAttemptRequestProtoOrBuilder p = viaProto ? proto : builder;
    if (!p.hasApplicationAttemptId()) {
      return null;
    }
    this.applicationAttemptId = convertFromProtoFormat(
        p.getApplicationAttemptId());
    return this.applicationAttemptId;
  }

  /**
   * Caches the given attempt id; a null value also clears the field in the
   * builder.
   *
   * @param applicationAttemptId the id to store, or null to clear it
   */
  @Override
  public void setApplicationAttemptId(
      ApplicationAttemptId applicationAttemptId) {
    maybeInitBuilder();
    if (applicationAttemptId == null) {
      builder.clearApplicationAttemptId();
    }
    this.applicationAttemptId = applicationAttemptId;
  }

  /** Wraps a protobuf attempt id in its record implementation. */
  private ApplicationAttemptIdPBImpl convertFromProtoFormat(
      ApplicationAttemptIdProto p) {
    return new ApplicationAttemptIdPBImpl(p);
  }

  /**
   * Extracts the protobuf message from an attempt id record, which is cast
   * to {@link ApplicationAttemptIdPBImpl}.
   */
  private ApplicationAttemptIdProto convertToProtoFormat(
      ApplicationAttemptId t) {
    return ((ApplicationAttemptIdPBImpl) t).getProto();
  }
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FailApplicationAttemptResponseProto;
import com.google.protobuf.TextFormat;
/**
 * Protocol-buffer backed implementation of
 * {@link FailApplicationAttemptResponse}.
 *
 * <p>Holds either a built {@code proto} or a {@code builder};
 * {@code viaProto} tells which one is current.</p>
 */
@Private
@Unstable
public class FailApplicationAttemptResponsePBImpl extends FailApplicationAttemptResponse {
  FailApplicationAttemptResponseProto proto = FailApplicationAttemptResponseProto.getDefaultInstance();
  FailApplicationAttemptResponseProto.Builder builder = null;
  boolean viaProto = false;

  /** Creates an empty response backed by a new builder. */
  public FailApplicationAttemptResponsePBImpl() {
    builder = FailApplicationAttemptResponseProto.newBuilder();
  }

  /**
   * Creates a response that wraps an existing protobuf message.
   *
   * @param proto the message to wrap
   */
  public FailApplicationAttemptResponsePBImpl(FailApplicationAttemptResponseProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  /**
   * Returns the protobuf form, building it from the builder when the proto
   * is not already current. Afterwards {@code viaProto} is true.
   *
   * @return the protobuf message for this response
   */
  public FailApplicationAttemptResponseProto getProto() {
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  @Override
  public int hashCode() {
    return getProto().hashCode();
  }

  /**
   * Two responses are equal when they are of exactly the same class and
   * their protobuf messages are equal.
   *
   * <p>The class comparison is exact. Accepting any {@code other} whose
   * class is merely assignable from this one would let a superclass
   * instance (for example a plain {@code Object}) through to the cast, which
   * would then throw {@code ClassCastException} instead of returning
   * false.</p>
   */
  @Override
  public boolean equals(Object other) {
    if (other == null || other.getClass() != this.getClass()) {
      return false;
    }
    return this.getProto().equals(
        ((FailApplicationAttemptResponsePBImpl) other).getProto());
  }

  @Override
  public String toString() {
    return TextFormat.shortDebugString(getProto());
  }
}

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
@ -472,4 +474,10 @@ public class MockResourceManagerFacade implements
SignalContainerRequest request) throws IOException {
return null;
}
// Not supported by this mock: every call throws NotImplementedException.
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
}

View File

@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@ -139,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
@ -614,6 +617,79 @@ public class ClientRMService extends AbstractService implements
return response;
}
/**
 * Handles a client request to fail one application attempt.
 *
 * <p>Steps, in order: resolve the caller's UGI; look up the application and
 * then the attempt; check that the caller has {@code MODIFY_APP} access to
 * the application; if the application's state is in
 * {@code COMPLETED_APP_STATES} (and not in {@code ACTIVE_APP_STATES}) return
 * without dispatching anything; otherwise hand an
 * {@link RMAppAttemptFailedEvent} carrying the diagnostic
 * "Attempt failed by user." to the dispatcher's event handler.</p>
 *
 * @param request carries the id of the attempt to fail
 * @return a new response record from {@code recordFactory}
 * @throws YarnException as {@code ApplicationNotFoundException} or
 *           {@code ApplicationAttemptNotFoundException} when the
 *           application or attempt is unknown, or as the remote exception
 *           built by {@code RPCUtil} when the UGI lookup fails or access is
 *           denied
 */
@SuppressWarnings("unchecked")
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException {
// NOTE(review): getApplicationAttemptId() can be null when the request
// never had the field set; the next line would then throw
// NullPointerException. Confirm whether callers guarantee a non-null id.
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
ApplicationId applicationId = attemptId.getApplicationId();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
// Caller identity unavailable: audit under "UNKNOWN" and rethrow wrapped.
LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.FAIL_ATTEMPT_REQUEST,
"UNKNOWN", "ClientRMService" , "Error getting UGI",
applicationId, attemptId);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
// NOTE(review): this audit entry uses getUserName(), while the other
// entries in this method use getShortUserName().
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "UNKNOWN", "ClientRMService",
"Trying to fail an attempt of an absent application", applicationId,
attemptId);
throw new ApplicationNotFoundException("Trying to fail an attempt "
+ attemptId + " of an absent application " + applicationId);
}
// NOTE(review): a missing attempt is reported before the access check
// below runs, and this path writes no audit entry.
RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId);
if (appAttempt == null) {
throw new ApplicationAttemptNotFoundException(
"ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM.");
}
// Only callers allowed to MODIFY_APP on this application may proceed.
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
FailApplicationAttemptResponse response =
recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
// Application already in a completed state: nothing is dispatched; the
// request is audited as a success and the response returned. A state that
// is in neither set falls through to the dispatch below.
if (!ACTIVE_APP_STATES.contains(application.getState())) {
if (COMPLETED_APP_STATES.contains(application.getState())) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
applicationId);
return response;
}
}
// Hand the failure event to the dispatcher's event handler, then audit
// the request as a success.
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppAttemptFailedEvent(attemptId,
"Attempt failed by user."));
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService", applicationId,
attemptId);
return response;
}
@SuppressWarnings("unchecked")
@Override
public KillApplicationResponse forceKillApplication(

View File

@ -43,6 +43,7 @@ public class RMAuditLogger {
static final String KEY_VAL_SEPARATOR = "=";
static final char PAIR_SEPARATOR = '\t';
public static final String FAIL_ATTEMPT_REQUEST = "Fail Attempt Request";
public static final String KILL_APP_REQUEST = "Kill Application Request";
public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
public static final String MOVE_APP_REQUEST = "Move Application Request";

View File

@ -22,6 +22,7 @@ public enum RMAppAttemptEventType {
// Source: RMApp
START,
KILL,
FAIL,
// Source: AMLauncher
LAUNCHED,

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
@ -175,6 +176,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final boolean maybeLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
private static final AttemptFailedTransition FAILED_TRANSITION =
new AttemptFailedTransition();
private RMAppAttemptEvent eventCausingFinalSaving;
private RMAppAttemptState targetedFinalState;
@ -202,6 +205,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.REGISTERED,
new FinalSavingTransition(
@ -221,6 +228,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.REGISTERED,
new FinalSavingTransition(
@ -236,6 +247,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
@ -260,6 +275,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new FinalSavingTransition(
new AMContainerCrashedBeforeRunningTransition(),
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
// Transitions from LAUNCHED_UNMANAGED_SAVING State
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
@ -278,6 +298,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
@ -291,6 +316,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new FinalSavingTransition(
new KillAllocatedAMTransition(), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
@ -314,6 +343,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new FinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING,
@ -340,6 +373,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new FinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
// Transitions from FINAL_SAVING State
.addTransition(RMAppAttemptState.FINAL_SAVING,
@ -363,7 +400,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL))
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL))
// Transitions from FAILED State
// For work-preserving AM restart, failed attempt are still capturing
@ -378,6 +416,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
EnumSet.of(
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED))
@ -397,7 +436,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.CONTAINER_ALLOCATED,
// ignore Kill and Fail as we have already saved the final Finished state
// in state store.
RMAppAttemptEventType.KILL))
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL))
// Transitions from FINISHED State
.addTransition(
@ -407,7 +447,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.KILL))
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL))
.addTransition(RMAppAttemptState.FINISHED,
RMAppAttemptState.FINISHED,
RMAppAttemptEventType.CONTAINER_FINISHED,
@ -425,6 +466,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL,
RMAppAttemptEventType.STATUS_UPDATE))
.addTransition(RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
@ -1175,6 +1217,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
break;
case KILL:
break;
case FAIL:
RMAppAttemptFailedEvent failEvent =
(RMAppAttemptFailedEvent) event;
diags = failEvent.getDiagnostics();
break;
case EXPIRE:
diags = getAMExpiredDiagnostics(event);
break;
@ -1321,6 +1368,22 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
}
/**
 * Final transition used for {@link RMAppAttemptFailedEvent}s.
 *
 * <p>Copies the diagnostics carried by the event (when present) into the
 * attempt's diagnostics buffer and then runs the regular
 * {@link BaseFinalTransition} logic with {@link RMAppAttemptState#FAILED}
 * as the final state.
 */
private static class AttemptFailedTransition extends BaseFinalTransition {

  public AttemptFailedTransition() {
    super(RMAppAttemptState.FAILED);
  }

  /**
   * @param appAttempt the attempt being failed
   * @param event expected to be an {@link RMAppAttemptFailedEvent}; any other
   *          event type results in a {@link ClassCastException}
   */
  @Override
  public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
    String failureDiagnostics =
        ((RMAppAttemptFailedEvent) event).getDiagnostics();
    // Record the diagnostics first so they are already part of the attempt
    // when the base final transition runs.
    if (failureDiagnostics != null) {
      appAttempt.diagnostics.append(failureDiagnostics);
    }
    super.transition(appAttempt, event);
  }
}
private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
/**
 * Attempt event of type {@link RMAppAttemptEventType#FAIL} that carries a
 * free-form diagnostics message describing why the attempt is being failed.
 */
public class RMAppAttemptFailedEvent extends RMAppAttemptEvent {

  /** Diagnostics text supplied by the sender; stored as given, unchecked. */
  private final String diagnostics;

  /**
   * @param appAttemptId id of the attempt this event targets
   * @param diagnostics message explaining the failure
   */
  public RMAppAttemptFailedEvent(ApplicationAttemptId appAttemptId,
      String diagnostics) {
    super(appAttemptId, RMAppAttemptEventType.FAIL);
    this.diagnostics = diagnostics;
  }

  /**
   * @return the diagnostics message passed to the constructor
   */
  public String getDiagnostics() {
    return diagnostics;
  }
}

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
public class RMAppAttemptRegistrationEvent extends RMAppAttemptEvent {
private final ApplicationAttemptId appAttemptId;
private final String host;
private int rpcport;
private String trackingurl;
@ -32,7 +31,6 @@ public class RMAppAttemptRegistrationEvent extends RMAppAttemptEvent {
public RMAppAttemptRegistrationEvent(ApplicationAttemptId appAttemptId,
String host, int rpcPort, String trackingUrl) {
super(appAttemptId, RMAppAttemptEventType.REGISTERED);
this.appAttemptId = appAttemptId;
this.host = host;
this.rpcport = rpcPort;
this.trackingurl = trackingUrl;

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -587,6 +589,14 @@ public class MockRM extends ResourceManager {
return client.forceKillApplication(req);
}
/**
 * Test helper that asks this RM's client service to fail the given
 * application attempt.
 *
 * @param attemptId id of the attempt to fail
 * @return the response returned by the client service
 * @throws Exception whatever the client service call throws
 */
public FailApplicationAttemptResponse failApplicationAttempt(
    ApplicationAttemptId attemptId) throws Exception {
  FailApplicationAttemptRequest failRequest =
      FailApplicationAttemptRequest.newInstance(attemptId);
  // Go through the protocol interface, exactly as an external client would.
  ApplicationClientProtocol rmClient = getClientRMService();
  return rmClient.failApplicationAttempt(failRequest);
}
// from AMLauncher
public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
throws Exception {

View File

@ -2170,6 +2170,68 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm2.stop();
}
/**
 * Fails application attempts through the client API before and after an RM
 * restart and checks the recovered state.
 *
 * <p>Flow: attempt 1 is failed on rm1 and attempt 2 is expected to be
 * created; rm2 is then started on the same state store and is expected to
 * load attempt 1 as FAILED and attempt 2 as SCHEDULED; finally attempt 2 is
 * failed on rm2 and the application is expected to become FAILED with
 * {@code maxAttempt} attempts recorded.
 *
 * <p>NOTE(review): the test fails exactly two attempts, so it implicitly
 * assumes {@code DEFAULT_RM_AM_MAX_ATTEMPTS} is 2 - confirm if that default
 * ever changes.
 */
@Test(timeout = 60000)
public void testRMRestartFailAppAttempt() throws Exception {
  conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  int maxAttempt =
      conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
          YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  // The same in-memory store is handed to both RMs so that rm2 recovers
  // what rm1 persisted.
  MemoryRMStateStore memStore = new MemoryRMStateStore();
  memStore.init(conf);

  // start RM
  MockRM rm1 = createMockRM(conf, memStore);
  rm1.start();
  MockNM nm1 =
      new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
  nm1.registerNode();

  // create app and launch the AM
  RMApp app0 = rm1.submitApp(200);
  MockAM am0 = launchAM(app0, rm1, nm1);
  ApplicationId applicationId = app0.getApplicationId();
  ApplicationAttemptId appAttemptId1 =
      app0.getCurrentAppAttempt().getAppAttemptId();
  Assert.assertEquals(1, appAttemptId1.getAttemptId());

  // fail the 1st app attempt.
  rm1.failApplicationAttempt(appAttemptId1);
  rm1.waitForState(appAttemptId1, RMAppAttemptState.FAILED);
  rm1.waitForState(applicationId, RMAppState.ACCEPTED);
  ApplicationAttemptId appAttemptId2 =
      app0.getCurrentAppAttempt().getAppAttemptId();
  Assert.assertEquals(2, appAttemptId2.getAttemptId());
  rm1.waitForState(appAttemptId2, RMAppAttemptState.SCHEDULED);

  // restart rm
  MockRM rm2 = createMockRM(conf, memStore);
  rm2.start();
  RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(applicationId);
  rm2.waitForState(applicationId, RMAppState.ACCEPTED);
  rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
  Assert.assertEquals(2, loadedApp0.getAppAttempts().size());
  rm2.waitForState(appAttemptId2, RMAppAttemptState.SCHEDULED);
  appAttemptId2 = loadedApp0.getCurrentAppAttempt().getAppAttemptId();
  Assert.assertEquals(2, appAttemptId2.getAttemptId());

  // fail 2nd attempt
  rm2.failApplicationAttempt(appAttemptId2);
  rm2.waitForState(appAttemptId2, RMAppAttemptState.FAILED);
  rm2.waitForState(applicationId, RMAppState.FAILED);
  Assert.assertEquals(maxAttempt, loadedApp0.getAppAttempts().size());
}
private <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;

View File

@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@ -135,6 +136,7 @@ public class TestRMAppAttemptTransitions {
LogFactory.getLog(TestRMAppAttemptTransitions.class);
private static final String EMPTY_DIAGNOSTICS = "";
private static final String FAILED_DIAGNOSTICS = "Attempt failed by user.";
private static final String RM_WEBAPP_ADDR =
WebAppUtils.getResolvedRMWebAppURLWithScheme(new Configuration());
@ -1542,6 +1544,78 @@ public class TestRMAppAttemptTransitions {
(RMAppAttemptImpl) applicationAttempt, null);
}
/**
 * Sends a user-triggered FAIL event to an attempt that has not been
 * submitted yet, then checks the failed-state expectations and token count.
 */
@Test(timeout = 30000)
public void testNewToFailed() {
  RMAppAttemptFailedEvent failEvent = new RMAppAttemptFailedEvent(
      applicationAttempt.getAppAttemptId(), FAILED_DIAGNOSTICS);
  applicationAttempt.handle(failEvent);
  // NOTE(review): the reported state is still NEW right after the event,
  // presumably because the final state has not been saved yet - confirm.
  assertEquals(YarnApplicationAttemptState.NEW,
      applicationAttempt.createApplicationAttemptState());
  testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
  verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
}
/**
 * Sends a user-triggered FAIL event to a submitted attempt and checks the
 * failed-state expectations.
 */
@Test(timeout = 30000)
public void testSubmittedToFailed() {
  submitApplicationAttempt();
  RMAppAttemptFailedEvent failEvent = new RMAppAttemptFailedEvent(
      applicationAttempt.getAppAttemptId(), FAILED_DIAGNOSTICS);
  applicationAttempt.handle(failEvent);
  // NOTE(review): the reported state is still SUBMITTED right after the
  // event, presumably because the final state has not been saved yet.
  assertEquals(YarnApplicationAttemptState.SUBMITTED,
      applicationAttempt.createApplicationAttemptState());
  testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
}
/**
 * Sends a user-triggered FAIL event to a scheduled attempt and checks the
 * failed-state expectations.
 */
@Test(timeout = 30000)
public void testScheduledToFailed() {
  scheduleApplicationAttempt();
  RMAppAttemptFailedEvent failEvent = new RMAppAttemptFailedEvent(
      applicationAttempt.getAppAttemptId(), FAILED_DIAGNOSTICS);
  applicationAttempt.handle(failEvent);
  // NOTE(review): the reported state is still SCHEDULED right after the
  // event, presumably because the final state has not been saved yet.
  assertEquals(YarnApplicationAttemptState.SCHEDULED,
      applicationAttempt.createApplicationAttemptState());
  testAppAttemptFailedState(null, FAILED_DIAGNOSTICS);
}
/**
 * Sends a user-triggered FAIL event to an attempt whose AM container has
 * been allocated and checks the failed-state expectations for that
 * container.
 */
@Test(timeout = 30000)
public void testAllocatedToFailedUserTriggeredFailEvent() {
  Container amContainer = allocateApplicationAttempt();
  // Precondition: the attempt is reported as ALLOCATED before it is failed.
  assertEquals(YarnApplicationAttemptState.ALLOCATED,
      applicationAttempt.createApplicationAttemptState());
  RMAppAttemptFailedEvent failEvent = new RMAppAttemptFailedEvent(
      applicationAttempt.getAppAttemptId(), FAILED_DIAGNOSTICS);
  applicationAttempt.handle(failEvent);
  testAppAttemptFailedState(amContainer, FAILED_DIAGNOSTICS);
}
/**
 * Sends a user-triggered FAIL event to a running attempt and verifies it
 * passes through FINAL_SAVING into FAILED, that a later AM container-finished
 * event is still recorded, and that the tracking URLs equal the RM
 * application page URL.
 */
@Test(timeout = 30000)
public void testRunningToFailedUserTriggeredFailEvent() {
  // Drive the attempt through allocate, launch and run.
  Container amContainer = allocateApplicationAttempt();
  launchApplicationAttempt(amContainer);
  runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);

  // Fail it: the attempt waits in FINAL_SAVING until the store acknowledges.
  RMAppAttemptFailedEvent failEvent = new RMAppAttemptFailedEvent(
      applicationAttempt.getAppAttemptId(), FAILED_DIAGNOSTICS);
  applicationAttempt.handle(failEvent);
  assertEquals(RMAppAttemptState.FINAL_SAVING,
      applicationAttempt.getAppAttemptState());
  sendAttemptUpdateSavedEvent(applicationAttempt);
  assertEquals(RMAppAttemptState.FAILED,
      applicationAttempt.getAppAttemptState());

  // The AM container completing after the failure is still accounted for.
  NodeId anyNodeId = NodeId.newInstance("host", 1234);
  RMAppAttemptContainerFinishedEvent containerFinishedEvent =
      new RMAppAttemptContainerFinishedEvent(
          applicationAttempt.getAppAttemptId(),
          BuilderUtils.newContainerStatus(amContainer.getId(),
              ContainerState.COMPLETE, "", 0, amContainer.getResource()),
          anyNodeId);
  applicationAttempt.handle(containerFinishedEvent);
  assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
  assertEquals(amContainer, applicationAttempt.getMasterContainer());
  assertEquals(0, application.getRanNodes().size());

  // Both tracking URLs are expected to equal the RM application page.
  String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
      applicationAttempt.getAppAttemptId().getApplicationId());
  assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
  assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
  verifyAMHostAndPortInvalidated();
  verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics,
int exitCode, boolean shouldCheckURL) {
assertTrue("Diagnostic information does not point the logs to the users",