YARN-5095. flow activities and flow runs are populated with wrong timestamp when RM restarts w/ recovery enabled (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2016-05-25 16:56:49 -07:00
parent 831a3ffd6e
commit 702236129b
3 changed files with 87 additions and 7 deletions

View File

@ -288,8 +288,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
String user) throws YarnException, AccessControlException { String user) throws YarnException, AccessControlException {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application = // Passing start time as -1. It will be eventually set in RMAppImpl
createAndPopulateNewRMApp(submissionContext, submitTime, user, false); // constructor.
RMAppImpl application = createAndPopulateNewRMApp(
submissionContext, submitTime, user, false, -1);
Credentials credentials = null; Credentials credentials = null;
try { try {
credentials = parseCredentials(submissionContext); credentials = parseCredentials(submissionContext);
@ -327,14 +329,14 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// create and recover app. // create and recover app.
RMAppImpl application = RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser(), true); appState.getUser(), true, appState.getStartTime());
application.handle(new RMAppRecoverEvent(appId, rmState)); application.handle(new RMAppRecoverEvent(appId, rmState));
} }
private RMAppImpl createAndPopulateNewRMApp( private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) String user, boolean isRecovery, long startTime)
throws YarnException, AccessControlException { throws YarnException, AccessControlException {
// Do queue mapping // Do queue mapping
if (!isRecovery) { if (!isRecovery) {
@ -391,7 +393,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
submissionContext.getQueue(), submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService, submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(), submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags(), amReq); submissionContext.getApplicationTags(), amReq, startTime);
// Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not // Concurrent app submissions with different applicationIds will not
// influence each other // influence each other

View File

@ -416,8 +416,19 @@ public class RMAppImpl implements RMApp, Recoverable {
Configuration config, String name, String user, String queue, Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime, ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags, String applicationType, Set<String> applicationTags,
ResourceRequest amReq) { ResourceRequest amReq) {
this(applicationId, rmContext, config, name, user, queue, submissionContext,
scheduler, masterService, submitTime, applicationType, applicationTags,
amReq, -1);
}
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
String applicationType, Set<String> applicationTags,
ResourceRequest amReq, long startTime) {
this.systemClock = SystemClock.getInstance(); this.systemClock = SystemClock.getInstance();
@ -433,7 +444,11 @@ public class RMAppImpl implements RMApp, Recoverable {
this.scheduler = scheduler; this.scheduler = scheduler;
this.masterService = masterService; this.masterService = masterService;
this.submitTime = submitTime; this.submitTime = submitTime;
this.startTime = this.systemClock.getTime(); if (startTime <= 0) {
this.startTime = this.systemClock.getTime();
} else {
this.startTime = startTime;
}
this.applicationType = applicationType; this.applicationType = applicationType;
this.applicationTags = applicationTags; this.applicationTags = applicationTags;
this.amReq = amReq; this.amReq = amReq;

View File

@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -1125,6 +1126,68 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState()); Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
} }
@Test (timeout = 60000)
public void testRMRestartTimelineCollectorContext() throws Exception {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = null;
MockRM rm2 = null;
try {
rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app.
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null);
// Check if app info has been saved.
ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app.getApplicationSubmissionContext()
.getApplicationId());
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
ApplicationId appId = app.getApplicationId();
TimelineCollectorContext contextBeforeRestart =
rm1.getRMContext().getRMTimelineCollectorManager().get(appId).
getTimelineEntityContext();
// Restart RM.
rm2 = createMockRM(conf, memStore);
rm2.start();
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
TimelineCollectorContext contextAfterRestart =
rm2.getRMContext().getRMTimelineCollectorManager().get(appId).
getTimelineEntityContext();
Assert.assertEquals("Collector contexts for an app should be same " +
"across restarts", contextBeforeRestart, contextAfterRestart);
} finally {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
if (rm1 != null) {
rm1.close();
}
if (rm2 != null) {
rm2.close();
}
}
}
@Test (timeout = 60000) @Test (timeout = 60000)
public void testDelegationTokenRestoredInDelegationTokenRenewer() public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception { throws Exception {