YARN-611. Added an API to let apps specify an interval beyond which AM failures should be ignored towards counting max-attempts. Contributed by Xuan Gong.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-13 18:04:05 -07:00
parent 98588cf044
commit 14e2639fd0
23 changed files with 331 additions and 28 deletions

View File

@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.Map;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -59,7 +58,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -87,6 +85,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

View File

@ -42,10 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -110,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Before;

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
import java.io.File;
import java.io.FileOutputStream;
import java.util.UUID;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -30,12 +31,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.test.CoreTestDriver;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.AfterClass;

View File

@ -58,6 +58,12 @@
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>

View File

@ -24,14 +24,12 @@ import java.util.Map;
import java.util.Random;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -44,6 +42,7 @@ import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;

View File

@ -75,9 +75,13 @@ Release 2.6.0 - UNRELEASED
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
containers to a preconfigured limit. (Varun Vasudev via vinodkv)
YARN-2033. YARN-2033. Merging generic-history into the Timeline Store
YARN-2033. Merging generic-history into the Timeline Store
(Zhijie Shen via junping_du)
YARN-611. Added an API to let apps specify an interval beyond which AM
failures should be ignored towards counting max-attempts. (Xuan Gong via
vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -45,6 +45,15 @@ import java.util.Set;
* {@link ContainerLaunchContext} of the container in which the
* <code>ApplicationMaster</code> is executed.
* </li>
* <li>maxAppAttempts. The maximum number of application attempts.
* It should be no larger than the global number of max attempts in the
* Yarn configuration.</li>
* <li>attemptFailuresValidityInterval. The default value is -1.
* when attemptFailuresValidityInterval in milliseconds is set to > 0,
* the failure number will no take failures which happen out of the
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
* </li>
* </ul>
* </p>
*
@ -103,6 +112,22 @@ public abstract class ApplicationSubmissionContext {
resource, null);
}
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers, long attemptFailuresValidityInterval) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, keepContainers);
context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
return context;
}
/**
* Get the <code>ApplicationId</code> of the submitted application.
* @return <code>ApplicationId</code> of the submitted application
@ -338,4 +363,22 @@ public abstract class ApplicationSubmissionContext {
@Public
@Stable
public abstract void setApplicationTags(Set<String> tags);
/**
* Get the attemptFailuresValidityInterval in milliseconds for the application
*
* @return the attemptFailuresValidityInterval
*/
@Public
@Stable
public abstract long getAttemptFailuresValidityInterval();
/**
* Set the attemptFailuresValidityInterval in milliseconds for the application
* @param attemptFailuresValidityInterval
*/
@Public
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
}

View File

@ -291,6 +291,7 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
}
enum ApplicationAccessTypeProto {

View File

@ -402,4 +402,17 @@ extends ApplicationSubmissionContext {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
@Override
public long getAttemptFailuresValidityInterval() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
return p.getAttemptFailuresValidityInterval();
}
@Override
public void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval) {
maybeInitBuilder();
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
}

View File

@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.yarn.util.Clock;
public class ControlledClock implements Clock {
private long time = -1;

View File

@ -282,6 +282,7 @@ public class FileSystemRMStateStore extends RMStateStore {
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());

View File

@ -171,6 +171,7 @@ public class MemoryRMStateStore extends RMStateStore {
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());

View File

@ -261,6 +261,7 @@ public abstract class RMStateStore extends AbstractService {
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
long finishTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
@ -274,14 +275,15 @@ public abstract class RMStateStore extends AbstractService {
Container masterContainer, Credentials appAttemptCredentials,
long startTime, long memorySeconds, long vcoreSeconds) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds);
null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus, long memorySeconds, long vcoreSeconds) {
int exitStatus, long finishTime, long memorySeconds,
long vcoreSeconds) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
@ -291,6 +293,7 @@ public abstract class RMStateStore extends AbstractService {
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus;
this.finishTime = finishTime;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
@ -328,6 +331,9 @@ public abstract class RMStateStore extends AbstractService {
public long getVcoreSeconds() {
return vcoreSeconds;
}
public long getFinishTime() {
return this.finishTime;
}
}
/**

View File

@ -604,9 +604,11 @@ public class ZKRMStateStore extends RMStateStore {
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}

View File

@ -44,7 +44,7 @@ public abstract class ApplicationAttemptStateData {
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long memorySeconds, long vcoreSeconds) {
long finishTime, long memorySeconds, long vcoreSeconds) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
@ -56,6 +56,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
attemptStateData.setFinishTime(finishTime);
attemptStateData.setMemorySeconds(memorySeconds);
attemptStateData.setVcoreSeconds(vcoreSeconds);
return attemptStateData;
@ -75,7 +76,7 @@ public abstract class ApplicationAttemptStateData {
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus(),
attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
}
@ -163,7 +164,15 @@ public abstract class ApplicationAttemptStateData {
public abstract void setAMContainerExitStatus(int exitStatus);
/**
* Get the <em>memory seconds</em> (in MB seconds) of the application.
* Get the <em>finish time</em> of the application attempt.
* @return <em>finish time</em> of the application attempt
*/
public abstract long getFinishTime();
public abstract void setFinishTime(long finishTime);
/**
* Get the <em>memory seconds</em> (in MB seconds) of the application.
* @return <em>memory seconds</em> (in MB seconds) of the application
*/
@Public

View File

@ -318,4 +318,16 @@ public class ApplicationAttemptStateDataPBImpl extends
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s);
}
@Override
public long getFinishTime() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getFinishTime();
}
@Override
public void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
}

View File

