diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6cdd6485c43..d5bd97f8cb6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -6,8 +6,6 @@ Release 2.6.0 - 2014-11-15 NEW FEATURES - YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN. (Abin - Shahab via raviprak) YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java new file mode 100644 index 00000000000..cfd16001a37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; + + +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +public abstract class ParameterizedSchedulerTestBase { + protected final static String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); + private final static String FS_ALLOC_FILE = + new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath(); + + private SchedulerType schedulerType; + private YarnConfiguration conf = null; + + public enum SchedulerType { + CAPACITY, FAIR + } + + public ParameterizedSchedulerTestBase(SchedulerType type) { + schedulerType = type; + } + + public YarnConfiguration getConf() { + return conf; + } + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new SchedulerType[][]{ + {SchedulerType.CAPACITY}, {SchedulerType.FAIR}}); + } + + @Before + public void configureScheduler() throws IOException { + conf = new YarnConfiguration(); + switch (schedulerType) { + case CAPACITY: + conf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + break; + case FAIR: + configureFairScheduler(conf); + break; + } + } + + private void configureFairScheduler(YarnConfiguration conf) throws IOException { + // Disable queueMaxAMShare limitation for fair scheduler + PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("-1.0"); + out.println(""); + out.close(); + + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 42a3a001c32..4865420b936 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; @@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.log4j.Level; @@ -75,13 +75,23 @@ import org.junit.Test; import org.mockito.ArgumentMatcher; @SuppressWarnings({"unchecked", "rawtypes"}) -public class TestRM { - +public class TestRM extends ParameterizedSchedulerTestBase { private static final Log LOG = LogFactory.getLog(TestRM.class); // Milliseconds to sleep for when waiting for something to happen private final static int WAIT_SLEEP_MS = 100; + private YarnConfiguration conf; + + public TestRM(SchedulerType type) { + super(type); + } + + @Before + public void setup() { + conf = getConf(); + } + @After public void tearDown() { ClusterMetrics.destroy(); @@ -93,7 +103,7 @@ public class TestRM { public void testGetNewAppId() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); rm.start(); GetNewApplicationResponse resp = rm.getNewAppId(); @@ -106,7 +116,7 @@ public class TestRM { public void testAppWithNoContainers() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 5120); @@ -128,7 +138,6 @@ public class TestRM { public void testAppOnMultiNode() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - YarnConfiguration conf = new YarnConfiguration(); conf.set("yarn.scheduler.capacity.node-locality-delay", "-1"); MockRM rm = new MockRM(conf); rm.start(); @@ -188,7 +197,6 @@ public class TestRM { // corresponding NM Token. @Test (timeout = 20000) public void testNMTokenSentForNormalContainer() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getCanonicalName()); MockRM rm = new MockRM(conf); @@ -241,7 +249,7 @@ public class TestRM { @Test (timeout = 40000) public void testNMToken() throws Exception { - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); try { rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 10000); @@ -425,8 +433,6 @@ public class TestRM { @Test (timeout = 300000) public void testActivatingApplicationAfterAddingNM() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - MockRM rm1 = new MockRM(conf); // start like normal because state is empty @@ -472,7 +478,6 @@ public class TestRM { // is killed or failed, so that client doesn't get the wrong information. @Test (timeout = 80000) public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MockRM rm1 = new MockRM(conf); rm1.start(); @@ -525,7 +530,6 @@ public class TestRM { @Test (timeout = 60000) public void testInvalidatedAMHostPortOnAMRestart() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = @@ -558,7 +562,6 @@ public class TestRM { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); final Dispatcher dispatcher = new AsyncDispatcher() { @Override public EventHandler getEventHandler() { @@ -635,15 +638,4 @@ public class TestRM { Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted()); } - public static void main(String[] args) throws Exception { - TestRM t = new TestRM(); - t.testGetNewAppId(); - t.testAppWithNoContainers(); - t.testAppOnMultiNode(); - t.testNMToken(); - t.testActivatingApplicationAfterAddingNM(); - t.testInvalidateAMHostPortWhenAMFailedOrKilled(); - t.testInvalidatedAMHostPortOnAMRestart(); - t.testApplicationKillAtAcceptedState(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b93631f5544..a0f86272b78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -29,7 +29,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -108,7 +107,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class TestRMRestart { +public class TestRMRestart extends ParameterizedSchedulerTestBase { private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -116,12 +115,17 @@ public class TestRMRestart { // Fake rmAddr for token-renewal private static InetSocketAddress rmAddr; + private List rms = new ArrayList(); + + public TestRMRestart(SchedulerType type) { + super(type); + } @Before - public void setup() throws UnknownHostException { + public void setup() throws IOException { + conf = getConf(); Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - conf = new YarnConfiguration(); UserGroupInformation.setConfiguration(conf); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -131,9 +135,24 @@ public class TestRMRestart { @After public void tearDown() { + for (MockRM rm : rms) { + rm.stop(); + } + rms.clear(); + TEMP_DIR.delete(); } + /** + * + * @return a new MockRM that will be stopped at the end of the test. + */ + private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) { + MockRM rm = new MockRM(conf, store); + rms.add(rm); + return rm; + } + @SuppressWarnings("rawtypes") @Test (timeout=180000) public void testRMRestart() throws Exception { @@ -150,7 +169,7 @@ public class TestRMRestart { // PHASE 1: create state in an RM // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); // start like normal because state is empty rm1.start(); @@ -246,7 +265,7 @@ public class TestRMRestart { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); // start new RM rm2.start(); @@ -317,7 +336,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() - .getAppAttemptId(), 1, ContainerState.COMPLETE); + .getAppAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); nm2.registerNode(); @@ -414,7 +433,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -443,13 +462,11 @@ public class TestRMRestart { rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // assert the previous AM state is loaded back on RM recovery. rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED); - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -473,7 +490,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - final MockRM rm1 = new MockRM(conf, memStore); + final MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService()); @@ -497,8 +514,7 @@ public class TestRMRestart { .getAppAttemptState(), RMAppAttemptState.RUNNING); // start new RM. - MockRM rm2 = null; - rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -525,7 +541,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart.createNMContainerStatus( - am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); @@ -535,8 +551,7 @@ public class TestRMRestart { // Now restart RM ... // Setting AMLivelinessMonitor interval to be 10 Secs. conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); - MockRM rm3 = null; - rm3 = new MockRM(conf, memStore); + MockRM rm3 = createMockRM(conf, memStore); rm3.start(); // Wait for RM to process all the events as a part of rm recovery. @@ -583,8 +598,7 @@ public class TestRMRestart { memStore.getState().getApplicationState().get(app2.getApplicationId()) .getAttemptCount()); - MockRM rm4 = null; - rm4 = new MockRM(conf, memStore); + MockRM rm4 = createMockRM(conf, memStore); rm4.start(); rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId()); @@ -640,7 +654,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120); RMApp app0 = rm1.submitApp(200); @@ -657,7 +671,7 @@ public class TestRMRestart { Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); // start RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); rm2.start(); @@ -666,7 +680,7 @@ public class TestRMRestart { rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); // app final state is saved via the finish event from attempt. Assert.assertEquals(RMAppState.FINISHED, - rmAppState.get(app0.getApplicationId()).getState()); + rmAppState.get(app0.getApplicationId()).getState()); } @Test (timeout = 60000) @@ -679,7 +693,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -701,7 +715,7 @@ public class TestRMRestart { appState.getAttempt(am0.getApplicationAttemptId()).getState()); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); @@ -714,8 +728,6 @@ public class TestRMRestart { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -729,7 +741,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -751,7 +763,7 @@ public class TestRMRestart { appState.getAttempt(am0.getApplicationAttemptId()).getState()); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED); @@ -761,9 +773,7 @@ public class TestRMRestart { ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), - appReport.getDiagnostics()); - rm1.stop(); - rm2.stop(); + appReport.getDiagnostics()); } @Test (timeout = 60000) @@ -786,7 +796,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); // create app RMApp app0 = @@ -798,7 +808,7 @@ public class TestRMRestart { rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); @@ -817,7 +827,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -844,7 +854,7 @@ public class TestRMRestart { Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime()); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // verify application report returns the same app info as the app info @@ -853,9 +863,6 @@ public class TestRMRestart { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); - - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -865,7 +872,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -902,7 +909,7 @@ public class TestRMRestart { return spy(super.createRMAppManager()); } }; - + rms.add(rm2); rm2.start(); GetApplicationsRequest request1 = @@ -949,9 +956,6 @@ public class TestRMRestart { // check application summary is logged for the completed apps after RM restart. verify(rm2.getRMAppManager(), times(3)).logApplicationSummary( isA(ApplicationId.class)); - - rm1.stop(); - rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1017,7 +1021,7 @@ public class TestRMRestart { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1055,7 +1059,7 @@ public class TestRMRestart { // Setting AMLivelinessMonitor interval to be 3 Secs. conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // verify that maxAppAttempts is set to global value @@ -1074,10 +1078,6 @@ public class TestRMRestart { Assert.assertEquals(RMAppState.FAILED, rmAppState.get(app1.getApplicationId()).getState()); Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState()); - - // stop the RM - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -1159,10 +1159,6 @@ public class TestRMRestart { // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); - - // stop the RM - rm1.stop(); - rm2.stop(); } private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { @@ -1258,8 +1254,6 @@ public class TestRMRestart { Assert.assertArrayEquals(amrmToken.getPassword(), rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( amrmToken.decodeIdentifier())); - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -1407,10 +1401,6 @@ public class TestRMRestart { .getAllTokens(); Assert.assertFalse(allTokensRM2.containsKey(dtId1)); Assert.assertFalse(rmDTState.containsKey(dtId1)); - - // stop the RM - rm1.stop(); - rm2.stop(); } // This is to test submit an application to the new RM with the old delegation @@ -1471,7 +1461,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - final MockRM rm1 = new MockRM(conf, memStore); + final MockRM rm1 = createMockRM(conf, memStore); rm1.start(); // create apps. @@ -1517,7 +1507,7 @@ public class TestRMRestart { RMState rmState = memStore.getState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1528,7 +1518,7 @@ public class TestRMRestart { MockAM am0 = launchAM(app0, rm1, nm1); finishApplicationMaster(app0, rm1, nm1, am0); - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1 = rm2.registerNode("127.0.0.1:1234", 15120); @@ -1550,9 +1540,6 @@ public class TestRMRestart { Assert.assertNull(rm2.getRMContext().getRMApps() .get(app0.getApplicationId())); Assert.assertNull(rmAppState.get(app0.getApplicationId())); - - rm1.stop(); - rm2.stop(); } // This is to test RM does not get hang on shutdown. @@ -1569,7 +1556,7 @@ public class TestRMRestart { memStore.init(conf); MockRM rm1 = null; try { - rm1 = new MockRM(conf, memStore); + rm1 = createMockRM(conf, memStore); rm1.start(); Assert.fail(); } catch (Exception e) { @@ -1587,7 +1574,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1703,7 +1690,11 @@ public class TestRMRestart { } } }; - rm1.start(); + try { + rm1.start(); + } finally { + rm1.stop(); + } } @SuppressWarnings("resource") @@ -1716,7 +1707,7 @@ public class TestRMRestart { // PHASE 1: create state in an RM // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1754,7 +1745,7 @@ public class TestRMRestart { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics(); resetQueueMetrics(qm2); assertQueueMetrics(qm2, 0, 0, 0, 0); @@ -1770,7 +1761,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() - .getAppAttemptId(), 1, ContainerState.COMPLETE); + .getAppAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); while (loadedApp1.getAppAttempts().size() != 2) { @@ -1799,10 +1790,6 @@ public class TestRMRestart { // finish the AMs finishApplicationMaster(loadedApp1, rm2, nm1, am1); assertQueueMetrics(qm2, 1, 0, 0, 1); - - // stop RM's - rm2.stop(); - rm1.stop(); } @@ -1840,43 +1827,58 @@ public class TestRMRestart { hostFile.getAbsolutePath()); writeToHostsFile(""); final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM(conf) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; + MockRM rm1 = null, rm2 = null; + try { + rm1 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("localhost:1234", 8000); + MockNM nm2 = rm1.registerNode("host2:1234", 8000); + Assert + .assertEquals(0, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + String ip = NetUtils.normalizeHostName("localhost"); + // Add 2 hosts to exclude list. + writeToHostsFile("host2", ip); + + // refresh nodes + rm1.getNodesListManager().refreshNodes(conf); + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert + .assertTrue( + NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue("The decommisioned metrics are not updated", + NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); + + dispatcher.await(); + Assert + .assertEquals(2, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm1.stop(); + rm1 = null; + Assert + .assertEquals(0, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + + // restart RM. + rm2 = new MockRM(conf); + rm2.start(); + Assert + .assertEquals(2, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + } finally { + if (rm1 != null) { + rm1.stop(); } - }; - rm1.start(); - MockNM nm1 = rm1.registerNode("localhost:1234", 8000); - MockNM nm2 = rm1.registerNode("host2:1234", 8000); - Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - String ip = NetUtils.normalizeHostName("localhost"); - // Add 2 hosts to exclude list. - writeToHostsFile("host2", ip); - - // refresh nodes - rm1.getNodesListManager().refreshNodes(conf); - NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); - Assert - .assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - nodeHeartbeat = nm2.nodeHeartbeat(true); - Assert.assertTrue("The decommisioned metrics are not updated", - NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - - dispatcher.await(); - Assert - .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - rm1.stop(); - Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - - // restart RM. - MockRM rm2 = new MockRM(conf); - rm2.start(); - Assert - .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - rm2.stop(); + if (rm2 != null) { + rm2.stop(); + } + } } // Test Delegation token is renewed synchronously so that recover events @@ -1891,7 +1893,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1914,24 +1916,29 @@ public class TestRMRestart { nm1.setResourceTrackerService(getResourceTrackerService()); NMContainerStatus status = TestRMRestart.createNMContainerStatus( - am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); } }; } }; - // Re-start RM - rm2.start(); - // wait for the 2nd attempt to be started. - RMApp loadedApp0 = - rm2.getRMContext().getRMApps().get(app0.getApplicationId()); - int timeoutSecs = 0; - while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { - Thread.sleep(200); + try { + // Re-start RM + rm2.start(); + + // wait for the 2nd attempt to be started. + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + int timeoutSecs = 0; + while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { + Thread.sleep(200); + } + MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); + MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); + } finally { + rm2.stop(); } - MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); - MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); } private void writeToHostsFile(String... hosts) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index b5636bda018..78bc728ac77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -18,7 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager + .ParameterizedSchedulerTestBase; import static org.junit.Assert.fail; +import org.junit.Before; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -79,7 +83,17 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; -public class TestClientToAMTokens { +public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { + private YarnConfiguration conf; + + public TestClientToAMTokens(SchedulerType type) { + super(type); + } + + @Before + public void setup() { + conf = getConf(); + } private interface CustomProtocol { @SuppressWarnings("unused") @@ -166,8 +180,6 @@ public class TestClientToAMTokens { @Test public void testClientToAMTokens() throws Exception { - - final Configuration conf = new Configuration(); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf);