YARN-7577. Unit Fail: TestAMRestart#testPreemptedAMRestartOnRMRestart (miklos.szegedi@cloudera.com via rkanter)

This commit is contained in:
Robert Kanter 2017-12-20 13:39:00 -08:00
parent 1ba491ff90
commit 382215c72b
1 changed files with 73 additions and 58 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -63,14 +65,20 @@ import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestAMRestart { /**
* Test AM restart functions.
*/
public class TestAMRestart extends ParameterizedSchedulerTestBase {
public TestAMRestart(SchedulerType type) throws IOException {
super(type);
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testAMRestartWithExistingContainers() throws Exception { public void testAMRestartWithExistingContainers() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
RMApp app1 = RMApp app1 =
rm1.submitApp(200, "name", "user", rm1.submitApp(200, "name", "user",
@ -266,15 +274,14 @@ public class TestAMRestart {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testNMTokensRebindOnAMRestart() throws Exception { public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
// To prevent test from blacklisting nm1 for AM, we sit threshold to half // To prevent test from blacklisting nm1 for AM, we sit threshold to half
// of 2 nodes which is 1 // of 2 nodes which is 1
conf.setFloat( getConf().setFloat(
YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
0.5f); 0.5f);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
RMApp app1 = RMApp app1 =
rm1.submitApp(200, "myname", "myuser", rm1.submitApp(200, "myname", "myuser",
@ -378,11 +385,11 @@ public class TestAMRestart {
// should not be counted towards AM max retry count. // should not be counted towards AM max retry count.
@Test(timeout = 100000) @Test(timeout = 100000)
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().set(
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
MockNM nm1 = MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
@ -503,11 +510,11 @@ public class TestAMRestart {
@Test(timeout = 100000) @Test(timeout = 100000)
public void testMaxAttemptOneMeansOne() throws Exception { public void testMaxAttemptOneMeansOne() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().set(
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
MockNM nm1 = MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
@ -537,14 +544,15 @@ public class TestAMRestart {
// re-launch the AM. // re-launch the AM.
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPreemptedAMRestartOnRMRestart() throws Exception { public void testPreemptedAMRestartOnRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().setBoolean(
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); getConf().set(
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
rm1.start(); rm1.start();
MockNM nm1 = MockNM nm1 =
@ -584,12 +592,19 @@ public class TestAMRestart {
ApplicationStateData appState = ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId()); memStore.getState().getApplicationState().get(app1.getApplicationId());
Assert.assertEquals(2, appState.getAttemptCount()); Assert.assertEquals(2, appState.getAttemptCount());
// attempt stored has the preempted container exit status. if (getSchedulerType().equals(SchedulerType.FAIR)) {
Assert.assertEquals(ContainerExitStatus.PREEMPTED, // attempt stored has the preempted container exit status.
appState.getAttempt(am2.getApplicationAttemptId()) Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
.getAMContainerExitStatus()); appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
} else {
// attempt stored has the preempted container exit status.
Assert.assertEquals(ContainerExitStatus.PREEMPTED,
appState.getAttempt(am2.getApplicationAttemptId())
.getAMContainerExitStatus());
}
// Restart rm. // Restart rm.
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(getConf(), memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
rm2.start(); rm2.start();
@ -615,15 +630,16 @@ public class TestAMRestart {
@Test(timeout = 50000) @Test(timeout = 50000)
public void testRMRestartOrFailoverNotCountedForAMFailures() public void testRMRestartOrFailoverNotCountedForAMFailures()
throws Exception { throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().setBoolean(
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); getConf().set(
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 2. // explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
rm1.start(); rm1.start();
AbstractYarnScheduler scheduler = AbstractYarnScheduler scheduler =
@ -651,7 +667,7 @@ public class TestAMRestart {
RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
// Restart rm. // Restart rm.
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(getConf(), memStore);
rm2.start(); rm2.start();
ApplicationStateData appState = ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId()); memStore.getState().getApplicationState().get(app1.getApplicationId());
@ -688,14 +704,15 @@ public class TestAMRestart {
@Test (timeout = 120000) @Test (timeout = 120000)
public void testRMAppAttemptFailuresValidityInterval() throws Exception { public void testRMAppAttemptFailuresValidityInterval() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().setBoolean(
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); getConf().set(
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
// explicitly set max-am-retry count as 2. // explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
MockMemoryRMStateStore memStore = MockMemoryRMStateStore memStore =
@ -765,7 +782,7 @@ public class TestAMRestart {
// Restart rm. // Restart rm.
@SuppressWarnings("resource") @SuppressWarnings("resource")
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(getConf(), memStore);
rm2.start(); rm2.start();
MockMemoryRMStateStore memStore1 = MockMemoryRMStateStore memStore1 =
@ -834,12 +851,11 @@ public class TestAMRestart {
return false; return false;
} }
@Test(timeout = 30000) @Test(timeout = 40000)
public void testAMRestartNotLostContainerCompleteMsg() throws Exception { public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
RMApp app1 = RMApp app1 =
rm1.submitApp(200, "name", "user", rm1.submitApp(200, "name", "user",
@ -934,11 +950,10 @@ public class TestAMRestart {
@Test (timeout = 20000) @Test (timeout = 20000)
public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
throws Exception { throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// explicitly set max-am-retry count as 2. // explicitly set max-am-retry count as 2.
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
rm1.start(); rm1.start();
MockNM nm1 = MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
@ -1019,16 +1034,16 @@ public class TestAMRestart {
@Test(timeout = 200000) @Test(timeout = 200000)
public void testContainersFromPreviousAttemptsWithRMRestart() public void testContainersFromPreviousAttemptsWithRMRestart()
throws Exception { throws Exception {
YarnConfiguration conf = new YarnConfiguration(); getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); getConf().setBoolean(
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setLong( getConf().setLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); getConf()
.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MockRM rm1 = new MockRM(conf); MockRM rm1 = new MockRM(getConf());
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
rm1.start(); rm1.start();
YarnScheduler scheduler = rm1.getResourceScheduler(); YarnScheduler scheduler = rm1.getResourceScheduler();
@ -1071,7 +1086,7 @@ public class TestAMRestart {
(AbstractYarnScheduler)scheduler, am1.getApplicationAttemptId()); (AbstractYarnScheduler)scheduler, am1.getApplicationAttemptId());
// restart rm // restart rm
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(getConf(), memStore);
rm2.start(); rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus container2Status = NMContainerStatus container2Status =