Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-14 15:12:47 -07:00
commit fdf0542d8e
25 changed files with 392 additions and 59 deletions

View File

@ -34,7 +34,6 @@
import java.util.Map; import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -59,7 +58,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -87,6 +85,7 @@
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;

View File

@ -42,10 +42,8 @@
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -110,6 +108,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;

View File

@ -22,6 +22,7 @@
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.util.UUID; import java.util.UUID;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -30,12 +31,12 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.test.CoreTestDriver; import org.apache.hadoop.test.CoreTestDriver;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;

View File

@ -58,6 +58,12 @@
<artifactId>hadoop-mapreduce-client-hs</artifactId> <artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId> <artifactId>hadoop-yarn-server-nodemanager</artifactId>

View File

@ -24,14 +24,12 @@
import java.util.Random; import java.util.Random;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -44,6 +42,7 @@
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test; import org.junit.Test;

View File

@ -75,9 +75,13 @@ Release 2.6.0 - UNRELEASED
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
containers to a preconfigured limit. (Varun Vasudev via vinodkv) containers to a preconfigured limit. (Varun Vasudev via vinodkv)
YARN-2033. YARN-2033. Merging generic-history into the Timeline Store YARN-2033. Merging generic-history into the Timeline Store
(Zhijie Shen via junping_du) (Zhijie Shen via junping_du)
YARN-611. Added an API to let apps specify an interval beyond which AM
failures should be ignored towards counting max-attempts. (Xuan Gong via
vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
@ -351,6 +355,10 @@ Release 2.6.0 - UNRELEASED
YARN-2542. Fixed NPE when retrieving ApplicationReport from TimeLineServer. YARN-2542. Fixed NPE when retrieving ApplicationReport from TimeLineServer.
(Zhijie Shen via jianhe) (Zhijie Shen via jianhe)
YARN-2528. Relaxed http response split vulnerability protection for the origins
header and made it accept multiple origins in CrossOriginFilter. (Jonathan
Eagles via zjshen)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -45,6 +45,15 @@
* {@link ContainerLaunchContext} of the container in which the * {@link ContainerLaunchContext} of the container in which the
* <code>ApplicationMaster</code> is executed. * <code>ApplicationMaster</code> is executed.
* </li> * </li>
* <li>maxAppAttempts. The maximum number of application attempts.
* It should be no larger than the global number of max attempts in the
* Yarn configuration.</li>
* <li>attemptFailuresValidityInterval. The default value is -1.
* when attemptFailuresValidityInterval in milliseconds is set to > 0,
* the failure number will no take failures which happen out of the
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
* </li>
* </ul> * </ul>
* </p> * </p>
* *
@ -103,6 +112,22 @@ public static ApplicationSubmissionContext newInstance(
resource, null); resource, null);
} }
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers, long attemptFailuresValidityInterval) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, keepContainers);
context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
return context;
}
/** /**
* Get the <code>ApplicationId</code> of the submitted application. * Get the <code>ApplicationId</code> of the submitted application.
* @return <code>ApplicationId</code> of the submitted application * @return <code>ApplicationId</code> of the submitted application
@ -338,4 +363,22 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public @Public
@Stable @Stable
public abstract void setApplicationTags(Set<String> tags); public abstract void setApplicationTags(Set<String> tags);
/**
* Get the attemptFailuresValidityInterval in milliseconds for the application
*
* @return the attemptFailuresValidityInterval
*/
@Public
@Stable
public abstract long getAttemptFailuresValidityInterval();
/**
* Set the attemptFailuresValidityInterval in milliseconds for the application
* @param attemptFailuresValidityInterval
*/
@Public
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
} }

View File

@ -291,6 +291,7 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"]; optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false]; optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12; repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
} }
enum ApplicationAccessTypeProto { enum ApplicationAccessTypeProto {

View File

@ -402,4 +402,17 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) { private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto(); return ((ResourcePBImpl)t).getProto();
} }
@Override
public long getAttemptFailuresValidityInterval() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
return p.getAttemptFailuresValidityInterval();
}
@Override
public void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval) {
maybeInitBuilder();
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
} }

View File

@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.yarn.util;
import org.apache.hadoop.yarn.util.Clock;
public class ControlledClock implements Clock { public class ControlledClock implements Clock {
private long time = -1; private long time = -1;

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.timeline.webapp; package org.apache.hadoop.yarn.server.timeline.webapp;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -106,12 +104,12 @@ public void destroy() {
private void doCrossFilter(HttpServletRequest req, HttpServletResponse res) { private void doCrossFilter(HttpServletRequest req, HttpServletResponse res) {
String origin = encodeHeader(req.getHeader(ORIGIN)); String originsList = encodeHeader(req.getHeader(ORIGIN));
if (!isCrossOrigin(origin)) { if (!isCrossOrigin(originsList)) {
return; return;
} }
if (!isOriginAllowed(origin)) { if (!areOriginsAllowed(originsList)) {
return; return;
} }
@ -127,7 +125,7 @@ private void doCrossFilter(HttpServletRequest req, HttpServletResponse res) {
return; return;
} }
res.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, origin); res.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, originsList);
res.setHeader(ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE.toString()); res.setHeader(ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE.toString());
res.setHeader(ACCESS_CONTROL_ALLOW_METHODS, getAllowedMethodsHeader()); res.setHeader(ACCESS_CONTROL_ALLOW_METHODS, getAllowedMethodsHeader());
res.setHeader(ACCESS_CONTROL_ALLOW_HEADERS, getAllowedHeadersHeader()); res.setHeader(ACCESS_CONTROL_ALLOW_HEADERS, getAllowedHeadersHeader());
@ -191,35 +189,36 @@ static String encodeHeader(final String header) {
if (header == null) { if (header == null) {
return null; return null;
} }
try { // Protect against HTTP response splitting vulnerability
// Protect against HTTP response splitting vulnerability // since value is written as part of the response header
// since value is written as part of the response header // Ensure this header only has one header by removing
return URLEncoder.encode(header, "ASCII"); // CRs and LFs
} catch (UnsupportedEncodingException e) { return header.split("\n|\r")[0].trim();
return null;
}
} }
static boolean isCrossOrigin(String origin) { static boolean isCrossOrigin(String originsList) {
return origin != null; return originsList != null;
} }
@VisibleForTesting @VisibleForTesting
boolean isOriginAllowed(String origin) { boolean areOriginsAllowed(String originsList) {
if (allowAllOrigins) { if (allowAllOrigins) {
return true; return true;
} }
for (String allowedOrigin : allowedOrigins) { String[] origins = originsList.trim().split("\\s+");
if (allowedOrigin.contains("*")) { for (String origin : origins) {
String regex = allowedOrigin.replace(".", "\\.").replace("*", ".*"); for (String allowedOrigin : allowedOrigins) {
Pattern p = Pattern.compile(regex); if (allowedOrigin.contains("*")) {
Matcher m = p.matcher(origin); String regex = allowedOrigin.replace(".", "\\.").replace("*", ".*");
if (m.matches()) { Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(origin);
if (m.matches()) {
return true;
}
} else if (allowedOrigin.equals(origin)) {
return true; return true;
} }
} else if (allowedOrigin.equals(origin)) {
return true;
} }
} }
return false; return false;

View File

@ -77,7 +77,27 @@ public void testAllowAllOrigins() throws ServletException, IOException {
// Object under test // Object under test
CrossOriginFilter filter = new CrossOriginFilter(); CrossOriginFilter filter = new CrossOriginFilter();
filter.init(filterConfig); filter.init(filterConfig);
Assert.assertTrue(filter.isOriginAllowed("example.com")); Assert.assertTrue(filter.areOriginsAllowed("example.com"));
}
@Test
public void testEncodeHeaders() {
String validOrigin = "http://localhost:12345";
String encodedValidOrigin = CrossOriginFilter.encodeHeader(validOrigin);
Assert.assertEquals("Valid origin encoding should match exactly",
validOrigin, encodedValidOrigin);
String httpResponseSplitOrigin = validOrigin + " \nSecondHeader: value";
String encodedResponseSplitOrigin =
CrossOriginFilter.encodeHeader(httpResponseSplitOrigin);
Assert.assertEquals("Http response split origin should be protected against",
validOrigin, encodedResponseSplitOrigin);
// Test Origin List
String validOriginList = "http://foo.example.com:12345 http://bar.example.com:12345";
String encodedValidOriginList = CrossOriginFilter.encodeHeader(validOriginList);
Assert.assertEquals("Valid origin list encoding should match exactly",
validOriginList, encodedValidOriginList);
} }
@Test @Test
@ -93,10 +113,17 @@ public void testPatternMatchingOrigins() throws ServletException, IOException {
filter.init(filterConfig); filter.init(filterConfig);
// match multiple sub-domains // match multiple sub-domains
Assert.assertFalse(filter.isOriginAllowed("example.com")); Assert.assertFalse(filter.areOriginsAllowed("example.com"));
Assert.assertFalse(filter.isOriginAllowed("foo:example.com")); Assert.assertFalse(filter.areOriginsAllowed("foo:example.com"));
Assert.assertTrue(filter.isOriginAllowed("foo.example.com")); Assert.assertTrue(filter.areOriginsAllowed("foo.example.com"));
Assert.assertTrue(filter.isOriginAllowed("foo.bar.example.com")); Assert.assertTrue(filter.areOriginsAllowed("foo.bar.example.com"));
// First origin is allowed
Assert.assertTrue(filter.areOriginsAllowed("foo.example.com foo.nomatch.com"));
// Second origin is allowed
Assert.assertTrue(filter.areOriginsAllowed("foo.nomatch.com foo.example.com"));
// No origin in list is allowed
Assert.assertFalse(filter.areOriginsAllowed("foo.nomatch1.com foo.nomatch2.com"));
} }
@Test @Test
@ -238,7 +265,7 @@ public void testCrossOriginFilterAfterRestart() throws ServletException {
Assert.assertTrue("Allowed methods do not match", Assert.assertTrue("Allowed methods do not match",
filter.getAllowedMethodsHeader() filter.getAllowedMethodsHeader()
.compareTo("GET,POST") == 0); .compareTo("GET,POST") == 0);
Assert.assertTrue(filter.isOriginAllowed("example.com")); Assert.assertTrue(filter.areOriginsAllowed("example.com"));
//destroy filter values and clear conf //destroy filter values and clear conf
filter.destroy(); filter.destroy();
@ -260,7 +287,7 @@ public void testCrossOriginFilterAfterRestart() throws ServletException {
Assert.assertTrue("Allowed methods do not match", Assert.assertTrue("Allowed methods do not match",
filter.getAllowedMethodsHeader() filter.getAllowedMethodsHeader()
.compareTo("GET,HEAD") == 0); .compareTo("GET,HEAD") == 0);
Assert.assertTrue(filter.isOriginAllowed("newexample.com")); Assert.assertTrue(filter.areOriginsAllowed("newexample.com"));
//destroy filter values //destroy filter values
filter.destroy(); filter.destroy();

View File

@ -282,6 +282,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(), attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(), attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds()); attemptStateData.getVcoreSeconds());

View File

@ -171,6 +171,7 @@ public synchronized void updateApplicationAttemptStateInternal(
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(), attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(), attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds()); attemptStateData.getVcoreSeconds());

