YARN-2459. RM crashes if App gets rejected for any reason and HA is enabled. Contributed by Jian He
This commit is contained in:
parent
b4b59ef749
commit
b91ef0c502
|
@ -285,6 +285,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-1458. FairScheduler: Zero weight can lead to livelock.
|
YARN-1458. FairScheduler: Zero weight can lead to livelock.
|
||||||
(Zhihai Xu via kasha)
|
(Zhihai Xu via kasha)
|
||||||
|
|
||||||
|
YARN-2459. RM crashes if App gets rejected for any reason
|
||||||
|
and HA is enabled. (Jian He via xgong)
|
||||||
|
|
||||||
Release 2.5.1 - UNRELEASED
|
Release 2.5.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -401,7 +401,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Credentials parseCredentials(ApplicationSubmissionContext application)
|
protected Credentials parseCredentials(ApplicationSubmissionContext application)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||||
|
|
|
@ -150,8 +150,10 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
|
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
|
||||||
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
|
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
|
||||||
new AppKilledTransition())
|
new AppKilledTransition())
|
||||||
.addTransition(RMAppState.NEW, RMAppState.FAILED,
|
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
|
RMAppEventType.APP_REJECTED,
|
||||||
|
new FinalSavingTransition(new AppRejectedTransition(),
|
||||||
|
RMAppState.FAILED))
|
||||||
|
|
||||||
// Transitions from NEW_SAVING state
|
// Transitions from NEW_SAVING state
|
||||||
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
||||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
@ -92,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -1606,6 +1609,53 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
|
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test Application that fails on submission is saved in state store.
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testAppFailedOnSubmissionSavedInStateStore() throws Exception {
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return new TestRMAppManager(this.rmContext, this.scheduler,
|
||||||
|
this.masterService, this.applicationACLsManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestRMAppManager extends RMAppManager {
|
||||||
|
|
||||||
|
public TestRMAppManager(RMContext context, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService,
|
||||||
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Credentials parseCredentials(
|
||||||
|
ApplicationSubmissionContext application) throws IOException {
|
||||||
|
throw new IOException("Parsing credential error.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.start();
|
||||||
|
RMApp app1 =
|
||||||
|
rm1.submitApp(200, "name", "user",
|
||||||
|
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||||
|
null, "MAPREDUCE", false);
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
|
// Check app state is saved in state store.
|
||||||
|
Assert.assertEquals(RMAppState.FAILED, memStore.getState()
|
||||||
|
.getApplicationState().get(app1.getApplicationId()).getState());
|
||||||
|
|
||||||
|
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
||||||
|
rm2.start();
|
||||||
|
// Restarted RM has the failed app info too.
|
||||||
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testQueueMetricsOnRMRestart() throws Exception {
|
public void testQueueMetricsOnRMRestart() throws Exception {
|
||||||
|
|
|
@ -526,10 +526,28 @@ public class TestRMAppTransitions {
|
||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateNotSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verifyApplicationFinished(RMAppState.FAILED);
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testAppNewRejectAddToStore() throws IOException {
|
||||||
|
LOG.info("--- START: testAppNewRejectAddToStore ---");
|
||||||
|
|
||||||
|
RMApp application = createNewTestApp(null);
|
||||||
|
// NEW => FINAL_SAVING => FAILED event RMAppEventType.APP_REJECTED
|
||||||
|
String rejectedText = "Test Application Rejected";
|
||||||
|
RMAppEvent event =
|
||||||
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
||||||
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
|
sendAppUpdateSavedEvent(application);
|
||||||
|
assertFailed(application, rejectedText);
|
||||||
|
assertAppFinalStateSaved(application);
|
||||||
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
rmContext.getStateStore().removeApplication(application);
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testAppNewSavingKill() throws IOException {
|
public void testAppNewSavingKill() throws IOException {
|
||||||
LOG.info("--- START: testAppNewSavingKill ---");
|
LOG.info("--- START: testAppNewSavingKill ---");
|
||||||
|
|
Loading…
Reference in New Issue