@ -32,10 +32,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -81,8 +81,12 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {
@ -110,6 +114,12 @@ public class RMAppImpl implements RMApp, Recoverable {
private final String applicationType;
private final Set<String> applicationTags;
private final long attemptFailuresValidityInterval;
private Clock systemClock;
private boolean isNumAttemptsBeyondThreshold = false;
// Mutable fields
private long startTime;
private long finishTime = 0;
@ -328,6 +338,8 @@ public class RMAppImpl implements RMApp, Recoverable {
ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags) {
this.systemClock = new SystemClock();
this.applicationId = applicationId;
this.name = name;
this.rmContext = rmContext;
@ -340,7 +352,7 @@ public class RMAppImpl implements RMApp, Recoverable {
this.scheduler = scheduler;
this.masterService = masterService;
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.startTime = this.systemClock.getTime();
this.applicationType = applicationType;
this.applicationTags = applicationTags;
@ -358,6 +370,9 @@ public class RMAppImpl implements RMApp, Recoverable {
this.maxAppAttempts = individualMaxAppAttempts;
}
this.attemptFailuresValidityInterval =
submissionContext.getAttemptFailuresValidityInterval();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@ -897,7 +912,7 @@ public class RMAppImpl implements RMApp, Recoverable {
msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
} else if (this.isNumAttemptsBeyondThreshold) {
msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application.";
@ -930,7 +945,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppState stateToBeStored) {
rememberTargetTransitions(event, transitionToDo, targetFinalState);
this.stateBeforeFinalSaving = getState();
this.storedFinishTime = System.currentTimeMillis();
this.storedFinishTime = this.systemClock.getTime();
LOG.info("Updating application " + this.applicationId
+ " with final state: " + this.targetedFinalState);
@ -1097,7 +1112,7 @@ public class RMAppImpl implements RMApp, Recoverable {
}
app.finishTime = app.storedFinishTime;
if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
app.finishTime = app.systemClock.getTime();
}
// Recovered apps that are completed were not added to scheduler, so no
// need to remove them from scheduler.
@ -1118,11 +1133,16 @@ public class RMAppImpl implements RMApp, Recoverable {
private int getNumFailedAppAttempts() {
int completedAttempts = 0;
long endTime = this.systemClock.getTime();
// Do not count AM preemption, hardware failures or NM resync
// as attempt failure.
for (RMAppAttempt attempt : attempts.values()) {
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
completedAttempts++;
if (this.attemptFailuresValidityInterval <= 0
|| (attempt.getFinishTime() > endTime
- this.attemptFailuresValidityInterval)) {
completedAttempts++;
}
}
}
return completedAttempts;
@ -1139,8 +1159,9 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
int numberOfFailure = app.getNumFailedAppAttempts();
if (!app.submissionContext.getUnmanagedAM()
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) {
&& numberOfFailure < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
@ -1158,6 +1179,9 @@ public class RMAppImpl implements RMApp, Recoverable {
}
return initialState;
} else {
if (numberOfFailure >= app.maxAppAttempts) {
app.isNumAttemptsBeyondThreshold = true;
}
app.rememberTargetTransitionsAndStoreState(event,
new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
RMAppState.FAILED);
@ -1244,4 +1268,10 @@ public class RMAppImpl implements RMApp, Recoverable {
numNonAMContainerPreempted, numAMContainerPreempted,
memorySeconds, vcoreSeconds);
}
@Private
@VisibleForTesting
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
}

View File

@ -213,4 +213,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* @return metrics
*/
RMAppAttemptMetrics getRMAppAttemptMetrics();
/**
* the finish time of the application attempt.
* @return the finish time of the application attempt.
*/
long getFinishTime();
}

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@ -142,6 +140,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
private long finishTime = 0;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@ -739,6 +738,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
this.finishTime = attemptState.getFinishTime();
this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
}
@ -1028,11 +1028,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus);
@ -1747,4 +1749,23 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// lock
return attemptMetrics;
}
@Override
public long getFinishTime() {
try {
this.readLock.lock();
return this.finishTime;
} finally {
this.readLock.unlock();
}
}
private void setFinishTime(long finishTime) {
try {
this.writeLock.lock();
this.finishTime = finishTime;
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -80,6 +80,7 @@ message ApplicationAttemptStateDataProto {
optional int32 am_container_exit_status = 9 [default = -1000];
optional int64 memory_seconds = 10;
optional int64 vcore_seconds = 11;
optional int64 finish_time = 12;
}
message EpochProto {

View File

@ -278,7 +278,16 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null);
false, null, 0);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval);
}
public RMApp submitApp(int masterMemory, String name, String user,
@ -286,6 +295,17 @@ public class MockRM extends ResourceManager {
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@ -321,6 +341,7 @@ public class MockRM extends ResourceManager {
clc.setTokens(securityTokens);
}
sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Test;
@ -584,4 +587,128 @@ public class TestAMRestart {
rm1.stop();
rm2.stop();
}
@Test (timeout = 50000)
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
// set window size to a larger number : 20s
// we will verify the app should be failed if
// two continuous attempts failed in 20s.
RMApp app = rm1.submitApp(200, 20000);
MockAM am = MockRM.launchAM(app, rm1, nm1);
// Fail current attempt normally
nm1.nodeHeartbeat(am.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FAILED);
// launch the second attempt
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app.getAppAttempts().size());
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
.mayBeLastAttempt());
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
am_2.waitForState(RMAppAttemptState.RUNNING);
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am_2.waitForState(RMAppAttemptState.FAILED);
// current app should be failed.
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
ControlledClock clock = new ControlledClock(new SystemClock());
// set window size to 6s
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);;
app1.setSystemClock(clock);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Fail attempt1 normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
// launch the second attempt
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app1.getAppAttempts().size());
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am2.waitForState(RMAppAttemptState.RUNNING);
// wait for 6 seconds
clock.setTime(System.currentTimeMillis() + 6*1000);
// Fail attempt2 normally
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
// can launch the third attempt successfully
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(3, app1.getAppAttempts().size());
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
clock.reset();
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am3.waitForState(RMAppAttemptState.RUNNING);
// Restart rm.
@SuppressWarnings("resource")
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
status
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
status.setContainerId(attempt3.getMasterContainer().getId());
status.setContainerState(ContainerState.COMPLETE);
status.setDiagnostics("");
nm1.registerNode(Collections.singletonList(status), null);
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// Lauch Attempt 4
MockAM am4 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
// wait for 6 seconds
clock.setTime(System.currentTimeMillis() + 6*1000);
// Fail attempt4 normally
nm1
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am4.waitForState(RMAppAttemptState.FAILED);
// can launch the 5th attempt successfully
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am5 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
clock.reset();
am5.waitForState(RMAppAttemptState.RUNNING);
// Fail attempt5 normally
nm1
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am5.waitForState(RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
rm1.stop();
rm2.stop();
}
}

View File

@ -318,7 +318,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100, 0, 0);
FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@ -341,7 +342,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111, 0, 0);
FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down