View File

@ -261,6 +261,7 @@ public static class ApplicationAttemptState {
final Container masterContainer; final Container masterContainer;
final Credentials appAttemptCredentials; final Credentials appAttemptCredentials;
long startTime = 0; long startTime = 0;
long finishTime = 0;
// fields set when attempt completes // fields set when attempt completes
RMAppAttemptState state; RMAppAttemptState state;
String finalTrackingUrl = "N/A"; String finalTrackingUrl = "N/A";
@ -274,14 +275,15 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime, long memorySeconds, long vcoreSeconds) { long startTime, long memorySeconds, long vcoreSeconds) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null, this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null, ContainerExitStatus.INVALID, memorySeconds, vcoreSeconds); null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
} }
public ApplicationAttemptState(ApplicationAttemptId attemptId, public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials, Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl, long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus, long memorySeconds, long vcoreSeconds) { int exitStatus, long finishTime, long memorySeconds,
long vcoreSeconds) {
this.attemptId = attemptId; this.attemptId = attemptId;
this.masterContainer = masterContainer; this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials; this.appAttemptCredentials = appAttemptCredentials;
@ -291,6 +293,7 @@ public ApplicationAttemptState(ApplicationAttemptId attemptId,
this.diagnostics = diagnostics == null ? "" : diagnostics; this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus; this.exitStatus = exitStatus;
this.finishTime = finishTime;
this.memorySeconds = memorySeconds; this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds; this.vcoreSeconds = vcoreSeconds;
} }
@ -328,6 +331,9 @@ public long getMemorySeconds() {
public long getVcoreSeconds() { public long getVcoreSeconds() {
return vcoreSeconds; return vcoreSeconds;
} }
public long getFinishTime() {
return this.finishTime;
}
} }
/** /**

View File

@ -604,9 +604,11 @@ private void loadApplicationAttemptState(ApplicationState appState,
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(), attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(), attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(), attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds()); attemptStateData.getVcoreSeconds());
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }
} }

View File

@ -44,7 +44,7 @@ public static ApplicationAttemptStateData newInstance(
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics, String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus, FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long memorySeconds, long vcoreSeconds) { long finishTime, long memorySeconds, long vcoreSeconds) {
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class); Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId); attemptStateData.setAttemptId(attemptId);
@ -56,6 +56,7 @@ public static ApplicationAttemptStateData newInstance(
attemptStateData.setStartTime(startTime); attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus); attemptStateData.setAMContainerExitStatus(exitStatus);
attemptStateData.setFinishTime(finishTime);
attemptStateData.setMemorySeconds(memorySeconds); attemptStateData.setMemorySeconds(memorySeconds);
attemptStateData.setVcoreSeconds(vcoreSeconds); attemptStateData.setVcoreSeconds(vcoreSeconds);
return attemptStateData; return attemptStateData;
@ -75,7 +76,7 @@ public static ApplicationAttemptStateData newInstance(
attemptState.getStartTime(), attemptState.getState(), attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(), attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus(), attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds()); attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
} }
@ -163,7 +164,15 @@ public abstract void setFinalApplicationStatus(
public abstract void setAMContainerExitStatus(int exitStatus); public abstract void setAMContainerExitStatus(int exitStatus);
/** /**
* Get the <em>memory seconds</em> (in MB seconds) of the application. * Get the <em>finish time</em> of the application attempt.
* @return <em>finish time</em> of the application attempt
*/
public abstract long getFinishTime();
public abstract void setFinishTime(long finishTime);
/**
* Get the <em>memory seconds</em> (in MB seconds) of the application.
* @return <em>memory seconds</em> (in MB seconds) of the application * @return <em>memory seconds</em> (in MB seconds) of the application
*/ */
@Public @Public

