YARN-4392. ApplicationCreatedEvent event time resets after RM
restart/failover. Contributed by Naganarasimha G R and Xuan Gong
(cherry picked from commit 4546c7582b
)
This commit is contained in:
parent
04e056f4f5
commit
7decba83ef
|
@ -1051,6 +1051,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4408. Fix issue that NodeManager reports negative running containers.
|
YARN-4408. Fix issue that NodeManager reports negative running containers.
|
||||||
(Robert Kanter via junping_du)
|
(Robert Kanter via junping_du)
|
||||||
|
|
||||||
|
YARN-4392. ApplicationCreatedEvent event time resets after RM restart/failover.
|
||||||
|
(Naganarasimha G R and Xuan Gong via xgong)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.ipc.CallerContext;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
|
@ -444,9 +444,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
this.callerContext = CallerContext.getCurrent();
|
this.callerContext = CallerContext.getCurrent();
|
||||||
|
|
||||||
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
|
|
||||||
rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
|
|
||||||
|
|
||||||
long localLogAggregationStatusTimeout =
|
long localLogAggregationStatusTimeout =
|
||||||
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
||||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
||||||
|
@ -813,6 +810,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
this.startTime = appState.getStartTime();
|
this.startTime = appState.getStartTime();
|
||||||
this.callerContext = appState.getCallerContext();
|
this.callerContext = appState.getCallerContext();
|
||||||
|
|
||||||
|
// send the ATS create Event
|
||||||
|
sendATSCreateEvent(this, this.startTime);
|
||||||
|
|
||||||
for(int i=0; i<appState.getAttemptCount(); ++i) {
|
for(int i=0; i<appState.getAttemptCount(); ++i) {
|
||||||
// create attempt
|
// create attempt
|
||||||
createNewAttempt();
|
createNewAttempt();
|
||||||
|
@ -1084,6 +1084,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
// communication
|
// communication
|
||||||
LOG.info("Storing application with id " + app.applicationId);
|
LOG.info("Storing application with id " + app.applicationId);
|
||||||
app.rmContext.getStateStore().storeNewApplication(app);
|
app.rmContext.getStateStore().storeNewApplication(app);
|
||||||
|
|
||||||
|
// send the ATS create Event
|
||||||
|
app.sendATSCreateEvent(app, app.startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1734,4 +1737,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
public CallerContext getCallerContext() {
|
public CallerContext getCallerContext() {
|
||||||
return callerContext;
|
return callerContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void sendATSCreateEvent(RMApp app, long startTime) {
|
||||||
|
rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
|
||||||
|
rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,15 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.timeout;
|
import static org.mockito.Mockito.timeout;
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
|
@ -113,7 +114,9 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
@ -896,7 +899,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = createMockRM(conf, memStore);
|
MockRM rm1 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
||||||
|
return spy(super.createSystemMetricsPublisher());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rms.add(rm1);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -925,6 +934,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
||||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
||||||
|
|
||||||
|
verify(rm1.getRMContext().getSystemMetricsPublisher(),Mockito.times(3))
|
||||||
|
.appCreated(any(RMApp.class), anyLong());
|
||||||
// restart rm
|
// restart rm
|
||||||
|
|
||||||
MockRM rm2 = new MockRM(conf, memStore) {
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@ -932,10 +943,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
protected RMAppManager createRMAppManager() {
|
protected RMAppManager createRMAppManager() {
|
||||||
return spy(super.createRMAppManager());
|
return spy(super.createRMAppManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
||||||
|
return spy(super.createSystemMetricsPublisher());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
rms.add(rm2);
|
rms.add(rm2);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
|
verify(rm2.getRMContext().getSystemMetricsPublisher(),Mockito.times(3))
|
||||||
|
.appCreated(any(RMApp.class), anyLong());
|
||||||
|
|
||||||
GetApplicationsRequest request1 =
|
GetApplicationsRequest request1 =
|
||||||
GetApplicationsRequest.newInstance(EnumSet.of(
|
GetApplicationsRequest.newInstance(EnumSet.of(
|
||||||
YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
|
YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
@ -64,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
|
@ -368,8 +368,6 @@ public class TestRMAppTransitions {
|
||||||
protected RMApp testCreateAppNewSaving(
|
protected RMApp testCreateAppNewSaving(
|
||||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||||
RMApp application = createNewTestApp(submissionContext);
|
RMApp application = createNewTestApp(submissionContext);
|
||||||
verify(writer).applicationStarted(any(RMApp.class));
|
|
||||||
verify(publisher).appCreated(any(RMApp.class), anyLong());
|
|
||||||
// NEW => NEW_SAVING event RMAppEventType.START
|
// NEW => NEW_SAVING event RMAppEventType.START
|
||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
|
||||||
|
|
Loading…
Reference in New Issue