YARN-8644. Improve unit test for RMAppImpl.FinalTransition. (Contributed by Szilard Nemeth)

This commit is contained in:
Haibo Chen 2018-10-05 09:31:48 -07:00
parent 73c660b43f
commit c968365650
2 changed files with 150 additions and 80 deletions

View File

@ -903,7 +903,6 @@ public int getMaxAppAttempts() {
@Override
public void handle(RMAppEvent event) {
this.writeLock.lock();
try {
@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
}
private static final class AppRejectedTransition extends
FinalTransition{
private static final class AppRejectedTransition extends FinalTransition {
public AppRejectedTransition() {
super(RMAppState.FAILED);
}
@ -1502,39 +1500,50 @@ private static class FinalTransition extends RMAppTransition {
private final RMAppState finalState;
public FinalTransition(RMAppState finalState) {
FinalTransition(RMAppState finalState) {
this.finalState = finalState;
}
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.logAggregationStartTime = app.systemClock.getTime();
completeAndCleanupApp(app);
handleAppFinished(app);
app.clearUnusedFields();
appAdminClientCleanUp(app);
}
private void completeAndCleanupApp(RMAppImpl app) {
//cleanup app in RM Nodes
for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
app.finishTime = app.storedFinishTime;
if (app.finishTime == 0 ) {
app.finishTime = app.systemClock.getTime();
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
// Recovered apps that are completed were not added to scheduler, so no
// need to remove them from scheduler.
if (app.recoveredFinalState == null) {
app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
finalState));
finalState));
}
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
// Send app completed event to AppManager
app.handler.handle(new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
}
private void handleAppFinished(RMAppImpl app) {
app.logAggregationStartTime = app.systemClock.getTime();
// record finish time
app.finishTime = app.storedFinishTime;
if (app.finishTime == 0) {
app.finishTime = app.systemClock.getTime();
}
//record finish in history and metrics
app.rmContext.getRMApplicationHistoryWriter()
.applicationFinished(app, finalState);
app.rmContext.getSystemMetricsPublisher()
.appFinished(app, finalState, app.finishTime);
// set the memory free
app.clearUnusedFields();
appAdminClientCleanUp(app);
};
}
}
public int getNumFailedAppAttempts() {
@ -1550,7 +1559,7 @@ public int getNumFailedAppAttempts() {
}
private static final class AttemptFailedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
private final RMAppState initialState;
@ -1812,8 +1821,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
== LogAggregationStatus.TIME_OUT
&& report.getLogAggregationStatus()
== LogAggregationStatus.RUNNING) {
// If the log aggregation status got from latest nm heartbeat
// is Running, and current log aggregation status is TimeOut,
// If the log aggregation status got from latest NM heartbeat
// is RUNNING, and current log aggregation status is TIME_OUT,
// based on whether there are any failure messages for this NM,
// we will reset the log aggregation status as RUNNING or
// RUNNING_WITH_FAILURE
@ -2137,4 +2146,10 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType,
RMAppState state){
/* TODO fail the application on the failed transition */
}
@VisibleForTesting
public long getLogAggregationStartTime() {
return logAggregationStartTime;
}
}

View File