View File

@ -318,4 +318,16 @@ private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s); return ProtoUtils.convertFromProtoFormat(s);
} }
@Override
public long getFinishTime() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
return p.getFinishTime();
}
@Override
public void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
} }

View File

@ -32,10 +32,10 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -81,8 +81,12 @@
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable { public class RMAppImpl implements RMApp, Recoverable {
@ -110,6 +114,12 @@ public class RMAppImpl implements RMApp, Recoverable {
private final String applicationType; private final String applicationType;
private final Set<String> applicationTags; private final Set<String> applicationTags;
private final long attemptFailuresValidityInterval;
private Clock systemClock;
private boolean isNumAttemptsBeyondThreshold = false;
// Mutable fields // Mutable fields
private long startTime; private long startTime;
private long finishTime = 0; private long finishTime = 0;
@ -328,6 +338,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
ApplicationMasterService masterService, long submitTime, ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags) { String applicationType, Set<String> applicationTags) {
this.systemClock = new SystemClock();
this.applicationId = applicationId; this.applicationId = applicationId;
this.name = name; this.name = name;
this.rmContext = rmContext; this.rmContext = rmContext;
@ -340,7 +352,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.scheduler = scheduler; this.scheduler = scheduler;
this.masterService = masterService; this.masterService = masterService;
this.submitTime = submitTime; this.submitTime = submitTime;
this.startTime = System.currentTimeMillis(); this.startTime = this.systemClock.getTime();
this.applicationType = applicationType; this.applicationType = applicationType;
this.applicationTags = applicationTags; this.applicationTags = applicationTags;
@ -358,6 +370,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.maxAppAttempts = individualMaxAppAttempts; this.maxAppAttempts = individualMaxAppAttempts;
} }
this.attemptFailuresValidityInterval =
submissionContext.getAttemptFailuresValidityInterval();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
this.writeLock = lock.writeLock(); this.writeLock = lock.writeLock();
@ -897,7 +912,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
msg = "Unmanaged application " + this.getApplicationId() msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics() + " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application."; + ". Failing the application.";
} else if (getNumFailedAppAttempts() >= this.maxAppAttempts) { } else if (this.isNumAttemptsBeyondThreshold) {
msg = "Application " + this.getApplicationId() + " failed " msg = "Application " + this.getApplicationId() + " failed "
+ this.maxAppAttempts + " times due to " + this.maxAppAttempts + " times due to "
+ failedEvent.getDiagnostics() + ". Failing the application."; + failedEvent.getDiagnostics() + ". Failing the application.";
@ -930,7 +945,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
RMAppState stateToBeStored) { RMAppState stateToBeStored) {
rememberTargetTransitions(event, transitionToDo, targetFinalState); rememberTargetTransitions(event, transitionToDo, targetFinalState);
this.stateBeforeFinalSaving = getState(); this.stateBeforeFinalSaving = getState();
this.storedFinishTime = System.currentTimeMillis(); this.storedFinishTime = this.systemClock.getTime();
LOG.info("Updating application " + this.applicationId LOG.info("Updating application " + this.applicationId
+ " with final state: " + this.targetedFinalState); + " with final state: " + this.targetedFinalState);
@ -1097,7 +1112,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
} }
app.finishTime = app.storedFinishTime; app.finishTime = app.storedFinishTime;
if (app.finishTime == 0 ) { if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis(); app.finishTime = app.systemClock.getTime();
} }
// Recovered apps that are completed were not added to scheduler, so no // Recovered apps that are completed were not added to scheduler, so no
// need to remove them from scheduler. // need to remove them from scheduler.
@ -1118,11 +1133,16 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private int getNumFailedAppAttempts() { private int getNumFailedAppAttempts() {
int completedAttempts = 0; int completedAttempts = 0;
long endTime = this.systemClock.getTime();
// Do not count AM preemption, hardware failures or NM resync // Do not count AM preemption, hardware failures or NM resync
// as attempt failure. // as attempt failure.
for (RMAppAttempt attempt : attempts.values()) { for (RMAppAttempt attempt : attempts.values()) {
if (attempt.shouldCountTowardsMaxAttemptRetry()) { if (attempt.shouldCountTowardsMaxAttemptRetry()) {
completedAttempts++; if (this.attemptFailuresValidityInterval <= 0
|| (attempt.getFinishTime() > endTime
- this.attemptFailuresValidityInterval)) {
completedAttempts++;
}
} }
} }
return completedAttempts; return completedAttempts;
@ -1139,8 +1159,9 @@ public AttemptFailedTransition(RMAppState initialState) {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
int numberOfFailure = app.getNumFailedAppAttempts();
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) { && numberOfFailure < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false; boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt = transferStateFromPreviousAttempt =
@ -1158,6 +1179,9 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
} }
return initialState; return initialState;
} else { } else {
if (numberOfFailure >= app.maxAppAttempts) {
app.isNumAttemptsBeyondThreshold = true;
}
app.rememberTargetTransitionsAndStoreState(event, app.rememberTargetTransitionsAndStoreState(event,
new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
RMAppState.FAILED); RMAppState.FAILED);
@ -1244,4 +1268,10 @@ public RMAppMetrics getRMAppMetrics() {
numNonAMContainerPreempted, numAMContainerPreempted, numNonAMContainerPreempted, numAMContainerPreempted,
memorySeconds, vcoreSeconds); memorySeconds, vcoreSeconds);
} }
@Private
@VisibleForTesting
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
} }

View File

@ -213,4 +213,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* @return metrics * @return metrics
*/ */
RMAppAttemptMetrics getRMAppAttemptMetrics(); RMAppAttemptMetrics getRMAppAttemptMetrics();
/**
* the finish time of the application attempt.
* @return the finish time of the application attempt.
*/
long getFinishTime();
} }

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -85,7 +84,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@ -142,6 +140,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private String originalTrackingUrl = "N/A"; private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A";
private long startTime = 0; private long startTime = 0;
private long finishTime = 0;
// Set to null initially. Will eventually get set // Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs // if an RMAppAttemptUnregistrationEvent occurs
@ -739,6 +738,7 @@ public void recover(RMState state) throws Exception {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus(); this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime(); this.startTime = attemptState.getStartTime();
this.finishTime = attemptState.getFinishTime();
this.attemptMetrics.updateAggregateAppResourceUsage( this.attemptMetrics.updateAggregateAppResourceUsage(
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds()); attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
} }
@ -1028,11 +1028,13 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
AggregateAppResourceUsage resUsage = AggregateAppResourceUsage resUsage =
this.attemptMetrics.getAggregateAppResourceUsage(); this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore(); RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime, rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus, stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId LOG.info("Updating application attempt " + applicationAttemptId
+ " with final state: " + targetedFinalState + ", and exit status: " + " with final state: " + targetedFinalState + ", and exit status: "
+ exitStatus); + exitStatus);
@ -1747,4 +1749,23 @@ public RMAppAttemptMetrics getRMAppAttemptMetrics() {
// lock // lock
return attemptMetrics; return attemptMetrics;
} }
@Override
public long getFinishTime() {
try {
this.readLock.lock();
return this.finishTime;
} finally {
this.readLock.unlock();
}
}
private void setFinishTime(long finishTime) {
try {
this.writeLock.lock();
this.finishTime = finishTime;
} finally {
this.writeLock.unlock();
}
}
} }

View File

@ -80,6 +80,7 @@ message ApplicationAttemptStateDataProto {
optional int32 am_container_exit_status = 9 [default = -1000]; optional int32 am_container_exit_status = 9 [default = -1000];
optional int64 memory_seconds = 10; optional int64 memory_seconds = 10;
optional int64 vcore_seconds = 11; optional int64 vcore_seconds = 11;
optional int64 finish_time = 12;
} }
message EpochProto { message EpochProto {

View File

@ -278,7 +278,16 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers) throws Exception { boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue, return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null); false, null, 0);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
@ -286,6 +295,17 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception { ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) { if (! isAppIdProvided) {
@ -321,6 +341,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
clc.setTokens(securityTokens); clc.setTokens(securityTokens);
} }
sub.setAMContainerSpec(clc); sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser = UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@ -53,7 +54,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -584,4 +587,128 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
} }
@Test (timeout = 50000)
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
// set window size to a larger number : 20s
// we will verify the app should be failed if
// two continuous attempts failed in 20s.
RMApp app = rm1.submitApp(200, 20000);
MockAM am = MockRM.launchAM(app, rm1, nm1);
// Fail current attempt normally
nm1.nodeHeartbeat(am.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FAILED);
// launch the second attempt
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app.getAppAttempts().size());
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
.mayBeLastAttempt());
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
am_2.waitForState(RMAppAttemptState.RUNNING);
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am_2.waitForState(RMAppAttemptState.FAILED);
// current app should be failed.
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
ControlledClock clock = new ControlledClock(new SystemClock());
// set window size to 6s
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 6000);;
app1.setSystemClock(clock);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Fail attempt1 normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
// launch the second attempt
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, app1.getAppAttempts().size());
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am2.waitForState(RMAppAttemptState.RUNNING);
// wait for 6 seconds
clock.setTime(System.currentTimeMillis() + 6*1000);
// Fail attempt2 normally
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
// can launch the third attempt successfully
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(3, app1.getAppAttempts().size());
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
clock.reset();
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am3.waitForState(RMAppAttemptState.RUNNING);
// Restart rm.
@SuppressWarnings("resource")
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
status
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
status.setContainerId(attempt3.getMasterContainer().getId());
status.setContainerState(ContainerState.COMPLETE);
status.setDiagnostics("");
nm1.registerNode(Collections.singletonList(status), null);
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// Lauch Attempt 4
MockAM am4 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
// wait for 6 seconds
clock.setTime(System.currentTimeMillis() + 6*1000);
// Fail attempt4 normally
nm1
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am4.waitForState(RMAppAttemptState.FAILED);
// can launch the 5th attempt successfully
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am5 =
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1);
clock.reset();
am5.waitForState(RMAppAttemptState.RUNNING);
// Fail attempt5 normally
nm1
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am5.waitForState(RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
rm1.stop();
rm2.stop();
}
} }

View File

@ -318,7 +318,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100, 0, 0); FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(newAttemptState); store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not // test updating the state of an app/attempt whose initial state was not
@ -341,7 +342,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
oldAttemptState.getAppAttemptCredentials(), oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111, 0, 0); FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(dummyAttempt); store.updateApplicationAttemptState(dummyAttempt);
// let things settle down // let things settle down