YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run with both CS and FS. (Wei Yan and kasha via kasha)

This commit is contained in:
Karthik Kambatla 2014-10-02 23:20:29 -07:00
parent eb6ce5e97c
commit 80d11eb68e
5 changed files with 258 additions and 150 deletions

View File

@ -41,6 +41,9 @@ Release 2.7.0 - UNRELEASED
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
(Vinod Kumar Vavilapalli and Tsuyoshi OZAWA via junping_du)
YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run
with both CS and FS. (Wei Yan and kasha via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
@RunWith(Parameterized.class)
public abstract class ParameterizedSchedulerTestBase {
protected final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
private final static String FS_ALLOC_FILE =
new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath();
private SchedulerType schedulerType;
private YarnConfiguration conf = null;
public enum SchedulerType {
CAPACITY, FAIR
}
public ParameterizedSchedulerTestBase(SchedulerType type) {
schedulerType = type;
}
public YarnConfiguration getConf() {
return conf;
}
@Parameterized.Parameters
public static Collection<SchedulerType[]> getParameters() {
return Arrays.asList(new SchedulerType[][]{
{SchedulerType.CAPACITY}, {SchedulerType.FAIR}});
}
@Before
public void configureScheduler() throws IOException {
conf = new YarnConfiguration();
switch (schedulerType) {
case CAPACITY:
conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
break;
case FAIR:
configureFairScheduler(conf);
break;
}
}
private void configureFairScheduler(YarnConfiguration conf) throws IOException {
// Disable queueMaxAMShare limitation for fair scheduler
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>");
out.println("</allocations>");
out.close();
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.junit.Before;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level;
@ -75,13 +75,23 @@ import org.junit.Test;
import org.mockito.ArgumentMatcher;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM {
public class TestRM extends ParameterizedSchedulerTestBase {
private static final Log LOG = LogFactory.getLog(TestRM.class);
// Milliseconds to sleep for when waiting for something to happen
private final static int WAIT_SLEEP_MS = 100;
private YarnConfiguration conf;
public TestRM(SchedulerType type) {
super(type);
}
@Before
public void setup() {
conf = getConf();
}
@After
public void tearDown() {
ClusterMetrics.destroy();
@ -93,7 +103,7 @@ public class TestRM {
public void testGetNewAppId() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
MockRM rm = new MockRM(conf);
rm.start();
GetNewApplicationResponse resp = rm.getNewAppId();
@ -106,7 +116,7 @@ public class TestRM {
public void testAppWithNoContainers() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
@ -128,7 +138,6 @@ public class TestRM {
public void testAppOnMultiNode() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
conf.set("yarn.scheduler.capacity.node-locality-delay", "-1");
MockRM rm = new MockRM(conf);
rm.start();
@ -188,7 +197,6 @@ public class TestRM {
// corresponding NM Token.
@Test (timeout = 20000)
public void testNMTokenSentForNormalContainer() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
MockRM rm = new MockRM(conf);
@ -240,7 +248,7 @@ public class TestRM {
@Test (timeout = 40000)
public void testNMToken() throws Exception {
MockRM rm = new MockRM();
MockRM rm = new MockRM(conf);
try {
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 10000);
@ -422,8 +430,6 @@ public class TestRM {
@Test (timeout = 300000)
public void testActivatingApplicationAfterAddingNM() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf);
// start like normal because state is empty
@ -469,7 +475,6 @@ public class TestRM {
// is killed or failed, so that client doesn't get the wrong information.
@Test (timeout = 80000)
public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MockRM rm1 = new MockRM(conf);
rm1.start();
@ -522,7 +527,6 @@ public class TestRM {
@Test (timeout = 60000)
public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 =
@ -555,7 +559,6 @@ public class TestRM {
@Test (timeout = 60000)
public void testApplicationKillAtAcceptedState() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
final Dispatcher dispatcher = new AsyncDispatcher() {
@Override
public EventHandler getEventHandler() {
@ -632,15 +635,4 @@ public class TestRM {
Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
}
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();
t.testAppWithNoContainers();
t.testAppOnMultiNode();
t.testNMToken();
t.testActivatingApplicationAfterAddingNM();
t.testInvalidateAMHostPortWhenAMFailedOrKilled();
t.testInvalidatedAMHostPortOnAMRestart();
t.testApplicationKillAtAcceptedState();
}
}

View File

@ -29,7 +29,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -109,7 +108,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMRestart {
public class TestRMRestart extends ParameterizedSchedulerTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@ -117,12 +116,17 @@ public class TestRMRestart {
// Fake rmAddr for token-renewal
private static InetSocketAddress rmAddr;
private List<MockRM> rms = new ArrayList<MockRM>();
public TestRMRestart(SchedulerType type) {
super(type);
}
@Before
public void setup() throws UnknownHostException {
public void setup() throws IOException {
conf = getConf();
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
@ -132,9 +136,24 @@ public class TestRMRestart {
@After
public void tearDown() {
for (MockRM rm : rms) {
rm.stop();
}
rms.clear();
TEMP_DIR.delete();
}
/**
*
* @return a new MockRM that will be stopped at the end of the test.
*/
private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) {
MockRM rm = new MockRM(conf, store);
rms.add(rm);
return rm;
}
@SuppressWarnings("rawtypes")
@Test (timeout=180000)
public void testRMRestart() throws Exception {
@ -151,7 +170,7 @@ public class TestRMRestart {
// PHASE 1: create state in an RM
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
// start like normal because state is empty
rm1.start();
@ -247,7 +266,7 @@ public class TestRMRestart {
// PHASE 2: create new RM and start from old state
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
// start new RM
rm2.start();
@ -315,7 +334,7 @@ public class TestRMRestart {
NMContainerStatus status =
TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE);
.getAppAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status), null);
nm2.registerNode();
@ -412,7 +431,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -438,13 +457,11 @@ public class TestRMRestart {
rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
// assert the previous AM state is loaded back on RM recovery.
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm1.stop();
rm2.stop();
}
@Test (timeout = 60000)
@ -468,7 +485,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
final MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
@ -492,8 +509,7 @@ public class TestRMRestart {
.getAppAttemptState(), RMAppAttemptState.RUNNING);
// start new RM.
MockRM rm2 = null;
rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
@ -520,7 +536,7 @@ public class TestRMRestart {
NMContainerStatus status =
TestRMRestart.createNMContainerStatus(
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status), null);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
@ -530,8 +546,7 @@ public class TestRMRestart {
// Now restart RM ...
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
MockRM rm3 = null;
rm3 = new MockRM(conf, memStore);
MockRM rm3 = createMockRM(conf, memStore);
rm3.start();
// Wait for RM to process all the events as a part of rm recovery.
@ -578,8 +593,7 @@ public class TestRMRestart {
memStore.getState().getApplicationState().get(app2.getApplicationId())
.getAttemptCount());
MockRM rm4 = null;
rm4 = new MockRM(conf, memStore);
MockRM rm4 = createMockRM(conf, memStore);
rm4.start();
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
@ -635,7 +649,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
RMApp app0 = rm1.submitApp(200);
@ -652,7 +666,7 @@ public class TestRMRestart {
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
// start RM
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
@ -661,7 +675,7 @@ public class TestRMRestart {
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
// app final state is saved via the finish event from attempt.
Assert.assertEquals(RMAppState.FINISHED,
rmAppState.get(app0.getApplicationId()).getState());
rmAppState.get(app0.getApplicationId()).getState());
}
@Test (timeout = 60000)
@ -674,7 +688,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -696,7 +710,7 @@ public class TestRMRestart {
appState.getAttempt(am0.getApplicationAttemptId()).getState());
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
@ -709,8 +723,6 @@ public class TestRMRestart {
.contains("Failing the application."));
// failed diagnostics from attempt is lost because the diagnostics from
// attempt is not yet available by the time app is saving the app state.
rm1.stop();
rm2.stop();
}
@Test (timeout = 60000)
@ -724,7 +736,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -746,7 +758,7 @@ public class TestRMRestart {
appState.getAttempt(am0.getApplicationAttemptId()).getState());
// restart rm
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED);
@ -756,9 +768,7 @@ public class TestRMRestart {
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
Assert.assertEquals(app0.getDiagnostics().toString(),
appReport.getDiagnostics());
rm1.stop();
rm2.stop();
appReport.getDiagnostics());
}
@Test (timeout = 60000)
@ -781,7 +791,7 @@ public class TestRMRestart {
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
// create app
RMApp app0 =
@ -793,7 +803,7 @@ public class TestRMRestart {
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
// restart rm
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
RMApp loadedApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
@ -812,7 +822,7 @@ public class TestRMRestart {
rmState.getApplicationState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -839,7 +849,7 @@ public class TestRMRestart {
Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime());
// restart rm
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
// verify application report returns the same app info as the app info
@ -848,9 +858,6 @@ public class TestRMRestart {
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
rm1.stop();
rm2.stop();
}
@Test (timeout = 60000)
@ -860,7 +867,7 @@ public class TestRMRestart {
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -897,7 +904,7 @@ public class TestRMRestart {
return spy(super.createRMAppManager());
}
};
rms.add(rm2);
rm2.start();
GetApplicationsRequest request1 =
@ -944,9 +951,6 @@ public class TestRMRestart {
// check application summary is logged for the completed apps after RM restart.
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
isA(ApplicationId.class));
rm1.stop();
rm2.stop();
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@ -1012,7 +1016,7 @@ public class TestRMRestart {
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -1050,7 +1054,7 @@ public class TestRMRestart {
// Setting AMLivelinessMonitor interval to be 3 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
// verify that maxAppAttempts is set to global value
@ -1069,10 +1073,6 @@ public class TestRMRestart {
Assert.assertEquals(RMAppState.FAILED,
rmAppState.get(app1.getApplicationId()).getState());
Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
// stop the RM
rm1.stop();
rm2.stop();
}
@Test (timeout = 60000)
@ -1154,10 +1154,6 @@ public class TestRMRestart {
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// stop the RM
rm1.stop();
rm2.stop();
}
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
@ -1253,8 +1249,6 @@ public class TestRMRestart {
Assert.assertArrayEquals(amrmToken.getPassword(),
rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
amrmToken.decodeIdentifier()));
rm1.stop();
rm2.stop();
}
@Test (timeout = 60000)
@ -1402,10 +1396,6 @@ public class TestRMRestart {
.getAllTokens();
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
Assert.assertFalse(rmDTState.containsKey(dtId1));
// stop the RM
rm1.stop();
rm2.stop();
}
// This is to test submit an application to the new RM with the old delegation
@ -1466,7 +1456,7 @@ public class TestRMRestart {
memStore.init(conf);
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
final MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
// create apps.
@ -1512,7 +1502,7 @@ public class TestRMRestart {
RMState rmState = memStore.getState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -1523,7 +1513,7 @@ public class TestRMRestart {
MockAM am0 = launchAM(app0, rm1, nm1);
finishApplicationMaster(app0, rm1, nm1, am0);
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
@ -1545,9 +1535,6 @@ public class TestRMRestart {
Assert.assertNull(rm2.getRMContext().getRMApps()
.get(app0.getApplicationId()));
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
rm1.stop();
rm2.stop();
}
// This is to test RM does not get hang on shutdown.
@ -1564,7 +1551,7 @@ public class TestRMRestart {
memStore.init(conf);
MockRM rm1 = null;
try {
rm1 = new MockRM(conf, memStore);
rm1 = createMockRM(conf, memStore);
rm1.start();
Assert.fail();
} catch (Exception e) {
@ -1582,7 +1569,7 @@ public class TestRMRestart {
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -1698,7 +1685,11 @@ public class TestRMRestart {
}
}
};
rm1.start();
try {
rm1.start();
} finally {
rm1.stop();
}
}
@SuppressWarnings("resource")
@ -1711,7 +1702,7 @@ public class TestRMRestart {
// PHASE 1: create state in an RM
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -1749,7 +1740,7 @@ public class TestRMRestart {
// PHASE 2: create new RM and start from old state
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
MockRM rm2 = createMockRM(conf, memStore);
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
resetQueueMetrics(qm2);
assertQueueMetrics(qm2, 0, 0, 0, 0);
@ -1766,7 +1757,7 @@ public class TestRMRestart {
NMContainerStatus status =
TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE);
.getAppAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status), null);
while (loadedApp1.getAppAttempts().size() != 2) {
@ -1795,10 +1786,6 @@ public class TestRMRestart {
// finish the AMs
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
assertQueueMetrics(qm2, 1, 0, 0, 1);
// stop RM's
rm2.stop();
rm1.stop();
}
@ -1836,43 +1823,58 @@ public class TestRMRestart {
hostFile.getAbsolutePath());
writeToHostsFile("");
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
MockRM rm1 = null, rm2 = null;
try {
rm1 = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
Assert
.assertEquals(0,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
String ip = NetUtils.normalizeHostName("localhost");
// Add 2 hosts to exclude list.
writeToHostsFile("host2", ip);
// refresh nodes
rm1.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert
.assertTrue(
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
rm1.stop();
rm1 = null;
Assert
.assertEquals(0,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
// restart RM.
rm2 = new MockRM(conf);
rm2.start();
Assert
.assertEquals(2,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
} finally {
if (rm1 != null) {
rm1.stop();
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
Assert
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
String ip = NetUtils.normalizeHostName("localhost");
// Add 2 hosts to exclude list.
writeToHostsFile("host2", ip);
// refresh nodes
rm1.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert
.assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
dispatcher.await();
Assert
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
rm1.stop();
Assert
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
// restart RM.
MockRM rm2 = new MockRM(conf);
rm2.start();
Assert
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
rm2.stop();
if (rm2 != null) {
rm2.stop();
}
}
}
// Test Delegation token is renewed synchronously so that recover events
@ -1887,7 +1889,7 @@ public class TestRMRestart {
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
final MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@ -1910,24 +1912,29 @@ public class TestRMRestart {
nm1.setResourceTrackerService(getResourceTrackerService());
NMContainerStatus status =
TestRMRestart.createNMContainerStatus(
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status), null);
}
};
}
};
// Re-start RM
rm2.start();
// wait for the 2nd attempt to be started.
RMApp loadedApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
int timeoutSecs = 0;
while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
Thread.sleep(200);
try {
// Re-start RM
rm2.start();
// wait for the 2nd attempt to be started.
RMApp loadedApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
int timeoutSecs = 0;
while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
Thread.sleep(200);
}
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
} finally {
rm2.stop();
}
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
}
private void writeToHostsFile(String... hosts) throws IOException {

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager
.ParameterizedSchedulerTestBase;
import static org.junit.Assert.fail;
import org.junit.Before;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -74,7 +78,17 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
public class TestClientToAMTokens {
public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
private YarnConfiguration conf;
public TestClientToAMTokens(SchedulerType type) {
super(type);
}
@Before
public void setup() {
conf = getConf();
}
private interface CustomProtocol {
@SuppressWarnings("unused")
@ -151,8 +165,6 @@ public class TestClientToAMTokens {
@Test
public void testClientToAMTokens() throws Exception {
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
@ -267,6 +279,8 @@ public class TestClientToAMTokens {
// Now for an authenticated user
verifyValidToken(conf, am, token);
rm.stop();
}
private void verifyTokenWithTamperedID(final Configuration conf,