@ -18,28 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -52,9 +31,9 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -89,7 +68,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@ -111,6 +95,29 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(value = Parameterized.class)
public class TestRMAppTransitions {
@ -128,6 +135,8 @@ public class TestRMAppTransitions {
private SystemMetricsPublisher publisher;
private YarnScheduler scheduler;
private TestSchedulerEventDispatcher schedulerDispatcher;
private TestApplicationManagerEventDispatcher appManagerDispatcher;
private long testCaseStartTime;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
@ -181,8 +190,11 @@ public void handle(RMAppEvent event) {
// ResourceManager.java
private static final class TestApplicationManagerEventDispatcher implements
EventHandler<RMAppManagerEvent> {
List<RMAppManagerEvent> events = Lists.newArrayList();
@Override
public void handle(RMAppManagerEvent event) {
LOG.info("Handling app manager event: " + event);
events.add(event);
}
}
@ -243,7 +255,7 @@ renewer, new AMRMTokenSecretManager(conf, this.rmContext),
ResourceScheduler resourceScheduler = mock(ResourceScheduler.class);
doReturn(null).when(resourceScheduler)
.getAppResourceUsageReport((ApplicationAttemptId)Matchers.any());
.getAppResourceUsageReport(Matchers.any());
doReturn(resourceScheduler).when(rmContext).getScheduler();
doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
@ -254,9 +266,11 @@ renewer, new AMRMTokenSecretManager(conf, this.rmContext),
rmDispatcher.register(RMAppEventType.class,
new TestApplicationEventDispatcher(rmContext));
appManagerDispatcher = new
TestApplicationManagerEventDispatcher();
rmDispatcher.register(RMAppManagerEventType.class,
new TestApplicationManagerEventDispatcher());
appManagerDispatcher);
schedulerDispatcher = new TestSchedulerEventDispatcher();
rmDispatcher.register(SchedulerEventType.class,
@ -264,6 +278,7 @@ renewer, new AMRMTokenSecretManager(conf, this.rmContext),
rmDispatcher.init(conf);
rmDispatcher.start();
testCaseStartTime = System.currentTimeMillis();
}
private ByteBuffer getTokens() throws IOException {
@ -332,7 +347,7 @@ private ContainerRetryContext getContainerRetryContext() {
ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0);
new HashSet<>(Arrays.asList(111)), 0, 0);
return containerRetryContext;
}
@ -424,17 +439,17 @@ private static void testAppStartState(ApplicationId applicationId,
name, application.getName());
Assert.assertEquals("application finish time is not 0 and should be",
0, application.getFinishTime());
Assert.assertEquals("application tracking url is not correct",
null, application.getTrackingUrl());
Assert.assertNull("application tracking url is not correct",
application.getTrackingUrl());
StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct",
0, diag.length());
}
// test to make sure times are set when app finishes
private static void assertStartTimeSet(RMApp application) {
Assert.assertTrue("application start time is not greater than 0",
application.getStartTime() > 0);
private void assertStartTimeSet(RMApp application) {
Assert.assertTrue("application start time is before test case start time",
application.getStartTime() >= testCaseStartTime);
Assert.assertTrue("application start time is before currentTime",
application.getStartTime() <= System.currentTimeMillis());
}
@ -452,8 +467,6 @@ private static void assertFinalAppStatus(FinalApplicationStatus status, RMApp ap
// test to make sure times are set when app finishes
private void assertTimesAtFinish(RMApp application) {
assertStartTimeSet(application);
Assert.assertTrue("application finish time is not greater than 0",
(application.getFinishTime() > 0));
Assert.assertTrue("application finish time is not >= start time",
(application.getFinishTime() >= application.getStartTime()));
}
@ -537,8 +550,10 @@ protected RMApp testCreateAppSubmittedRecovery(
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState();
long startTime = testCaseStartTime + 1;
ApplicationStateData appState =
ApplicationStateData.newInstance(123, 123, null, "user", null);
ApplicationStateData.newInstance(testCaseStartTime, startTime, null,
"user", null);
state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
@ -590,14 +605,21 @@ protected RMApp testCreateAppFinalSaving(
}
protected RMApp testCreateAppFinishing(
ApplicationSubmissionContext submissionContext) throws IOException {
ApplicationSubmissionContext submissionContext) throws Exception {
// unmanaged AMs don't use the FINISHING state
assert submissionContext == null || !submissionContext.getUnmanagedAM();
RMApp application = testCreateAppFinalSaving(submissionContext);
Assert.assertNotNull("app shouldn't be null", application);
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
RMAppEvent appUpdated =
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(appUpdated);
GenericTestUtils.waitFor(() -> {
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
return appAttempt != null &&
RMAppAttemptState.SUBMITTED.equals(appAttempt.getState());
}, 10, 80 * 1000);
assertAppState(RMAppState.FINISHING, application);
assertTimesAtFinish(application);
return application;
@ -605,7 +627,7 @@ protected RMApp testCreateAppFinishing(
protected RMApp testCreateAppFinished(
ApplicationSubmissionContext submissionContext,
String diagnostics) throws IOException {
String diagnostics) throws Exception {
// unmanaged AMs don't use the FINISHING state
RMApp application = null;
if (submissionContext != null && submissionContext.getUnmanagedAM()) {
@ -613,10 +635,17 @@ protected RMApp testCreateAppFinished(
} else {
application = testCreateAppFinishing(submissionContext);
}
verifyAppBeforeFinishEvent(application);
// RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
RMAppEvent finishedEvent = new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FINISHED, diagnostics);
application.handle(finishedEvent);
//only run this verification if we created a finishing app
if (submissionContext == null) {
verifyAppAfterFinishEvent(application);
}
assertAppState(RMAppState.FINISHED, application);
assertTimesAtFinish(application);
// finished without a proper unregister implies failed
@ -627,7 +656,7 @@ protected RMApp testCreateAppFinished(
}
@Test
public void testUnmanagedApp() throws IOException {
public void testUnmanagedApp() throws Exception {
ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl();
subContext.setUnmanagedAM(true);
@ -659,7 +688,7 @@ public void testUnmanagedApp() throws IOException {
}
@Test
public void testAppSuccessPath() throws IOException {
public void testAppSuccessPath() throws Exception {
LOG.info("--- START: testAppSuccessPath ---");
final String diagMsg = "some diagnostics";
RMApp application = testCreateAppFinished(null, diagMsg);
@ -695,7 +724,7 @@ public void testAppNewKill() throws IOException {
assertKilled(application);
assertAppFinalStateNotSaved(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -754,7 +783,7 @@ public void testAppNewSavingKill() throws IOException {
sendAppUpdateSavedEvent(application);
assertKilled(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -830,7 +859,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
assertKilled(application);
assertAppFinalStateSaved(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -894,7 +923,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
assertKilled(application);
assertAppFinalStateSaved(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -918,7 +947,7 @@ public void testAppAcceptedAttemptKilled() throws IOException,
assertKilled(application);
assertAppFinalStateSaved(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
}
@Test
@ -942,7 +971,7 @@ public void testAppRunningKill() throws IOException {
sendAppUpdateSavedEvent(application);
assertKilled(application);
verifyApplicationFinished(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
verifyAppRemovedSchedulerEvent(application, RMAppState.KILLED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -1005,7 +1034,7 @@ public void testAppRunningFailed() throws IOException {
}
@Test
public void testAppAtFinishingIgnoreKill() throws IOException {
public void testAppAtFinishingIgnoreKill() throws Exception {
LOG.info("--- START: testAppAtFinishingIgnoreKill ---");
RMApp application = testCreateAppFinishing(null);
@ -1047,7 +1076,7 @@ public void testAppFinalSavingToFinished() throws IOException {
}
@Test
public void testAppFinishedFinished() throws IOException {
public void testAppFinishedFinished() throws Exception {
LOG.info("--- START: testAppFinishedFinished ---");
RMApp application = testCreateAppFinished(null, "");
@ -1063,6 +1092,7 @@ public void testAppFinishedFinished() throws IOException {
Assert.assertEquals("application diagnostics is not correct",
"", diag.toString());
verifyApplicationFinished(RMAppState.FINISHED);
verifyAppRemovedSchedulerEvent(application, RMAppState.FINISHED);
verifyRMAppFieldsForFinalTransitions(application);
}
@ -1152,7 +1182,7 @@ public void testAppKilledKilled() throws IOException {
}
@Test (timeout = 30000)
public void testAppStartAfterKilled() throws IOException {
public void testAppStartAfterKilled() {
LOG.info("--- START: testAppStartAfterKilled ---");
ApplicationId applicationId = MockApps.newAppID(appId++);
@ -1162,8 +1192,8 @@ null, null, new ApplicationSubmissionContextPBImpl(), null, null,
@Override
protected void onInvalidStateTransition(RMAppEventType rmAppEventType,
RMAppState state) {
Assert.assertTrue("RMAppImpl: can't handle " + rmAppEventType
+ " at state " + state, false);
Assert.fail("RMAppImpl: can't handle " + rmAppEventType
+ " at state " + state);
}
};
@ -1200,8 +1230,7 @@ public void testAppsRecoveringStates() throws Exception {
}
public void testRecoverApplication(ApplicationStateData appState,
RMState rmState)
throws Exception {
RMState rmState) {
ApplicationSubmissionContext submissionContext =
appState.getApplicationSubmissionContext();
RMAppImpl application =
@ -1257,6 +1286,30 @@ public void testGetAppReport() throws IOException {
+ "/"));
}
private void verifyAppBeforeFinishEvent(RMApp app) {
assertEquals(0L, ((RMAppImpl) app).getLogAggregationStartTime());
//RMAppEventType.APP_UPDATE_SAVED sets the finish time
assertTrue("App manager events should not be received!",
appManagerDispatcher.events.isEmpty());
}
private void verifyAppAfterFinishEvent(RMApp app) {
assertTrue(
testCaseStartTime < ((RMAppImpl) app).getLogAggregationStartTime());
assertAppState(RMAppState.FINISHED, app);
verifyAppCompletedEvent(app);
verifyAppRemovedSchedulerEvent(app, RMAppState.FINISHED);
}
private void verifyAppCompletedEvent(RMApp app) {
assertEquals(1, appManagerDispatcher.events.size());
RMAppManagerEvent rmAppManagerEvent = appManagerDispatcher.events.get(0);
assertEquals(RMAppManagerEventType.APP_COMPLETED,
rmAppManagerEvent.getType());
assertEquals(app.getApplicationId().getId(),
rmAppManagerEvent.getApplicationId().getId());
}
private void verifyApplicationFinished(RMAppState state) {
ArgumentCaptor<RMAppState> finalState =
ArgumentCaptor.forClass(RMAppState.class);
@ -1268,14 +1321,16 @@ private void verifyApplicationFinished(RMAppState state) {
Assert.assertEquals(state, finalState.getValue());
}
private void verifyAppRemovedSchedulerEvent(RMAppState finalState) {
Assert.assertEquals(SchedulerEventType.APP_REMOVED,
schedulerDispatcher.lastSchedulerEvent.getType());
if(schedulerDispatcher.lastSchedulerEvent instanceof
AppRemovedSchedulerEvent) {
private void verifyAppRemovedSchedulerEvent(RMApp app,
RMAppState finalState) {
SchedulerEvent lastEvent = schedulerDispatcher.lastSchedulerEvent;
Assert.assertEquals(SchedulerEventType.APP_REMOVED, lastEvent.getType());
if (lastEvent instanceof AppRemovedSchedulerEvent) {
AppRemovedSchedulerEvent appRemovedEvent =
(AppRemovedSchedulerEvent) schedulerDispatcher.lastSchedulerEvent;
(AppRemovedSchedulerEvent) lastEvent;
Assert.assertEquals(finalState, appRemovedEvent.getFinalState());
Assert.assertEquals(app.getApplicationId().getId(),
appRemovedEvent.getApplicationID().getId());
}
}