YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run with both CS and FS. (Wei Yan and kasha via kasha)
(cherry picked from commit 80d11eb68e
)
Conflicts:
hadoop-yarn-project/CHANGES.txt
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java
This commit is contained in:
parent
685790e027
commit
b579c3d405
|
@ -6,8 +6,6 @@ Release 2.6.0 - 2014-11-15
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN. (Abin
|
|
||||||
Shahab via raviprak)
|
|
||||||
|
|
||||||
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
|
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||||
|
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public abstract class ParameterizedSchedulerTestBase {
|
||||||
|
protected final static String TEST_DIR =
|
||||||
|
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
|
||||||
|
private final static String FS_ALLOC_FILE =
|
||||||
|
new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath();
|
||||||
|
|
||||||
|
private SchedulerType schedulerType;
|
||||||
|
private YarnConfiguration conf = null;
|
||||||
|
|
||||||
|
public enum SchedulerType {
|
||||||
|
CAPACITY, FAIR
|
||||||
|
}
|
||||||
|
|
||||||
|
public ParameterizedSchedulerTestBase(SchedulerType type) {
|
||||||
|
schedulerType = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnConfiguration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<SchedulerType[]> getParameters() {
|
||||||
|
return Arrays.asList(new SchedulerType[][]{
|
||||||
|
{SchedulerType.CAPACITY}, {SchedulerType.FAIR}});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void configureScheduler() throws IOException {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
switch (schedulerType) {
|
||||||
|
case CAPACITY:
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||||
|
CapacityScheduler.class.getName());
|
||||||
|
break;
|
||||||
|
case FAIR:
|
||||||
|
configureFairScheduler(conf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void configureFairScheduler(YarnConfiguration conf) throws IOException {
|
||||||
|
// Disable queueMaxAMShare limitation for fair scheduler
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -75,13 +75,23 @@ import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class TestRM {
|
public class TestRM extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestRM.class);
|
private static final Log LOG = LogFactory.getLog(TestRM.class);
|
||||||
|
|
||||||
// Milliseconds to sleep for when waiting for something to happen
|
// Milliseconds to sleep for when waiting for something to happen
|
||||||
private final static int WAIT_SLEEP_MS = 100;
|
private final static int WAIT_SLEEP_MS = 100;
|
||||||
|
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
|
public TestRM(SchedulerType type) {
|
||||||
|
super(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
conf = getConf();
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
ClusterMetrics.destroy();
|
ClusterMetrics.destroy();
|
||||||
|
@ -93,7 +103,7 @@ public class TestRM {
|
||||||
public void testGetNewAppId() throws Exception {
|
public void testGetNewAppId() throws Exception {
|
||||||
Logger rootLogger = LogManager.getRootLogger();
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
rootLogger.setLevel(Level.DEBUG);
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
MockRM rm = new MockRM();
|
MockRM rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
GetNewApplicationResponse resp = rm.getNewAppId();
|
GetNewApplicationResponse resp = rm.getNewAppId();
|
||||||
|
@ -106,7 +116,7 @@ public class TestRM {
|
||||||
public void testAppWithNoContainers() throws Exception {
|
public void testAppWithNoContainers() throws Exception {
|
||||||
Logger rootLogger = LogManager.getRootLogger();
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
rootLogger.setLevel(Level.DEBUG);
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
MockRM rm = new MockRM();
|
MockRM rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||||
|
|
||||||
|
@ -128,7 +138,6 @@ public class TestRM {
|
||||||
public void testAppOnMultiNode() throws Exception {
|
public void testAppOnMultiNode() throws Exception {
|
||||||
Logger rootLogger = LogManager.getRootLogger();
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
rootLogger.setLevel(Level.DEBUG);
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
conf.set("yarn.scheduler.capacity.node-locality-delay", "-1");
|
conf.set("yarn.scheduler.capacity.node-locality-delay", "-1");
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
@ -188,7 +197,6 @@ public class TestRM {
|
||||||
// corresponding NM Token.
|
// corresponding NM Token.
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
public void testNMTokenSentForNormalContainer() throws Exception {
|
public void testNMTokenSentForNormalContainer() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER,
|
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||||
CapacityScheduler.class.getCanonicalName());
|
CapacityScheduler.class.getCanonicalName());
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf);
|
||||||
|
@ -241,7 +249,7 @@ public class TestRM {
|
||||||
|
|
||||||
@Test (timeout = 40000)
|
@Test (timeout = 40000)
|
||||||
public void testNMToken() throws Exception {
|
public void testNMToken() throws Exception {
|
||||||
MockRM rm = new MockRM();
|
MockRM rm = new MockRM(conf);
|
||||||
try {
|
try {
|
||||||
rm.start();
|
rm.start();
|
||||||
MockNM nm1 = rm.registerNode("h1:1234", 10000);
|
MockNM nm1 = rm.registerNode("h1:1234", 10000);
|
||||||
|
@ -425,8 +433,6 @@ public class TestRM {
|
||||||
|
|
||||||
@Test (timeout = 300000)
|
@Test (timeout = 300000)
|
||||||
public void testActivatingApplicationAfterAddingNM() throws Exception {
|
public void testActivatingApplicationAfterAddingNM() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
|
|
||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
|
||||||
// start like normal because state is empty
|
// start like normal because state is empty
|
||||||
|
@ -472,7 +478,6 @@ public class TestRM {
|
||||||
// is killed or failed, so that client doesn't get the wrong information.
|
// is killed or failed, so that client doesn't get the wrong information.
|
||||||
@Test (timeout = 80000)
|
@Test (timeout = 80000)
|
||||||
public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
|
public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
@ -525,7 +530,6 @@ public class TestRM {
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
|
public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
|
@ -558,7 +562,6 @@ public class TestRM {
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
public void testApplicationKillAtAcceptedState() throws Exception {
|
public void testApplicationKillAtAcceptedState() throws Exception {
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
|
||||||
final Dispatcher dispatcher = new AsyncDispatcher() {
|
final Dispatcher dispatcher = new AsyncDispatcher() {
|
||||||
@Override
|
@Override
|
||||||
public EventHandler getEventHandler() {
|
public EventHandler getEventHandler() {
|
||||||
|
@ -635,15 +638,4 @@ public class TestRM {
|
||||||
Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
|
Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
TestRM t = new TestRM();
|
|
||||||
t.testGetNewAppId();
|
|
||||||
t.testAppWithNoContainers();
|
|
||||||
t.testAppOnMultiNode();
|
|
||||||
t.testNMToken();
|
|
||||||
t.testActivatingApplicationAfterAddingNM();
|
|
||||||
t.testInvalidateAMHostPortWhenAMFailedOrKilled();
|
|
||||||
t.testInvalidatedAMHostPortOnAMRestart();
|
|
||||||
t.testApplicationKillAtAcceptedState();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,6 @@ import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -108,7 +107,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRMRestart {
|
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
private final static File TEMP_DIR = new File(System.getProperty(
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
|
@ -116,12 +115,17 @@ public class TestRMRestart {
|
||||||
|
|
||||||
// Fake rmAddr for token-renewal
|
// Fake rmAddr for token-renewal
|
||||||
private static InetSocketAddress rmAddr;
|
private static InetSocketAddress rmAddr;
|
||||||
|
private List<MockRM> rms = new ArrayList<MockRM>();
|
||||||
|
|
||||||
|
public TestRMRestart(SchedulerType type) {
|
||||||
|
super(type);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws UnknownHostException {
|
public void setup() throws IOException {
|
||||||
|
conf = getConf();
|
||||||
Logger rootLogger = LogManager.getRootLogger();
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
rootLogger.setLevel(Level.DEBUG);
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
conf = new YarnConfiguration();
|
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
@ -131,9 +135,24 @@ public class TestRMRestart {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
|
for (MockRM rm : rms) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
rms.clear();
|
||||||
|
|
||||||
TEMP_DIR.delete();
|
TEMP_DIR.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return a new MockRM that will be stopped at the end of the test.
|
||||||
|
*/
|
||||||
|
private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) {
|
||||||
|
MockRM rm = new MockRM(conf, store);
|
||||||
|
rms.add(rm);
|
||||||
|
return rm;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
@Test (timeout=180000)
|
@Test (timeout=180000)
|
||||||
public void testRMRestart() throws Exception {
|
public void testRMRestart() throws Exception {
|
||||||
|
@ -150,7 +169,7 @@ public class TestRMRestart {
|
||||||
// PHASE 1: create state in an RM
|
// PHASE 1: create state in an RM
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
|
|
||||||
// start like normal because state is empty
|
// start like normal because state is empty
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
@ -246,7 +265,7 @@ public class TestRMRestart {
|
||||||
// PHASE 2: create new RM and start from old state
|
// PHASE 2: create new RM and start from old state
|
||||||
|
|
||||||
// create new RM to represent restart and recover state
|
// create new RM to represent restart and recover state
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
@ -414,7 +433,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -443,13 +462,11 @@ public class TestRMRestart {
|
||||||
rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
// assert the previous AM state is loaded back on RM recovery.
|
// assert the previous AM state is loaded back on RM recovery.
|
||||||
|
|
||||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -473,7 +490,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
final MockRM rm1 = new MockRM(conf, memStore);
|
final MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
|
||||||
|
@ -497,8 +514,7 @@ public class TestRMRestart {
|
||||||
.getAppAttemptState(), RMAppAttemptState.RUNNING);
|
.getAppAttemptState(), RMAppAttemptState.RUNNING);
|
||||||
|
|
||||||
// start new RM.
|
// start new RM.
|
||||||
MockRM rm2 = null;
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2 = new MockRM(conf, memStore);
|
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
@ -535,8 +551,7 @@ public class TestRMRestart {
|
||||||
// Now restart RM ...
|
// Now restart RM ...
|
||||||
// Setting AMLivelinessMonitor interval to be 10 Secs.
|
// Setting AMLivelinessMonitor interval to be 10 Secs.
|
||||||
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
|
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
|
||||||
MockRM rm3 = null;
|
MockRM rm3 = createMockRM(conf, memStore);
|
||||||
rm3 = new MockRM(conf, memStore);
|
|
||||||
rm3.start();
|
rm3.start();
|
||||||
|
|
||||||
// Wait for RM to process all the events as a part of rm recovery.
|
// Wait for RM to process all the events as a part of rm recovery.
|
||||||
|
@ -583,8 +598,7 @@ public class TestRMRestart {
|
||||||
memStore.getState().getApplicationState().get(app2.getApplicationId())
|
memStore.getState().getApplicationState().get(app2.getApplicationId())
|
||||||
.getAttemptCount());
|
.getAttemptCount());
|
||||||
|
|
||||||
MockRM rm4 = null;
|
MockRM rm4 = createMockRM(conf, memStore);
|
||||||
rm4 = new MockRM(conf, memStore);
|
|
||||||
rm4.start();
|
rm4.start();
|
||||||
|
|
||||||
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
|
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||||
|
@ -640,7 +654,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
||||||
RMApp app0 = rm1.submitApp(200);
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
@ -657,7 +671,7 @@ public class TestRMRestart {
|
||||||
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
|
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
|
@ -679,7 +693,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -701,7 +715,7 @@ public class TestRMRestart {
|
||||||
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
@ -714,8 +728,6 @@ public class TestRMRestart {
|
||||||
.contains("Failing the application."));
|
.contains("Failing the application."));
|
||||||
// failed diagnostics from attempt is lost because the diagnostics from
|
// failed diagnostics from attempt is lost because the diagnostics from
|
||||||
// attempt is not yet available by the time app is saving the app state.
|
// attempt is not yet available by the time app is saving the app state.
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -729,7 +741,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -751,7 +763,7 @@ public class TestRMRestart {
|
||||||
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
||||||
|
|
||||||
// restart rm
|
// restart rm
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
||||||
|
@ -762,8 +774,6 @@ public class TestRMRestart {
|
||||||
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
|
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
|
||||||
Assert.assertEquals(app0.getDiagnostics().toString(),
|
Assert.assertEquals(app0.getDiagnostics().toString(),
|
||||||
appReport.getDiagnostics());
|
appReport.getDiagnostics());
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -786,7 +796,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
// create app
|
// create app
|
||||||
RMApp app0 =
|
RMApp app0 =
|
||||||
|
@ -798,7 +808,7 @@ public class TestRMRestart {
|
||||||
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
// restart rm
|
// restart rm
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
RMApp loadedApp0 =
|
RMApp loadedApp0 =
|
||||||
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
|
@ -817,7 +827,7 @@ public class TestRMRestart {
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -844,7 +854,7 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime());
|
Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime());
|
||||||
|
|
||||||
// restart rm
|
// restart rm
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
// verify application report returns the same app info as the app info
|
// verify application report returns the same app info as the app info
|
||||||
|
@ -853,9 +863,6 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
|
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
|
||||||
appReport.getFinalApplicationStatus());
|
appReport.getFinalApplicationStatus());
|
||||||
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
|
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
|
||||||
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -865,7 +872,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -902,7 +909,7 @@ public class TestRMRestart {
|
||||||
return spy(super.createRMAppManager());
|
return spy(super.createRMAppManager());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
rms.add(rm2);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
GetApplicationsRequest request1 =
|
GetApplicationsRequest request1 =
|
||||||
|
@ -949,9 +956,6 @@ public class TestRMRestart {
|
||||||
// check application summary is logged for the completed apps after RM restart.
|
// check application summary is logged for the completed apps after RM restart.
|
||||||
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
||||||
isA(ApplicationId.class));
|
isA(ApplicationId.class));
|
||||||
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||||
|
@ -1017,7 +1021,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
Map<ApplicationId, ApplicationState> rmAppState =
|
Map<ApplicationId, ApplicationState> rmAppState =
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -1055,7 +1059,7 @@ public class TestRMRestart {
|
||||||
// Setting AMLivelinessMonitor interval to be 3 Secs.
|
// Setting AMLivelinessMonitor interval to be 3 Secs.
|
||||||
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
|
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
|
||||||
// start new RM
|
// start new RM
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
// verify that maxAppAttempts is set to global value
|
// verify that maxAppAttempts is set to global value
|
||||||
|
@ -1074,10 +1078,6 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(RMAppState.FAILED,
|
Assert.assertEquals(RMAppState.FAILED,
|
||||||
rmAppState.get(app1.getApplicationId()).getState());
|
rmAppState.get(app1.getApplicationId()).getState());
|
||||||
Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
|
Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
|
||||||
|
|
||||||
// stop the RM
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -1159,10 +1159,6 @@ public class TestRMRestart {
|
||||||
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
|
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
|
||||||
Assert.assertEquals(tokenSet, rm2.getRMContext()
|
Assert.assertEquals(tokenSet, rm2.getRMContext()
|
||||||
.getDelegationTokenRenewer().getDelegationTokens());
|
.getDelegationTokenRenewer().getDelegationTokens());
|
||||||
|
|
||||||
// stop the RM
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
|
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
|
||||||
|
@ -1258,8 +1254,6 @@ public class TestRMRestart {
|
||||||
Assert.assertArrayEquals(amrmToken.getPassword(),
|
Assert.assertArrayEquals(amrmToken.getPassword(),
|
||||||
rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
|
rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
|
||||||
amrmToken.decodeIdentifier()));
|
amrmToken.decodeIdentifier()));
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -1407,10 +1401,6 @@ public class TestRMRestart {
|
||||||
.getAllTokens();
|
.getAllTokens();
|
||||||
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
|
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
|
||||||
Assert.assertFalse(rmDTState.containsKey(dtId1));
|
Assert.assertFalse(rmDTState.containsKey(dtId1));
|
||||||
|
|
||||||
// stop the RM
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is to test submit an application to the new RM with the old delegation
|
// This is to test submit an application to the new RM with the old delegation
|
||||||
|
@ -1471,7 +1461,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
final MockRM rm1 = new MockRM(conf, memStore);
|
final MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
|
||||||
// create apps.
|
// create apps.
|
||||||
|
@ -1517,7 +1507,7 @@ public class TestRMRestart {
|
||||||
RMState rmState = memStore.getState();
|
RMState rmState = memStore.getState();
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -1528,7 +1518,7 @@ public class TestRMRestart {
|
||||||
MockAM am0 = launchAM(app0, rm1, nm1);
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
||||||
finishApplicationMaster(app0, rm1, nm1, am0);
|
finishApplicationMaster(app0, rm1, nm1, am0);
|
||||||
|
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
|
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
|
||||||
|
@ -1550,9 +1540,6 @@ public class TestRMRestart {
|
||||||
Assert.assertNull(rm2.getRMContext().getRMApps()
|
Assert.assertNull(rm2.getRMContext().getRMApps()
|
||||||
.get(app0.getApplicationId()));
|
.get(app0.getApplicationId()));
|
||||||
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
|
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
|
||||||
|
|
||||||
rm1.stop();
|
|
||||||
rm2.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is to test RM does not get hang on shutdown.
|
// This is to test RM does not get hang on shutdown.
|
||||||
|
@ -1569,7 +1556,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
MockRM rm1 = null;
|
MockRM rm1 = null;
|
||||||
try {
|
try {
|
||||||
rm1 = new MockRM(conf, memStore);
|
rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -1587,7 +1574,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -1703,7 +1690,11 @@ public class TestRMRestart {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
try {
|
||||||
rm1.start();
|
rm1.start();
|
||||||
|
} finally {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
|
@ -1716,7 +1707,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
// PHASE 1: create state in an RM
|
// PHASE 1: create state in an RM
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -1754,7 +1745,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
// PHASE 2: create new RM and start from old state
|
// PHASE 2: create new RM and start from old state
|
||||||
// create new RM to represent restart and recover state
|
// create new RM to represent restart and recover state
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = createMockRM(conf, memStore);
|
||||||
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
|
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
|
||||||
resetQueueMetrics(qm2);
|
resetQueueMetrics(qm2);
|
||||||
assertQueueMetrics(qm2, 0, 0, 0, 0);
|
assertQueueMetrics(qm2, 0, 0, 0, 0);
|
||||||
|
@ -1799,10 +1790,6 @@ public class TestRMRestart {
|
||||||
// finish the AMs
|
// finish the AMs
|
||||||
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
|
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
|
||||||
assertQueueMetrics(qm2, 1, 0, 0, 1);
|
assertQueueMetrics(qm2, 1, 0, 0, 1);
|
||||||
|
|
||||||
// stop RM's
|
|
||||||
rm2.stop();
|
|
||||||
rm1.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1840,7 +1827,9 @@ public class TestRMRestart {
|
||||||
hostFile.getAbsolutePath());
|
hostFile.getAbsolutePath());
|
||||||
writeToHostsFile("");
|
writeToHostsFile("");
|
||||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
MockRM rm1 = new MockRM(conf) {
|
MockRM rm1 = null, rm2 = null;
|
||||||
|
try {
|
||||||
|
rm1 = new MockRM(conf) {
|
||||||
@Override
|
@Override
|
||||||
protected Dispatcher createDispatcher() {
|
protected Dispatcher createDispatcher() {
|
||||||
return dispatcher;
|
return dispatcher;
|
||||||
|
@ -1850,7 +1839,8 @@ public class TestRMRestart {
|
||||||
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
||||||
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(0,
|
||||||
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
String ip = NetUtils.normalizeHostName("localhost");
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
// Add 2 hosts to exclude list.
|
// Add 2 hosts to exclude list.
|
||||||
writeToHostsFile("host2", ip);
|
writeToHostsFile("host2", ip);
|
||||||
|
@ -1859,25 +1849,37 @@ public class TestRMRestart {
|
||||||
rm1.getNodesListManager().refreshNodes(conf);
|
rm1.getNodesListManager().refreshNodes(conf);
|
||||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
Assert
|
Assert
|
||||||
.assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
.assertTrue(
|
||||||
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
Assert.assertTrue("The decommisioned metrics are not updated",
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(2,
|
||||||
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
|
rm1 = null;
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(0,
|
||||||
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
|
||||||
// restart RM.
|
// restart RM.
|
||||||
MockRM rm2 = new MockRM(conf);
|
rm2 = new MockRM(conf);
|
||||||
rm2.start();
|
rm2.start();
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
.assertEquals(2,
|
||||||
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
} finally {
|
||||||
|
if (rm1 != null) {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
if (rm2 != null) {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test Delegation token is renewed synchronously so that recover events
|
// Test Delegation token is renewed synchronously so that recover events
|
||||||
// can be processed before any other external incoming events, specifically
|
// can be processed before any other external incoming events, specifically
|
||||||
|
@ -1891,7 +1893,7 @@ public class TestRMRestart {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
|
||||||
// start RM
|
// start RM
|
||||||
MockRM rm1 = new MockRM(conf, memStore);
|
MockRM rm1 = createMockRM(conf, memStore);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
final MockNM nm1 =
|
final MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
@ -1920,6 +1922,8 @@ public class TestRMRestart {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
// Re-start RM
|
// Re-start RM
|
||||||
rm2.start();
|
rm2.start();
|
||||||
|
|
||||||
|
@ -1932,6 +1936,9 @@ public class TestRMRestart {
|
||||||
}
|
}
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
|
||||||
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
|
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
|
||||||
|
} finally {
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeToHostsFile(String... hosts) throws IOException {
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
|
|
|
@ -18,7 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager
|
||||||
|
.ParameterizedSchedulerTestBase;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import org.junit.Before;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -79,7 +83,17 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestClientToAMTokens {
|
public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
|
||||||
|
public TestClientToAMTokens(SchedulerType type) {
|
||||||
|
super(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
conf = getConf();
|
||||||
|
}
|
||||||
|
|
||||||
private interface CustomProtocol {
|
private interface CustomProtocol {
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
@ -166,8 +180,6 @@ public class TestClientToAMTokens {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientToAMTokens() throws Exception {
|
public void testClientToAMTokens() throws Exception {
|
||||||
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
Loading…
Reference in New Issue