From 80d11eb68e60f88e16d7d41edecbddfc935a6b10 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 2 Oct 2014 23:20:29 -0700 Subject: [PATCH] YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run with both CS and FS. (Wei Yan and kasha via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../ParameterizedSchedulerTestBase.java | 92 +++++++ .../yarn/server/resourcemanager/TestRM.java | 40 ++- .../server/resourcemanager/TestRMRestart.java | 253 +++++++++--------- .../security/TestClientToAMTokens.java | 20 +- 5 files changed, 258 insertions(+), 150 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d1ee7ce61f4..644a6b3d045 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -41,6 +41,9 @@ Release 2.7.0 - UNRELEASED YARN-1979. TestDirectoryCollection fails when the umask is unusual. (Vinod Kumar Vavilapalli and Tsuyoshi OZAWA via junping_du) + YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run + with both CS and FS. (Wei Yan and kasha via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java new file mode 100644 index 00000000000..cfd16001a37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; + + +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +public abstract class ParameterizedSchedulerTestBase { + protected final static String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); + private final static String FS_ALLOC_FILE = + new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath(); + + private SchedulerType schedulerType; + private YarnConfiguration conf = null; + + public enum SchedulerType { + CAPACITY, FAIR + } + + public ParameterizedSchedulerTestBase(SchedulerType type) { + schedulerType = type; + } + + public YarnConfiguration getConf() { + return conf; + } + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new SchedulerType[][]{ + {SchedulerType.CAPACITY}, {SchedulerType.FAIR}}); + } + + @Before + public void configureScheduler() throws IOException { + conf = new YarnConfiguration(); + switch (schedulerType) { + case CAPACITY: + conf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + break; + case FAIR: + configureFairScheduler(conf); + break; + } + } + + private void configureFairScheduler(YarnConfiguration conf) throws IOException { + // Disable queueMaxAMShare limitation for fair scheduler + PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("-1.0"); + out.println(""); + out.close(); + + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index cd67ebc216b..3d664f28848 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; @@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.log4j.Level; @@ -75,13 +75,23 @@ import org.junit.Test; import org.mockito.ArgumentMatcher; @SuppressWarnings({"unchecked", "rawtypes"}) -public class TestRM { - +public class TestRM extends ParameterizedSchedulerTestBase { private static final Log LOG = LogFactory.getLog(TestRM.class); // Milliseconds to sleep for when waiting for something to happen private final static int WAIT_SLEEP_MS = 100; + private YarnConfiguration conf; + + public TestRM(SchedulerType type) { + super(type); + } + + @Before + public void setup() { + conf = getConf(); + } + @After public void tearDown() { ClusterMetrics.destroy(); @@ -93,7 +103,7 @@ public class TestRM { public void testGetNewAppId() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); rm.start(); GetNewApplicationResponse resp = rm.getNewAppId(); @@ -106,7 +116,7 @@ public class TestRM { public void testAppWithNoContainers() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 5120); @@ -128,7 +138,6 @@ public class TestRM { public void testAppOnMultiNode() throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - YarnConfiguration conf = new YarnConfiguration(); conf.set("yarn.scheduler.capacity.node-locality-delay", "-1"); MockRM rm = new MockRM(conf); rm.start(); @@ -188,7 +197,6 @@ public class TestRM { // corresponding NM Token. @Test (timeout = 20000) public void testNMTokenSentForNormalContainer() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getCanonicalName()); MockRM rm = new MockRM(conf); @@ -240,7 +248,7 @@ public class TestRM { @Test (timeout = 40000) public void testNMToken() throws Exception { - MockRM rm = new MockRM(); + MockRM rm = new MockRM(conf); try { rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 10000); @@ -422,8 +430,6 @@ public class TestRM { @Test (timeout = 300000) public void testActivatingApplicationAfterAddingNM() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - MockRM rm1 = new MockRM(conf); // start like normal because state is empty @@ -469,7 +475,6 @@ public class TestRM { // is killed or failed, so that client doesn't get the wrong information. @Test (timeout = 80000) public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MockRM rm1 = new MockRM(conf); rm1.start(); @@ -522,7 +527,6 @@ public class TestRM { @Test (timeout = 60000) public void testInvalidatedAMHostPortOnAMRestart() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); MockRM rm1 = new MockRM(conf); rm1.start(); MockNM nm1 = @@ -555,7 +559,6 @@ public class TestRM { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); final Dispatcher dispatcher = new AsyncDispatcher() { @Override public EventHandler getEventHandler() { @@ -632,15 +635,4 @@ public class TestRM { Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted()); } - public static void main(String[] args) throws Exception { - TestRM t = new TestRM(); - t.testGetNewAppId(); - t.testAppWithNoContainers(); - t.testAppOnMultiNode(); - t.testNMToken(); - t.testActivatingApplicationAfterAddingNM(); - t.testInvalidateAMHostPortWhenAMFailedOrKilled(); - t.testInvalidatedAMHostPortOnAMRestart(); - t.testApplicationKillAtAcceptedState(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 0b3a364c455..b37b648ae8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -29,7 +29,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -109,7 +108,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class TestRMRestart { +public class TestRMRestart extends ParameterizedSchedulerTestBase { private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -117,12 +116,17 @@ public class TestRMRestart { // Fake rmAddr for token-renewal private static InetSocketAddress rmAddr; + private List rms = new ArrayList(); + + public TestRMRestart(SchedulerType type) { + super(type); + } @Before - public void setup() throws UnknownHostException { + public void setup() throws IOException { + conf = getConf(); Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - conf = new YarnConfiguration(); UserGroupInformation.setConfiguration(conf); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -132,9 +136,24 @@ public class TestRMRestart { @After public void tearDown() { + for (MockRM rm : rms) { + rm.stop(); + } + rms.clear(); + TEMP_DIR.delete(); } + /** + * + * @return a new MockRM that will be stopped at the end of the test. + */ + private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) { + MockRM rm = new MockRM(conf, store); + rms.add(rm); + return rm; + } + @SuppressWarnings("rawtypes") @Test (timeout=180000) public void testRMRestart() throws Exception { @@ -151,7 +170,7 @@ public class TestRMRestart { // PHASE 1: create state in an RM // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); // start like normal because state is empty rm1.start(); @@ -247,7 +266,7 @@ public class TestRMRestart { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); // start new RM rm2.start(); @@ -315,7 +334,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() - .getAppAttemptId(), 1, ContainerState.COMPLETE); + .getAppAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); nm2.registerNode(); @@ -412,7 +431,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -438,13 +457,11 @@ public class TestRMRestart { rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // assert the previous AM state is loaded back on RM recovery. rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED); - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -468,7 +485,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - final MockRM rm1 = new MockRM(conf, memStore); + final MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService()); @@ -492,8 +509,7 @@ public class TestRMRestart { .getAppAttemptState(), RMAppAttemptState.RUNNING); // start new RM. - MockRM rm2 = null; - rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -520,7 +536,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart.createNMContainerStatus( - am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); @@ -530,8 +546,7 @@ public class TestRMRestart { // Now restart RM ... // Setting AMLivelinessMonitor interval to be 10 Secs. conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); - MockRM rm3 = null; - rm3 = new MockRM(conf, memStore); + MockRM rm3 = createMockRM(conf, memStore); rm3.start(); // Wait for RM to process all the events as a part of rm recovery. @@ -578,8 +593,7 @@ public class TestRMRestart { memStore.getState().getApplicationState().get(app2.getApplicationId()) .getAttemptCount()); - MockRM rm4 = null; - rm4 = new MockRM(conf, memStore); + MockRM rm4 = createMockRM(conf, memStore); rm4.start(); rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId()); @@ -635,7 +649,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120); RMApp app0 = rm1.submitApp(200); @@ -652,7 +666,7 @@ public class TestRMRestart { Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); // start RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); rm2.start(); @@ -661,7 +675,7 @@ public class TestRMRestart { rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); // app final state is saved via the finish event from attempt. Assert.assertEquals(RMAppState.FINISHED, - rmAppState.get(app0.getApplicationId()).getState()); + rmAppState.get(app0.getApplicationId()).getState()); } @Test (timeout = 60000) @@ -674,7 +688,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -696,7 +710,7 @@ public class TestRMRestart { appState.getAttempt(am0.getApplicationAttemptId()).getState()); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); @@ -709,8 +723,6 @@ public class TestRMRestart { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -724,7 +736,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -746,7 +758,7 @@ public class TestRMRestart { appState.getAttempt(am0.getApplicationAttemptId()).getState()); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED); @@ -756,9 +768,7 @@ public class TestRMRestart { ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), - appReport.getDiagnostics()); - rm1.stop(); - rm2.stop(); + appReport.getDiagnostics()); } @Test (timeout = 60000) @@ -781,7 +791,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); // create app RMApp app0 = @@ -793,7 +803,7 @@ public class TestRMRestart { rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId()); @@ -812,7 +822,7 @@ public class TestRMRestart { rmState.getApplicationState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -839,7 +849,7 @@ public class TestRMRestart { Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime()); // restart rm - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // verify application report returns the same app info as the app info @@ -848,9 +858,6 @@ public class TestRMRestart { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); - - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -860,7 +867,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -897,7 +904,7 @@ public class TestRMRestart { return spy(super.createRMAppManager()); } }; - + rms.add(rm2); rm2.start(); GetApplicationsRequest request1 = @@ -944,9 +951,6 @@ public class TestRMRestart { // check application summary is logged for the completed apps after RM restart. verify(rm2.getRMAppManager(), times(3)).logApplicationSummary( isA(ApplicationId.class)); - - rm1.stop(); - rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1012,7 +1016,7 @@ public class TestRMRestart { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1050,7 +1054,7 @@ public class TestRMRestart { // Setting AMLivelinessMonitor interval to be 3 Secs. conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000); // start new RM - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); // verify that maxAppAttempts is set to global value @@ -1069,10 +1073,6 @@ public class TestRMRestart { Assert.assertEquals(RMAppState.FAILED, rmAppState.get(app1.getApplicationId()).getState()); Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState()); - - // stop the RM - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -1154,10 +1154,6 @@ public class TestRMRestart { // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); - - // stop the RM - rm1.stop(); - rm2.stop(); } private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { @@ -1253,8 +1249,6 @@ public class TestRMRestart { Assert.assertArrayEquals(amrmToken.getPassword(), rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( amrmToken.decodeIdentifier())); - rm1.stop(); - rm2.stop(); } @Test (timeout = 60000) @@ -1402,10 +1396,6 @@ public class TestRMRestart { .getAllTokens(); Assert.assertFalse(allTokensRM2.containsKey(dtId1)); Assert.assertFalse(rmDTState.containsKey(dtId1)); - - // stop the RM - rm1.stop(); - rm2.stop(); } // This is to test submit an application to the new RM with the old delegation @@ -1466,7 +1456,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - final MockRM rm1 = new MockRM(conf, memStore); + final MockRM rm1 = createMockRM(conf, memStore); rm1.start(); // create apps. @@ -1512,7 +1502,7 @@ public class TestRMRestart { RMState rmState = memStore.getState(); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1523,7 +1513,7 @@ public class TestRMRestart { MockAM am0 = launchAM(app0, rm1, nm1); finishApplicationMaster(app0, rm1, nm1, am0); - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1 = rm2.registerNode("127.0.0.1:1234", 15120); @@ -1545,9 +1535,6 @@ public class TestRMRestart { Assert.assertNull(rm2.getRMContext().getRMApps() .get(app0.getApplicationId())); Assert.assertNull(rmAppState.get(app0.getApplicationId())); - - rm1.stop(); - rm2.stop(); } // This is to test RM does not get hang on shutdown. @@ -1564,7 +1551,7 @@ public class TestRMRestart { memStore.init(conf); MockRM rm1 = null; try { - rm1 = new MockRM(conf, memStore); + rm1 = createMockRM(conf, memStore); rm1.start(); Assert.fail(); } catch (Exception e) { @@ -1582,7 +1569,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1698,7 +1685,11 @@ public class TestRMRestart { } } }; - rm1.start(); + try { + rm1.start(); + } finally { + rm1.stop(); + } } @SuppressWarnings("resource") @@ -1711,7 +1702,7 @@ public class TestRMRestart { // PHASE 1: create state in an RM // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1749,7 +1740,7 @@ public class TestRMRestart { // PHASE 2: create new RM and start from old state // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore); + MockRM rm2 = createMockRM(conf, memStore); QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics(); resetQueueMetrics(qm2); assertQueueMetrics(qm2, 0, 0, 0, 0); @@ -1766,7 +1757,7 @@ public class TestRMRestart { NMContainerStatus status = TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() - .getAppAttemptId(), 1, ContainerState.COMPLETE); + .getAppAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); while (loadedApp1.getAppAttempts().size() != 2) { @@ -1795,10 +1786,6 @@ public class TestRMRestart { // finish the AMs finishApplicationMaster(loadedApp1, rm2, nm1, am1); assertQueueMetrics(qm2, 1, 0, 0, 1); - - // stop RM's - rm2.stop(); - rm1.stop(); } @@ -1836,43 +1823,58 @@ public class TestRMRestart { hostFile.getAbsolutePath()); writeToHostsFile(""); final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM(conf) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; + MockRM rm1 = null, rm2 = null; + try { + rm1 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = rm1.registerNode("localhost:1234", 8000); + MockNM nm2 = rm1.registerNode("host2:1234", 8000); + Assert + .assertEquals(0, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + String ip = NetUtils.normalizeHostName("localhost"); + // Add 2 hosts to exclude list. + writeToHostsFile("host2", ip); + + // refresh nodes + rm1.getNodesListManager().refreshNodes(conf); + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert + .assertTrue( + NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue("The decommisioned metrics are not updated", + NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); + + dispatcher.await(); + Assert + .assertEquals(2, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + rm1.stop(); + rm1 = null; + Assert + .assertEquals(0, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + + // restart RM. + rm2 = new MockRM(conf); + rm2.start(); + Assert + .assertEquals(2, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + } finally { + if (rm1 != null) { + rm1.stop(); } - }; - rm1.start(); - MockNM nm1 = rm1.registerNode("localhost:1234", 8000); - MockNM nm2 = rm1.registerNode("host2:1234", 8000); - Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - String ip = NetUtils.normalizeHostName("localhost"); - // Add 2 hosts to exclude list. - writeToHostsFile("host2", ip); - - // refresh nodes - rm1.getNodesListManager().refreshNodes(conf); - NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); - Assert - .assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - nodeHeartbeat = nm2.nodeHeartbeat(true); - Assert.assertTrue("The decommisioned metrics are not updated", - NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - - dispatcher.await(); - Assert - .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - rm1.stop(); - Assert - .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - - // restart RM. - MockRM rm2 = new MockRM(conf); - rm2.start(); - Assert - .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); - rm2.stop(); + if (rm2 != null) { + rm2.stop(); + } + } } // Test Delegation token is renewed synchronously so that recover events @@ -1887,7 +1889,7 @@ public class TestRMRestart { memStore.init(conf); // start RM - MockRM rm1 = new MockRM(conf, memStore); + MockRM rm1 = createMockRM(conf, memStore); rm1.start(); final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -1910,24 +1912,29 @@ public class TestRMRestart { nm1.setResourceTrackerService(getResourceTrackerService()); NMContainerStatus status = TestRMRestart.createNMContainerStatus( - am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(status), null); } }; } }; - // Re-start RM - rm2.start(); - // wait for the 2nd attempt to be started. - RMApp loadedApp0 = - rm2.getRMContext().getRMApps().get(app0.getApplicationId()); - int timeoutSecs = 0; - while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { - Thread.sleep(200); + try { + // Re-start RM + rm2.start(); + + // wait for the 2nd attempt to be started. + RMApp loadedApp0 = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + int timeoutSecs = 0; + while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { + Thread.sleep(200); + } + MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); + MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); + } finally { + rm2.stop(); } - MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1); - MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1); } private void writeToHostsFile(String... hosts) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index 0dcd228453e..8b113a00213 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -18,7 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager + .ParameterizedSchedulerTestBase; import static org.junit.Assert.fail; +import org.junit.Before; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -74,7 +78,17 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; -public class TestClientToAMTokens { +public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { + private YarnConfiguration conf; + + public TestClientToAMTokens(SchedulerType type) { + super(type); + } + + @Before + public void setup() { + conf = getConf(); + } private interface CustomProtocol { @SuppressWarnings("unused") @@ -151,8 +165,6 @@ public class TestClientToAMTokens { @Test public void testClientToAMTokens() throws Exception { - - final Configuration conf = new Configuration(); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); @@ -267,6 +279,8 @@ public class TestClientToAMTokens { // Now for an authenticated user verifyValidToken(conf, am, token); + + rm.stop(); } private void verifyTokenWithTamperedID(final Configuration conf,