YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-02-07 00:18:46 +00:00
parent 914a9709b9
commit 943b2190d7
7 changed files with 119 additions and 11 deletions

View File

@ -633,6 +633,9 @@ Release 2.3.0 - UNRELEASED
YARN-1661. Fixed DS ApplicationMaster to write the correct exit log. (Vinod
Kumar Vavilapalli via zjshen)
YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. (Vinod
Kumar Vavilapalli via zjshen)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -196,10 +196,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// waiting for the previous AM to exit.
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.ACCEPTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
RMAppEventType.KILL,
new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
// ACCEPTED state can once again receive an APP_ACCEPTED event, because on
// recovery the app returns to ACCEPTED state and once again goes through
// the scheduler, which triggers one more APP_ACCEPTED event at

View File

@ -482,6 +482,13 @@ public class MockRM extends ResourceManager {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
return am;
}
public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
MockAM am = launchAM(app, rm, nm);
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
return am;

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@ -33,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
@ -44,9 +49,17 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -54,7 +67,9 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM {
private static final Log LOG = LogFactory.getLog(TestRM.class);
@ -397,19 +412,19 @@ public class TestRM {
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
// a failed app
RMApp app2 = rm1.submitApp(200);
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
// a killed app
RMApp app3 = rm1.submitApp(200);
MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
rm1.killApp(app3.getApplicationId());
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@ -449,7 +464,7 @@ public class TestRM {
// a failed app
RMApp app2 = rm1.submitApp(200);
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
nm1
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
@ -466,10 +481,88 @@ public class TestRM {
Assert.assertEquals(-1, report1.getRpcPort());
}
/**
 * Validates killing an application while it is at ACCEPTED state, including
 * the YARN-1689 race in which the AM registers after the kill was requested.
 * <p>
 * The dispatcher's event handler is wrapped in a Mockito spy that ignores
 * KILL events of type RMAppAttemptEvent, so the test can observe the
 * application in KILLING state before it simulates ATTEMPT_KILLED itself.
 *
 * @throws Exception if any step of the test fails
 */
@Test (timeout = 60000)
public void testApplicationKillAtAcceptedState() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// Dispatcher whose event handler swallows the KILL event sent to the
// RMAppAttempt; see the matcher and the doNothing() stubbing below.
final Dispatcher dispatcher = new AsyncDispatcher() {
@Override
public EventHandler getEventHandler() {
// Matches only RMAppAttemptEvent instances whose type is KILL.
class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
@Override
public boolean matches(Object argument) {
if (argument instanceof RMAppAttemptEvent) {
if (((RMAppAttemptEvent) argument).getType().equals(
RMAppAttemptEventType.KILL)) {
return true;
}
}
return false;
}
}
// Spy on the real handler and stub handle() to do nothing for the
// matched KILL events; a new spy is created on every call.
EventHandler handler = spy(super.getEventHandler());
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
return handler;
}
};
// Make the RM under test use the intercepting dispatcher defined above.
MockRM rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
// Submit an app and launch its AM without registering it, so that the
// app is still at ACCEPTED state when it gets killed.
RMApp application = rm.submitApp(200);
MockAM am = MockRM.launchAM(application, rm, nm1);
am.waitForState(RMAppAttemptState.LAUNCHED);
nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
// Now kill the application while it is still at ACCEPTED state.
KillApplicationRequest request =
KillApplicationRequest.newInstance(application.getApplicationId());
rm.getClientRMService().forceKillApplication(request);
// Specific test for YARN-1689 follows
// Now let's say a race causes AM to register now. This should not crash RM.
am.registerAppAttempt(false);
// We explicitly intercepted the kill-event to RMAppAttempt, so app should
// still be in KILLING state.
rm.waitForState(application.getApplicationId(), RMAppState.KILLING);
// The app attempt should now be in RUNNING state.
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
// Simulate that appAttempt is killed by sending ATTEMPT_KILLED to the app
// directly, since the real kill event was dropped by the spy.
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED));
rm.waitForState(application.getApplicationId(), RMAppState.KILLED);
}
/**
 * Runs the test methods listed below one after another on a single
 * {@code TestRM} instance, without going through a JUnit runner.
 *
 * @param args command-line arguments; not used
 * @throws Exception the first exception thrown by any of the invoked tests,
 *         which also prevents the remaining tests from running
 */
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();
t.testAppWithNoContainers();
t.testAppOnMultiNode();
t.testNMToken();
t.testActivatingApplicationAfterAddingNM();
t.testInvalidateAMHostPortWhenAMFailedOrKilled();
t.testInvalidatedAMHostPortOnAMRestart();
t.testApplicationKillAtAcceptedState();
}
}

View File

@ -69,7 +69,7 @@ public class TestAMRestart {
new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
nm2.registerNode();
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 3;
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,

View File

@ -639,6 +639,13 @@ public class TestRMAppTransitions {
RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.KILLING, application);
RMAppEvent appAttemptKilled =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED);
application.handle(appAttemptKilled);
assertAppState(RMAppState.FINAL_SAVING, application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
assertAppFinalStateSaved(application);

View File

@ -1389,7 +1389,7 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192);
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
MockAM am = MockRM.launchAM(app1, rm, amNodeManager);
MockAM am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
int maxAppAttempts = rm.getConfig().getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -1405,7 +1405,7 @@ public class TestRMWebServicesApps extends JerseyTest {
}
// wait for app to start a new attempt.
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
am = MockRM.launchAM(app1, rm, amNodeManager);
am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
numAttempt++;
}
assertEquals("incorrect number of attempts", maxAppAttempts,