YARN-611. Added an API to let apps specify an interval beyond which AM failures should be ignored towards counting max-attempts. Contributed by Xuan Gong.
(cherry picked from commit 14e2639fd0
)
This commit is contained in:
parent
99ccd842d8
commit
5cdb24d4b1
|
@ -34,7 +34,6 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -59,7 +58,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
|
@ -87,6 +85,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
|
|
@ -42,10 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -109,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
|
@ -30,12 +31,12 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.test.CoreTestDriver;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
|
|
@ -58,6 +58,12 @@
|
|||
<artifactId>hadoop-mapreduce-client-hs</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
|
||||
|
|
|
@ -24,14 +24,12 @@ import java.util.Map;
|
|||
import java.util.Random;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
|
@ -44,6 +42,7 @@ import org.apache.hadoop.service.Service;
|
|||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
|
@ -50,9 +50,13 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
|
||||
containers to a preconfigured limit. (Varun Vasudev via vinodkv)
|
||||
|
||||
YARN-2033. YARN-2033. Merging generic-history into the Timeline Store
|
||||
YARN-2033. Merging generic-history into the Timeline Store
|
||||
(Zhijie Shen via junping_du)
|
||||
|
||||
YARN-611. Added an API to let apps specify an interval beyond which AM
|
||||
failures should be ignored towards counting max-attempts. (Xuan Gong via
|
||||
vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||
|
|
|
@ -45,6 +45,15 @@ import java.util.Set;
|
|||
* {@link ContainerLaunchContext} of the container in which the
|
||||
* <code>ApplicationMaster</code> is executed.
|
||||
* </li>
|
||||
* <li>maxAppAttempts. The maximum number of application attempts.
|
||||
* It should be no larger than the global number of max attempts in the
|
||||
* Yarn configuration.</li>
|
||||
* <li>attemptFailuresValidityInterval. The default value is -1.
|
||||
* when attemptFailuresValidityInterval in milliseconds is set to > 0,
|
||||
* the failure number will no take failures which happen out of the
|
||||
* validityInterval into failure count. If failure count reaches to
|
||||
* maxAppAttempts, the application will be failed.
|
||||
* </li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
|
@ -103,6 +112,22 @@ public abstract class ApplicationSubmissionContext {
|
|||
resource, null);
|
||||
}
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public static ApplicationSubmissionContext newInstance(
|
||||
ApplicationId applicationId, String applicationName, String queue,
|
||||
Priority priority, ContainerLaunchContext amContainer,
|
||||
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
|
||||
int maxAppAttempts, Resource resource, String applicationType,
|
||||
boolean keepContainers, long attemptFailuresValidityInterval) {
|
||||
ApplicationSubmissionContext context =
|
||||
newInstance(applicationId, applicationName, queue, priority,
|
||||
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
|
||||
resource, applicationType, keepContainers);
|
||||
context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
||||
return context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationId</code> of the submitted application.
|
||||
* @return <code>ApplicationId</code> of the submitted application
|
||||
|
@ -338,4 +363,22 @@ public abstract class ApplicationSubmissionContext {
|
|||
@Public
|
||||
@Stable
|
||||
public abstract void setApplicationTags(Set<String> tags);
|
||||
|
||||
/**
|
||||
* Get the attemptFailuresValidityInterval in milliseconds for the application
|
||||
*
|
||||
* @return the attemptFailuresValidityInterval
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract long getAttemptFailuresValidityInterval();
|
||||
|
||||
/**
|
||||
* Set the attemptFailuresValidityInterval in milliseconds for the application
|
||||
* @param attemptFailuresValidityInterval
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setAttemptFailuresValidityInterval(
|
||||
long attemptFailuresValidityInterval);
|
||||
}
|
|
@ -291,6 +291,7 @@ message ApplicationSubmissionContextProto {
|
|||
optional string applicationType = 10 [default = "YARN"];
|
||||
optional bool keep_containers_across_application_attempts = 11 [default = false];
|
||||
repeated string applicationTags = 12;
|
||||
optional int64 attempt_failures_validity_interval = 13 [default = -1];
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
|
|
|
@ -402,4 +402,17 @@ extends ApplicationSubmissionContext {
|
|||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAttemptFailuresValidityInterval() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getAttemptFailuresValidityInterval();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttemptFailuresValidityInterval(
|
||||
long attemptFailuresValidityInterval) {
|
||||
maybeInitBuilder();
|
||||
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,9 +15,8 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
||||
public class ControlledClock implements Clock {
|
||||
private long time = -1;
|
|
@ -282,6 +282,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
attemptStateData.getDiagnostics(),
|
||||
attemptStateData.getFinalApplicationStatus(),
|
||||
attemptStateData.getAMContainerExitStatus(),
|
||||
attemptStateData.getFinishTime(),
|
||||
attemptStateData.getMemorySeconds(),
|
||||
attemptStateData.getVcoreSeconds());
|
||||
|
||||
|
|
|
@ -171,6 +171,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
attemptStateData.getDiagnostics(),
|
||||
attemptStateData.getFinalApplicationStatus(),
|
||||
attemptStateData.getAMContainerExitStatus(),
|
||||
attemptStateData.getFinishTime(),
|
||||
attemptStateData.getMemorySeconds(),
|
||||
attemptStateData.getVcoreSeconds());
|
||||
|
||||
|
|
|
@ -261,6 +261,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||
final Container masterContainer;
|
||||
final Credentials appAttemptCredentials;
|
||||
long startTime = 0;
|
||||
long finishTime = 0;
|
||||
// fields set when attempt completes
|
||||
RMAppAttemptState state;
|
||||
String finalTrackingUrl = "N/A";
|
||||
|
@ -274,14 +275,15 @@ public abstract class RMStateStore extends AbstractService {
|
|||
Container masterContainer, Credentials appAttemptCredentials,
|
||||
long startTime, long memorySeconds, long vcoreSeconds) {
|
||||
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
|
||||
null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds);
|
||||
null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
|
||||
}
|
||||
|
||||
public ApplicationAttemptState(ApplicationAttemptId attemptId,
|
||||
Container masterContainer, Credentials appAttemptCredentials,
|
||||
long startTime, RMAppAttemptState state, String finalTrackingUrl,
|
||||
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
|
||||
int exitStatus, long memorySeconds, long vcoreSeconds) {
|
||||
int exitStatus, long finishTime, long memorySeconds,
|
||||
long vcoreSeconds) {
|
||||
this.attemptId = attemptId;
|
||||
this.masterContainer = masterContainer;
|
||||
this.appAttemptCredentials = appAttemptCredentials;
|
||||
|
@ -291,6 +293,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||
this.diagnostics = diagnostics == null ? "" : diagnostics;
|
||||
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
|
||||
this.exitStatus = exitStatus;
|
||||
this.finishTime = finishTime;
|
||||
this.memorySeconds = memorySeconds;
|
||||
this.vcoreSeconds = vcoreSeconds;
|
||||
}
|
||||
|
@ -328,6 +331,9 @@ public abstract class RMStateStore extends AbstractService {
|
|||
public long getVcoreSeconds() {
|
||||
return vcoreSeconds;
|
||||
}
|
||||
public long getFinishTime() {
|
||||
return this.finishTime;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -604,9 +604,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
attemptStateData.getDiagnostics(),
|
||||
attemptStateData.getFinalApplicationStatus(),
|
||||
attemptStateData.getAMContainerExitStatus(),
|
||||
attemptStateData.getFinishTime(),
|
||||
attemptStateData.getMemorySeconds(),
|
||||
attemptStateData.getVcoreSeconds());
|
||||
|
||||
|
||||
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public abstract class ApplicationAttemptStateData {
|
|||
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||
String finalTrackingUrl, String diagnostics,
|
||||
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
|
||||
long memorySeconds, long vcoreSeconds) {
|
||||
long finishTime, long memorySeconds, long vcoreSeconds) {
|
||||
ApplicationAttemptStateData attemptStateData =
|
||||
Records.newRecord(ApplicationAttemptStateData.class);
|
||||
attemptStateData.setAttemptId(attemptId);
|
||||
|
@ -56,6 +56,7 @@ public abstract class ApplicationAttemptStateData {
|
|||
attemptStateData.setStartTime(startTime);
|
||||
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
||||
attemptStateData.setAMContainerExitStatus(exitStatus);
|
||||
attemptStateData.setFinishTime(finishTime);
|
||||
attemptStateData.setMemorySeconds(memorySeconds);
|
||||
attemptStateData.setVcoreSeconds(vcoreSeconds);
|
||||
return attemptStateData;
|
||||
|
@ -75,7 +76,7 @@ public abstract class ApplicationAttemptStateData {
|
|||
attemptState.getStartTime(), attemptState.getState(),
|
||||
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
|
||||
attemptState.getFinalApplicationStatus(),
|
||||
attemptState.getAMContainerExitStatus(),
|
||||
attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
|
||||
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
|
||||
}
|
||||
|
||||
|
@ -162,6 +163,14 @@ public abstract class ApplicationAttemptStateData {
|
|||
|
||||
public abstract void setAMContainerExitStatus(int exitStatus);
|
||||
|
||||
/**
|
||||
* Get the <em>finish time</em> of the application attempt.
|
||||
* @return <em>finish time</em> of the application attempt
|
||||
*/
|
||||
public abstract long getFinishTime();
|
||||
|
||||
public abstract void setFinishTime(long finishTime);
|
||||
|
||||
/**
|
||||
* Get the <em>memory seconds</em> (in MB seconds) of the application.
|
||||
* @return <em>memory seconds</em> (in MB seconds) of the application
|
||||
|
|
|
@ -318,4 +318,16 @@ public class ApplicationAttemptStateDataPBImpl extends
|
|||
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
|
||||
return ProtoUtils.convertFromProtoFormat(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFinishTime() {
|
||||
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getFinishTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFinishTime(long finishTime) {
|
||||
maybeInitBuilder();
|
||||
builder.setFinishTime(finishTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,10 +32,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
|
@ -81,8 +81,12 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
|||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
|
@ -110,6 +114,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
private final String applicationType;
|
||||
private final Set<String> applicationTags;
|
||||
|
||||
private final long attemptFailuresValidityInterval;
|
||||
|
||||
private Clock systemClock;
|
||||
|
||||
private boolean isNumAttemptsBeyondThreshold = false;
|
||||
|
||||
// Mutable fields
|
||||
private long startTime;
|
||||
private long finishTime = 0;
|
||||
|
@ -328,6 +338,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
ApplicationMasterService masterService, long submitTime,
|
||||
String applicationType, Set<String> applicationTags) {
|
||||
|
||||
this.systemClock = new SystemClock();
|
||||
|
||||
this.applicationId = applicationId;
|
||||
this.name = name;
|
||||
this.rmContext = rmContext;
|
||||
|
@ -340,7 +352,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
this.scheduler = scheduler;
|
||||
this.masterService = masterService;
|
||||
this.submitTime = submitTime;
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.startTime = this.systemClock.getTime();
|
||||
this.applicationType = applicationType;
|
||||
this.applicationTags = applicationTags;
|
||||
|
||||
|
@ -358,6 +370,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
this.maxAppAttempts = individualMaxAppAttempts;
|
||||
}
|
||||
|
||||
this.attemptFailuresValidityInterval =
|
||||
submissionContext.getAttemptFailuresValidityInterval();
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
this.writeLock = lock.writeLock();
|
||||
|
@ -897,7 +912,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
msg = "Unmanaged application " + this.getApplicationId()
|
||||
+ " failed due to " + failedEvent.getDiagnostics()
|
||||
+ ". Failing the application.";
|
||||
} else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
|
||||
} else if (this.isNumAttemptsBeyondThreshold) {
|
||||
msg = "Application " + this.getApplicationId() + " failed "
|
||||
+ this.maxAppAttempts + " times due to "
|
||||
+ failedEvent.getDiagnostics() + ". Failing the application.";
|
||||
|
@ -930,7 +945,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppState stateToBeStored) {
|
||||
rememberTargetTransitions(event, transitionToDo, targetFinalState);
|
||||
this.stateBeforeFinalSaving = getState();
|
||||
this.storedFinishTime = System.currentTimeMillis();
|
||||
this.storedFinishTime = this.systemClock.getTime();
|
||||
|
||||
LOG.info("Updating application " + this.applicationId
|
||||
+ " with final state: " + this.targetedFinalState);
|
||||
|
@ -1097,7 +1112,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
app.finishTime = app.storedFinishTime;
|
||||
if (app.finishTime == 0 ) {
|
||||
app.finishTime = System.currentTimeMillis();
|
||||
app.finishTime = app.systemClock.getTime();
|
||||
}
|
||||
// Recovered apps that are completed were not added to scheduler, so no
|
||||
// need to remove them from scheduler.
|
||||
|
@ -1118,13 +1133,18 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
private int getNumFailedAppAttempts() {
|
||||
int completedAttempts = 0;
|
||||
long endTime = this.systemClock.getTime();
|
||||
// Do not count AM preemption, hardware failures or NM resync
|
||||
// as attempt failure.
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
|
||||
if (this.attemptFailuresValidityInterval <= 0
|
||||
|| (attempt.getFinishTime() > endTime
|
||||
- this.attemptFailuresValidityInterval)) {
|
||||
completedAttempts++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return completedAttempts;
|
||||
}
|
||||
|
||||
|
@ -1139,8 +1159,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
int numberOfFailure = app.getNumFailedAppAttempts();
|
||||
if (!app.submissionContext.getUnmanagedAM()
|
||||
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) {
|
||||
&& numberOfFailure < app.maxAppAttempts) {
|
||||
boolean transferStateFromPreviousAttempt = false;
|
||||
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
|
||||
transferStateFromPreviousAttempt =
|
||||
|
@ -1158,6 +1179,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
return initialState;
|
||||
} else {
|
||||
if (numberOfFailure >= app.maxAppAttempts) {
|
||||
app.isNumAttemptsBeyondThreshold = true;
|
||||
}
|
||||
app.rememberTargetTransitionsAndStoreState(event,
|
||||
new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
|
||||
RMAppState.FAILED);
|
||||
|
@ -1244,4 +1268,10 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
numNonAMContainerPreempted, numAMContainerPreempted,
|
||||
memorySeconds, vcoreSeconds);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setSystemClock(Clock clock) {
|
||||
this.systemClock = clock;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -213,4 +213,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
|||
* @return metrics
|
||||
*/
|
||||
RMAppAttemptMetrics getRMAppAttemptMetrics();
|
||||
|
||||
/**
|
||||
* the finish time of the application attempt.
|
||||
* @return the finish time of the application attempt.
|
||||
*/
|
||||
long getFinishTime();
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
|
@ -142,6 +140,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
private String originalTrackingUrl = "N/A";
|
||||
private String proxiedTrackingUrl = "N/A";
|
||||
private long startTime = 0;
|
||||
private long finishTime = 0;
|
||||
|
||||
// Set to null initially. Will eventually get set
|
||||
// if an RMAppAttemptUnregistrationEvent occurs
|
||||
|
@ -739,6 +738,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
|
||||
this.finalStatus = attemptState.getFinalApplicationStatus();
|
||||
this.startTime = attemptState.getStartTime();
|
||||
this.finishTime = attemptState.getFinishTime();
|
||||
this.attemptMetrics.updateAggregateAppResourceUsage(
|
||||
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
|
||||
}
|
||||
|
@ -1028,11 +1028,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
AggregateAppResourceUsage resUsage =
|
||||
this.attemptMetrics.getAggregateAppResourceUsage();
|
||||
RMStateStore rmStore = rmContext.getStateStore();
|
||||
setFinishTime(System.currentTimeMillis());
|
||||
ApplicationAttemptState attemptState =
|
||||
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
|
||||
rmStore.getCredentialsFromAppAttempt(this), startTime,
|
||||
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
|
||||
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
|
||||
getFinishTime(), resUsage.getMemorySeconds(),
|
||||
resUsage.getVcoreSeconds());
|
||||
LOG.info("Updating application attempt " + applicationAttemptId
|
||||
+ " with final state: " + targetedFinalState + ", and exit status: "
|
||||
+ exitStatus);
|
||||
|
@ -1747,4 +1749,23 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
// lock
|
||||
return attemptMetrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFinishTime() {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
return this.finishTime;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void setFinishTime(long finishTime) {
|
||||
try {
|
||||
this.writeLock.lock();
|
||||
this.finishTime = finishTime;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ message ApplicationAttemptStateDataProto {
|
|||
optional int32 am_container_exit_status = 9 [default = -1000];
|
||||
optional int64 memory_seconds = 10;
|
||||
optional int64 vcore_seconds = 11;
|
||||
optional int64 finish_time = 12;
|
||||
}
|
||||
|
||||
message EpochProto {
|
||||
|
|
|
@ -278,7 +278,16 @@ public class MockRM extends ResourceManager {
|
|||
boolean waitForAccepted, boolean keepContainers) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
false, null);
|
||||
false, null, 0);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
|
||||
throws Exception {
|
||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName(), null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
||||
false, null, attemptFailuresValidityInterval);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
|
@ -286,6 +295,17 @@ public class MockRM extends ResourceManager {
|
|||
int maxAppAttempts, Credentials ts, String appType,
|
||||
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||
ApplicationId applicationId) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
||||
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
||||
isAppIdProvided, applicationId, 0);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts, Credentials ts, String appType,
|
||||
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
||||
ApplicationId applicationId, long attemptFailuresValidityInterval)
|
||||
throws Exception {
|
||||
ApplicationId appId = isAppIdProvided ? applicationId : null;
|
||||
ApplicationClientProtocol client = getClientRMService();
|
||||
if (! isAppIdProvided) {
|
||||
|
@ -321,6 +341,7 @@ public class MockRM extends ResourceManager {
|
|||
clc.setTokens(securityTokens);
|
||||
}
|
||||
sub.setAMContainerSpec(clc);
|
||||
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
UserGroupInformation fakeUser =
|
||||
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
|
@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -584,4 +587,128 @@ public class TestAMRestart {
|
|||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
@Test (timeout = 50000)
|
||||
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
// explicitly set max-am-retry count as 2.
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// set window size to a larger number : 20s
|
||||
// we will verify the app should be failed if
|
||||
// two continuous attempts failed in 20s.
|
||||
RMApp app = rm1.submitApp(200, 20000);
|
||||
|
||||
MockAM am = MockRM.launchAM(app, rm1, nm1);
|
||||
// Fail current attempt normally
|
||||
nm1.nodeHeartbeat(am.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FAILED);
|
||||
// launch the second attempt
|
||||
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||
Assert.assertEquals(2, app.getAppAttempts().size());
|
||||
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
|
||||
.mayBeLastAttempt());
|
||||
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
||||
am_2.waitForState(RMAppAttemptState.RUNNING);
|
||||
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am_2.waitForState(RMAppAttemptState.FAILED);
|
||||
// current app should be failed.
|
||||
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
ControlledClock clock = new ControlledClock(new SystemClock());
|
||||
// set window size to 6s
|
||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);;
|
||||
app1.setSystemClock(clock);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Fail attempt1 normally
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
|
||||
// launch the second attempt
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
Assert.assertEquals(2, app1.getAppAttempts().size());
|
||||
|
||||
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
am2.waitForState(RMAppAttemptState.RUNNING);
|
||||
|
||||
// wait for 6 seconds
|
||||
clock.setTime(System.currentTimeMillis() + 6*1000);
|
||||
// Fail attempt2 normally
|
||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
|
||||
1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
|
||||
// can launch the third attempt successfully
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
Assert.assertEquals(3, app1.getAppAttempts().size());
|
||||
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
||||
clock.reset();
|
||||
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
am3.waitForState(RMAppAttemptState.RUNNING);
|
||||
|
||||
// Restart rm.
|
||||
@SuppressWarnings("resource")
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
|
||||
// re-register the NM
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
|
||||
status
|
||||
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
|
||||
status.setContainerId(attempt3.getMasterContainer().getId());
|
||||
status.setContainerState(ContainerState.COMPLETE);
|
||||
status.setDiagnostics("");
|
||||
nm1.registerNode(Collections.singletonList(status), null);
|
||||
|
||||
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
|
||||
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// Lauch Attempt 4
|
||||
MockAM am4 =
|
||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
|
||||
|
||||
// wait for 6 seconds
|
||||
clock.setTime(System.currentTimeMillis() + 6*1000);
|
||||
// Fail attempt4 normally
|
||||
nm1
|
||||
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am4.waitForState(RMAppAttemptState.FAILED);
|
||||
|
||||
// can launch the 5th attempt successfully
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
MockAM am5 =
|
||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
|
||||
clock.reset();
|
||||
am5.waitForState(RMAppAttemptState.RUNNING);
|
||||
|
||||
// Fail attempt5 normally
|
||||
nm1
|
||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am5.waitForState(RMAppAttemptState.FAILED);
|
||||
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -318,7 +318,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
oldAttemptState.getAppAttemptCredentials(),
|
||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 100, 0, 0);
|
||||
FinalApplicationStatus.SUCCEEDED, 100,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
|
||||
// test updating the state of an app/attempt whose initial state was not
|
||||
|
@ -341,7 +342,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
oldAttemptState.getAppAttemptCredentials(),
|
||||
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
|
||||
"myTrackingUrl", "attemptDiagnostics",
|
||||
FinalApplicationStatus.SUCCEEDED, 111, 0, 0);
|
||||
FinalApplicationStatus.SUCCEEDED, 111,
|
||||
oldAttemptState.getFinishTime(), 0, 0);
|
||||
store.updateApplicationAttemptState(dummyAttempt);
|
||||
|
||||
// let things settle down
|
||||
|
|
Loading…
Reference in New Issue